b76ae6a6221669d25a4c93d2fc366b97d9f2ad63
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
29 #include "db_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include <ctype.h>
32
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
34
/*
  the dummy null call that all databases support: it leaves the call
  untouched and always reports success
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
        (void)call;     /* intentionally unused */
        return 0;
}
42
43 /*
44   this is a plain fetch procedure that all databases support
45 */
46 static int ctdb_fetch_func(struct ctdb_call_info *call)
47 {
48         call->reply_data = &call->record_data;
49         return 0;
50 }
51
52 /*
53   this is a plain fetch procedure that all databases support
54   this returns the full record including the ltdb header
55 */
56 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
57 {
58         call->reply_data = talloc(call, TDB_DATA);
59         if (call->reply_data == NULL) {
60                 return -1;
61         }
62         call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
63         call->reply_data->dptr  = talloc_size(call->reply_data, call->reply_data->dsize);
64         if (call->reply_data->dptr == NULL) {
65                 return -1;
66         }
67         memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
68         memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
69
70         return 0;
71 }
72
73
74 /**
75  * write a record to a normal database
76  *
77  * This is the server-variant of the ctdb_ltdb_store function.
78  * It contains logic to determine whether a record should be
79  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
80  * controls to the local ctdb daemon if apporpriate.
81  */
82 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
83                                   TDB_DATA key,
84                                   struct ctdb_ltdb_header *header,
85                                   TDB_DATA data)
86 {
87         struct ctdb_context *ctdb = ctdb_db->ctdb;
88         TDB_DATA rec;
89         int ret;
90         bool seqnum_suppressed = false;
91         bool keep = false;
92         bool schedule_for_deletion = false;
93         uint32_t lmaster;
94
95         if (ctdb->flags & CTDB_FLAG_TORTURE) {
96                 struct ctdb_ltdb_header *h2;
97                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
98                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
99                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
100                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
101                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
102                 }
103                 if (rec.dptr) free(rec.dptr);
104         }
105
106         if (ctdb->vnn_map == NULL) {
107                 /*
108                  * Called from a client: always store the record
109                  * Also don't call ctdb_lmaster since it uses the vnn_map!
110                  */
111                 keep = true;
112                 goto store;
113         }
114
115         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
116
117         /*
118          * If we migrate an empty record off to another node
119          * and the record has not been migrated with data,
120          * delete the record instead of storing the empty record.
121          */
122         if (data.dsize != 0) {
123                 keep = true;
124         } else if (ctdb_db->persistent) {
125                 keep = true;
126         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
127                 /*
128                  * The record is not created by the client but
129                  * automatically by the ctdb_ltdb_fetch logic that
130                  * creates a record with an initial header in the
131                  * ltdb before trying to migrate the record from
132                  * the current lmaster. Keep it instead of trying
133                  * to delete the non-existing record...
134                  */
135                 keep = true;
136                 schedule_for_deletion = true;
137         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
138                 keep = true;
139         } else if (ctdb_db->ctdb->pnn == lmaster) {
140                 /*
141                  * If we are lmaster, then we usually keep the record.
142                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
143                  * and the record is empty and has never been migrated
144                  * with data, then we should delete it instead of storing it.
145                  * This is part of the vacuuming process.
146                  *
147                  * The reason that we usually need to store even empty records
148                  * on the lmaster is that a client operating directly on the
149                  * lmaster (== dmaster) expects the local copy of the record to
150                  * exist after successful ctdb migrate call. If the record does
151                  * not exist, the client goes into a migrate loop and eventually
152                  * fails. So storing the empty record makes sure that we do not
153                  * need to change the client code.
154                  */
155                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
156                         keep = true;
157                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
158                         keep = true;
159                 }
160         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
161                 keep = true;
162         }
163
164         if (keep &&
165             (data.dsize == 0) &&
166             !ctdb_db->persistent &&
167             (ctdb_db->ctdb->pnn == header->dmaster))
168         {
169                 schedule_for_deletion = true;
170         }
171
172 store:
173         /*
174          * The VACUUM_MIGRATED flag is only set temporarily for
175          * the above logic when the record was retrieved by a
176          * VACUUM_MIGRATE call and should not be stored in the
177          * database.
178          *
179          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
180          * and there are two cases in which the corresponding record
181          * is stored in the local database:
182          * 1. The record has been migrated with data in the past
183          *    (the MIGRATED_WITH_DATA record flag is set).
184          * 2. The record has been filled with data again since it
185          *    had been submitted in the VACUUM_FETCH message to the
186          *    lmaster.
187          * For such records it is important to not store the
188          * VACUUM_MIGRATED flag in the database.
189          */
190         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
191
192         /*
193          * Similarly, clear the AUTOMATIC flag which should not enter
194          * the local database copy since this would require client
195          * modifications to clear the flag when the client stores
196          * the record.
197          */
198         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
199
200         rec.dsize = sizeof(*header) + data.dsize;
201         rec.dptr = talloc_size(ctdb, rec.dsize);
202         CTDB_NO_MEMORY(ctdb, rec.dptr);
203
204         memcpy(rec.dptr, header, sizeof(*header));
205         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
206
207         /* Databases with seqnum updates enabled only get their seqnum
208            changes when/if we modify the data */
209         if (ctdb_db->seqnum_update != NULL) {
210                 TDB_DATA old;
211                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
212
213                 if ( (old.dsize == rec.dsize)
214                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
215                           rec.dptr+sizeof(struct ctdb_ltdb_header),
216                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
217                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
218                         seqnum_suppressed = true;
219                 }
220                 if (old.dptr) free(old.dptr);
221         }
222
223         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
224                             ctdb_db->db_name,
225                             keep?"storing":"deleting",
226                             ctdb_hash(&key)));
227
228         if (keep) {
229                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
230         } else {
231                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
232         }
233
234         if (ret != 0) {
235                 int lvl = DEBUG_ERR;
236
237                 if (keep == false &&
238                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
239                 {
240                         lvl = DEBUG_DEBUG;
241                 }
242
243                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
244                             "%d - %s\n",
245                             ctdb_db->db_name,
246                             keep?"store":"delete", ret,
247                             tdb_errorstr(ctdb_db->ltdb->tdb)));
248
249                 schedule_for_deletion = false;
250         }
251         if (seqnum_suppressed) {
252                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
253         }
254
255         talloc_free(rec.dptr);
256
257         if (schedule_for_deletion) {
258                 int ret2;
259                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
260                 if (ret != 0) {
261                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
262                 }
263         }
264
265         return ret;
266 }
267
/*
  state kept for a request that is deferred until its chainlock can
  be obtained (set up in ctdb_ltdb_lock_requeue, consumed in
  lock_fetch_callback)
 */
struct lock_fetch_state {
        /* daemon context, used to look up the current vnn map generation */
        struct ctdb_context *ctdb;
        /* receive function the deferred packet is handed back to */
        void (*recv_pkt)(void *, struct ctdb_req_header *);
        /* first argument passed to recv_pkt */
        void *recv_context;
        /* the deferred request packet */
        struct ctdb_req_header *hdr;
        /* vnn map generation at the time the request was deferred */
        uint32_t generation;
        /* if true, re-submit the packet even if the generation changed */
        bool ignore_generation;
};
276
277 /*
278   called when we should retry the operation
279  */
280 static void lock_fetch_callback(void *p)
281 {
282         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
283         if (!state->ignore_generation &&
284             state->generation != state->ctdb->vnn_map->generation) {
285                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
286                 talloc_free(state->hdr);
287                 return;
288         }
289         state->recv_pkt(state->recv_context, state->hdr);
290         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
291 }
292
293
294 /*
295   do a non-blocking ltdb_lock, deferring this ctdb request until we
296   have the chainlock
297
298   It does the following:
299
300    1) tries to get the chainlock. If it succeeds, then it returns 0
301
302    2) if it fails to get a chainlock immediately then it sets up a
303    non-blocking chainlock via ctdb_lockwait, and when it gets the
304    chainlock it re-submits this ctdb request to the main packet
305    receive function
306
307    This effectively queues all ctdb requests that cannot be
308    immediately satisfied until it can get the lock. This means that
309    the main ctdb daemon will not block waiting for a chainlock held by
310    a client
311
312    There are 3 possible return values:
313
314        0:    means that it got the lock immediately.
315       -1:    means that it failed to get the lock, and won't retry
316       -2:    means that it failed to get the lock immediately, but will retry
317  */
318 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
319                            TDB_DATA key, struct ctdb_req_header *hdr,
320                            void (*recv_pkt)(void *, struct ctdb_req_header *),
321                            void *recv_context, bool ignore_generation)
322 {
323         int ret;
324         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
325         struct lockwait_handle *h;
326         struct lock_fetch_state *state;
327         
328         ret = tdb_chainlock_nonblock(tdb, key);
329
330         if (ret != 0 &&
331             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
332                 /* a hard failure - don't try again */
333                 return -1;
334         }
335
336         /* when torturing, ensure we test the contended path */
337         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
338             random() % 5 == 0) {
339                 ret = -1;
340                 tdb_chainunlock(tdb, key);
341         }
342
343         /* first the non-contended path */
344         if (ret == 0) {
345                 return 0;
346         }
347
348         state = talloc(hdr, struct lock_fetch_state);
349         state->ctdb = ctdb_db->ctdb;
350         state->hdr = hdr;
351         state->recv_pkt = recv_pkt;
352         state->recv_context = recv_context;
353         state->generation = ctdb_db->ctdb->vnn_map->generation;
354         state->ignore_generation = ignore_generation;
355
356         /* now the contended path */
357         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
358         if (h == NULL) {
359                 return -1;
360         }
361
362         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
363            so it won't be freed yet */
364         talloc_steal(state, hdr);
365         talloc_steal(state, h);
366
367         /* now tell the caller than we will retry asynchronously */
368         return -2;
369 }
370
371 /*
372   a varient of ctdb_ltdb_lock_requeue that also fetches the record
373  */
374 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
375                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
376                                  struct ctdb_req_header *hdr, TDB_DATA *data,
377                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
378                                  void *recv_context, bool ignore_generation)
379 {
380         int ret;
381
382         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
383                                      recv_context, ignore_generation);
384         if (ret == 0) {
385                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
386                 if (ret != 0) {
387                         int uret;
388                         uret = ctdb_ltdb_unlock(ctdb_db, key);
389                         if (uret != 0) {
390                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
391                         }
392                 }
393         }
394         return ret;
395 }
396
397
398 /*
399   paraoid check to see if the db is empty
400  */
401 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
402 {
403         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
404         int count = tdb_traverse_read(tdb, NULL, NULL);
405         if (count != 0) {
406                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
407                          ctdb_db->db_path));
408                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
409         }
410 }
411
412 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
413                                 struct ctdb_db_context *ctdb_db)
414 {
415         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
416         char *old;
417         char *reason = NULL;
418         TDB_DATA key;
419         TDB_DATA val;
420
421         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
422         key.dsize = strlen(ctdb_db->db_name);
423
424         old = ctdb_db->unhealthy_reason;
425         ctdb_db->unhealthy_reason = NULL;
426
427         val = tdb_fetch(tdb, key);
428         if (val.dsize > 0) {
429                 reason = talloc_strndup(ctdb_db,
430                                         (const char *)val.dptr,
431                                         val.dsize);
432                 if (reason == NULL) {
433                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
434                                            (int)val.dsize));
435                         ctdb_db->unhealthy_reason = old;
436                         free(val.dptr);
437                         return -1;
438                 }
439         }
440
441         if (val.dptr) {
442                 free(val.dptr);
443         }
444
445         talloc_free(old);
446         ctdb_db->unhealthy_reason = reason;
447         return 0;
448 }
449
450 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
451                                   struct ctdb_db_context *ctdb_db,
452                                   const char *given_reason,/* NULL means healthy */
453                                   int num_healthy_nodes)
454 {
455         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
456         int ret;
457         TDB_DATA key;
458         TDB_DATA val;
459         char *new_reason = NULL;
460         char *old_reason = NULL;
461
462         ret = tdb_transaction_start(tdb);
463         if (ret != 0) {
464                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
465                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
466                 return -1;
467         }
468
469         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
470         if (ret != 0) {
471                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
472                                    ctdb_db->db_name, ret));
473                 return -1;
474         }
475         old_reason = ctdb_db->unhealthy_reason;
476
477         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
478         key.dsize = strlen(ctdb_db->db_name);
479
480         if (given_reason) {
481                 new_reason = talloc_strdup(ctdb_db, given_reason);
482                 if (new_reason == NULL) {
483                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
484                                           given_reason));
485                         return -1;
486                 }
487         } else if (old_reason && num_healthy_nodes == 0) {
488                 /*
489                  * If the reason indicates ok, but there where no healthy nodes
490                  * available, that it means, we have not recovered valid content
491                  * of the db. So if there's an old reason, prefix it with
492                  * "NO-HEALTHY-NODES - "
493                  */
494                 const char *prefix;
495
496 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
497                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
498                 if (ret != 0) {
499                         prefix = _TMP_PREFIX;
500                 } else {
501                         prefix = "";
502                 }
503                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
504                                          prefix, old_reason);
505                 if (new_reason == NULL) {
506                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
507                                           prefix, old_reason));
508                         return -1;
509                 }
510 #undef _TMP_PREFIX
511         }
512
513         if (new_reason) {
514                 val.dptr = discard_const_p(uint8_t, new_reason);
515                 val.dsize = strlen(new_reason);
516
517                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
518                 if (ret != 0) {
519                         tdb_transaction_cancel(tdb);
520                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
521                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
522                                            ret, tdb_errorstr(tdb)));
523                         talloc_free(new_reason);
524                         return -1;
525                 }
526                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
527                                    ctdb_db->db_name, new_reason));
528         } else if (old_reason) {
529                 ret = tdb_delete(tdb, key);
530                 if (ret != 0) {
531                         tdb_transaction_cancel(tdb);
532                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
533                                            tdb_name(tdb), ctdb_db->db_name,
534                                            ret, tdb_errorstr(tdb)));
535                         talloc_free(new_reason);
536                         return -1;
537                 }
538                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
539                                    ctdb_db->db_name));
540         }
541
542         ret = tdb_transaction_commit(tdb);
543         if (ret != TDB_SUCCESS) {
544                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
545                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
546                 talloc_free(new_reason);
547                 return -1;
548         }
549
550         talloc_free(old_reason);
551         ctdb_db->unhealthy_reason = new_reason;
552
553         return 0;
554 }
555
556 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
557                                      struct ctdb_db_context *ctdb_db)
558 {
559         time_t now = time(NULL);
560         char *new_path;
561         char *new_reason;
562         int ret;
563         struct tm *tm;
564
565         tm = gmtime(&now);
566
567         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
568         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
569                                    "%04u%02u%02u%02u%02u%02u.0Z",
570                                    ctdb_db->db_path,
571                                    tm->tm_year+1900, tm->tm_mon+1,
572                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
573                                    tm->tm_sec);
574         if (new_path == NULL) {
575                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
576                 return -1;
577         }
578
579         new_reason = talloc_asprintf(ctdb_db,
580                                      "ERROR - Backup of corrupted TDB in '%s'",
581                                      new_path);
582         if (new_reason == NULL) {
583                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
584                 return -1;
585         }
586         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
587         talloc_free(new_reason);
588         if (ret != 0) {
589                 DEBUG(DEBUG_CRIT,(__location__
590                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
591                                  ctdb_db->db_path));
592                 return -1;
593         }
594
595         ret = rename(ctdb_db->db_path, new_path);
596         if (ret != 0) {
597                 DEBUG(DEBUG_CRIT,(__location__
598                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
599                                   ctdb_db->db_path, new_path,
600                                   errno, strerror(errno)));
601                 talloc_free(new_path);
602                 return -1;
603         }
604
605         DEBUG(DEBUG_CRIT,(__location__
606                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
607                          ctdb_db->db_path, new_path));
608         talloc_free(new_path);
609         return 0;
610 }
611
612 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
613 {
614         struct ctdb_db_context *ctdb_db;
615         int ret;
616         int ok = 0;
617         int fail = 0;
618
619         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
620                 if (!ctdb_db->persistent) {
621                         continue;
622                 }
623
624                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
625                 if (ret != 0) {
626                         DEBUG(DEBUG_ALERT,(__location__
627                                            " load persistent health for '%s' failed\n",
628                                            ctdb_db->db_path));
629                         return -1;
630                 }
631
632                 if (ctdb_db->unhealthy_reason == NULL) {
633                         ok++;
634                         DEBUG(DEBUG_INFO,(__location__
635                                    " persistent db '%s' healthy\n",
636                                    ctdb_db->db_path));
637                         continue;
638                 }
639
640                 fail++;
641                 DEBUG(DEBUG_ALERT,(__location__
642                                    " persistent db '%s' unhealthy: %s\n",
643                                    ctdb_db->db_path,
644                                    ctdb_db->unhealthy_reason));
645         }
646         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
647               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
648                ok, fail));
649
650         if (fail != 0) {
651                 return -1;
652         }
653
654         return 0;
655 }
656
657
658 /*
659   mark a database - as healthy
660  */
661 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
662 {
663         uint32_t db_id = *(uint32_t *)indata.dptr;
664         struct ctdb_db_context *ctdb_db;
665         int ret;
666         bool may_recover = false;
667
668         ctdb_db = find_ctdb_db(ctdb, db_id);
669         if (!ctdb_db) {
670                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
671                 return -1;
672         }
673
674         if (ctdb_db->unhealthy_reason) {
675                 may_recover = true;
676         }
677
678         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
679         if (ret != 0) {
680                 DEBUG(DEBUG_ERR,(__location__
681                                  " ctdb_update_persistent_health(%s) failed\n",
682                                  ctdb_db->db_name));
683                 return -1;
684         }
685
686         if (may_recover && !ctdb->done_startup) {
687                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
688                                   ctdb_db->db_name));
689                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
690         }
691
692         return 0;
693 }
694
695 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
696                                    TDB_DATA indata,
697                                    TDB_DATA *outdata)
698 {
699         uint32_t db_id = *(uint32_t *)indata.dptr;
700         struct ctdb_db_context *ctdb_db;
701         int ret;
702
703         ctdb_db = find_ctdb_db(ctdb, db_id);
704         if (!ctdb_db) {
705                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
706                 return -1;
707         }
708
709         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
710         if (ret != 0) {
711                 DEBUG(DEBUG_ERR,(__location__
712                                  " ctdb_load_persistent_health(%s) failed\n",
713                                  ctdb_db->db_name));
714                 return -1;
715         }
716
717         *outdata = tdb_null;
718         if (ctdb_db->unhealthy_reason) {
719                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
720                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
721         }
722
723         return 0;
724 }
725
726
727 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
728 {
729         char *ropath;
730
731         DEBUG(DEBUG_ERR,("XXX set db readonly %s\n", ctdb_db->db_name));
732
733         if (ctdb_db->readonly) {
734                 return 0;
735         }
736
737         if (ctdb_db->persistent) {
738                 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
739                 return -1;
740         }
741
742         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
743         if (ropath == NULL) {
744                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
745                 return -1;
746         }
747         ctdb_db->rottdb = tdb_open(ropath, 
748                               ctdb->tunable.database_hash_size, 
749                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
750                               O_CREAT|O_RDWR, 0);
751         if (ctdb_db->rottdb == NULL) {
752                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
753                 talloc_free(ropath);
754                 return -1;
755         }
756
757         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
758
759         ctdb_db->readonly = true;
760         talloc_free(ropath);
761         return 0;
762 }
763
764 /*
765   attach to a database, handling both persistent and non-persistent databases
766   return 0 on success, -1 on failure
767  */
768 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
769                              bool persistent, const char *unhealthy_reason,
770                              bool jenkinshash)
771 {
772         struct ctdb_db_context *ctdb_db, *tmp_db;
773         int ret;
774         struct TDB_DATA key;
775         unsigned tdb_flags;
776         int mode = 0600;
777         int remaining_tries = 0;
778
779         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
780         CTDB_NO_MEMORY(ctdb, ctdb_db);
781
782         ctdb_db->priority = 1;
783         ctdb_db->ctdb = ctdb;
784         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
785         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
786
787         key.dsize = strlen(db_name)+1;
788         key.dptr  = discard_const(db_name);
789         ctdb_db->db_id = ctdb_hash(&key);
790         ctdb_db->persistent = persistent;
791
792         if (!ctdb_db->persistent) {
793                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
794                 if (ctdb_db->delete_queue == NULL) {
795                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
796                 }
797
798                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
799         }
800
801         /* check for hash collisions */
802         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
803                 if (tmp_db->db_id == ctdb_db->db_id) {
804                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
805                                  tmp_db->db_id, db_name, tmp_db->db_name));
806                         talloc_free(ctdb_db);
807                         return -1;
808                 }
809         }
810
811         if (persistent) {
812                 if (unhealthy_reason) {
813                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
814                                                             unhealthy_reason, 0);
815                         if (ret != 0) {
816                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
817                                                    ctdb_db->db_name, unhealthy_reason, ret));
818                                 talloc_free(ctdb_db);
819                                 return -1;
820                         }
821                 }
822
823                 if (ctdb->max_persistent_check_errors > 0) {
824                         remaining_tries = 1;
825                 }
826                 if (ctdb->done_startup) {
827                         remaining_tries = 0;
828                 }
829
830                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
831                 if (ret != 0) {
832                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
833                                    ctdb_db->db_name, ret));
834                         talloc_free(ctdb_db);
835                         return -1;
836                 }
837         }
838
839         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
840                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
841                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
842                 talloc_free(ctdb_db);
843                 return -1;
844         }
845
846         if (ctdb_db->unhealthy_reason) {
847                 /* this is just a warning, but we want that in the log file! */
848                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
849                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
850         }
851
852         /* open the database */
853         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
854                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
855                                            db_name, ctdb->pnn);
856
857         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
858         if (ctdb->valgrinding) {
859                 tdb_flags |= TDB_NOMMAP;
860         }
861         tdb_flags |= TDB_DISALLOW_NESTING;
862         if (jenkinshash) {
863                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
864         }
865
866 again:
867         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
868                                       ctdb->tunable.database_hash_size, 
869                                       tdb_flags, 
870                                       O_CREAT|O_RDWR, mode);
871         if (ctdb_db->ltdb == NULL) {
872                 struct stat st;
873                 int saved_errno = errno;
874
875                 if (!persistent) {
876                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
877                                           ctdb_db->db_path,
878                                           saved_errno,
879                                           strerror(saved_errno)));
880                         talloc_free(ctdb_db);
881                         return -1;
882                 }
883
884                 if (remaining_tries == 0) {
885                         DEBUG(DEBUG_CRIT,(__location__
886                                           "Failed to open persistent tdb '%s': %d - %s\n",
887                                           ctdb_db->db_path,
888                                           saved_errno,
889                                           strerror(saved_errno)));
890                         talloc_free(ctdb_db);
891                         return -1;
892                 }
893
894                 ret = stat(ctdb_db->db_path, &st);
895                 if (ret != 0) {
896                         DEBUG(DEBUG_CRIT,(__location__
897                                           "Failed to open persistent tdb '%s': %d - %s\n",
898                                           ctdb_db->db_path,
899                                           saved_errno,
900                                           strerror(saved_errno)));
901                         talloc_free(ctdb_db);
902                         return -1;
903                 }
904
905                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
906                 if (ret != 0) {
907                         DEBUG(DEBUG_CRIT,(__location__
908                                           "Failed to open persistent tdb '%s': %d - %s\n",
909                                           ctdb_db->db_path,
910                                           saved_errno,
911                                           strerror(saved_errno)));
912                         talloc_free(ctdb_db);
913                         return -1;
914                 }
915
916                 remaining_tries--;
917                 mode = st.st_mode;
918                 goto again;
919         }
920
921         if (!persistent) {
922                 ctdb_check_db_empty(ctdb_db);
923         } else {
924                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
925                 if (ret != 0) {
926                         int fd;
927                         struct stat st;
928
929                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
930                                           ctdb_db->db_path, ret,
931                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
932                         if (remaining_tries == 0) {
933                                 talloc_free(ctdb_db);
934                                 return -1;
935                         }
936
937                         fd = tdb_fd(ctdb_db->ltdb->tdb);
938                         ret = fstat(fd, &st);
939                         if (ret != 0) {
940                                 DEBUG(DEBUG_CRIT,(__location__
941                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
942                                                   ctdb_db->db_path,
943                                                   errno,
944                                                   strerror(errno)));
945                                 talloc_free(ctdb_db);
946                                 return -1;
947                         }
948
949                         /* close the TDB */
950                         talloc_free(ctdb_db->ltdb);
951                         ctdb_db->ltdb = NULL;
952
953                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
954                         if (ret != 0) {
955                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
956                                                   ctdb_db->db_path));
957                                 talloc_free(ctdb_db);
958                                 return -1;
959                         }
960
961                         remaining_tries--;
962                         mode = st.st_mode;
963                         goto again;
964                 }
965         }
966
967         DLIST_ADD(ctdb->db_list, ctdb_db);
968
969         /* setting this can help some high churn databases */
970         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
971
972         /* 
973            all databases support the "null" function. we need this in
974            order to do forced migration of records
975         */
976         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
977         if (ret != 0) {
978                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
979                 talloc_free(ctdb_db);
980                 return -1;
981         }
982
983         /* 
984            all databases support the "fetch" function. we need this
985            for efficient Samba3 ctdb fetch
986         */
987         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
988         if (ret != 0) {
989                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
990                 talloc_free(ctdb_db);
991                 return -1;
992         }
993
994         /* 
995            all databases support the "fetch_with_header" function. we need this
996            for efficient readonly record fetches
997         */
998         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
999         if (ret != 0) {
1000                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1001                 talloc_free(ctdb_db);
1002                 return -1;
1003         }
1004
1005         ret = ctdb_vacuum_init(ctdb_db);
1006         if (ret != 0) {
1007                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1008                                   "database '%s'\n", ctdb_db->db_name));
1009                 talloc_free(ctdb_db);
1010                 return -1;
1011         }
1012
1013
1014         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
1015         
1016         /* success */
1017         return 0;
1018 }
1019
1020
/*
  one parked DB attach request; entries are chained on
  ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
	/* linkage used by the DLIST_* macros */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	/* daemon context whose deferred_attach list holds this entry */
	struct ctdb_context *ctdb;
	/* the attach control packet that was put on hold */
	struct ctdb_req_control *c;
};
1026
1027
1028 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1029 {
1030         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1031
1032         return 0;
1033 }
1034
1035 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1036 {
1037         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1038         struct ctdb_context *ctdb = da_ctx->ctdb;
1039
1040         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1041         talloc_free(da_ctx);
1042 }
1043
1044 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1045 {
1046         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1047         struct ctdb_context *ctdb = da_ctx->ctdb;
1048
1049         /* This talloc-steals the packet ->c */
1050         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1051         talloc_free(da_ctx);
1052 }
1053
1054 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1055 {
1056         struct ctdb_deferred_attach_context *da_ctx;
1057
1058         /* call it from the main event loop as soon as the current event 
1059            finishes.
1060          */
1061         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1062                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1063                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1064         }
1065
1066         return 0;
1067 }
1068
1069 /*
1070   a client has asked to attach a new database
1071  */
1072 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1073                                TDB_DATA *outdata, uint64_t tdb_flags, 
1074                                bool persistent, uint32_t client_id,
1075                                struct ctdb_req_control *c,
1076                                bool *async_reply)
1077 {
1078         const char *db_name = (const char *)indata.dptr;
1079         struct ctdb_db_context *db;
1080         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1081         struct ctdb_client *client = NULL;
1082
1083         if (ctdb->tunable.allow_client_db_attach == 0) {
1084                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1085                                   "AllowClientDBAccess == 0\n", db_name));
1086                 return -1;
1087         }
1088
1089         /* dont allow any local clients to attach while we are in recovery mode
1090          * except for the recovery daemon.
1091          * allow all attach from the network since these are always from remote
1092          * recovery daemons.
1093          */
1094         if (client_id != 0) {
1095                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1096         }
1097         if (client != NULL) {
1098                 /* If the node is inactive it is not part of the cluster
1099                    and we should not allow clients to attach to any
1100                    databases
1101                 */
1102                 if (node->flags & NODE_FLAGS_INACTIVE) {
1103                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1104                         return -1;
1105                 }
1106
1107                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1108                  && client->pid != ctdb->recoverd_pid
1109                  && !ctdb->done_startup) {
1110                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1111
1112                         if (da_ctx == NULL) {
1113                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1114                                 return -1;
1115                         }
1116
1117                         da_ctx->ctdb = ctdb;
1118                         da_ctx->c = talloc_steal(da_ctx, c);
1119                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1120                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1121
1122                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1123
1124                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1125                         *async_reply = true;
1126                         return 0;
1127                 }
1128         }
1129
1130         /* the client can optionally pass additional tdb flags, but we
1131            only allow a subset of those on the database in ctdb. Note
1132            that tdb_flags is passed in via the (otherwise unused)
1133            srvid to the attach control */
1134         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1135
1136         /* see if we already have this name */
1137         db = ctdb_db_handle(ctdb, db_name);
1138         if (db) {
1139                 outdata->dptr  = (uint8_t *)&db->db_id;
1140                 outdata->dsize = sizeof(db->db_id);
1141                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1142                 return 0;
1143         }
1144
1145         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1146                 return -1;
1147         }
1148
1149         db = ctdb_db_handle(ctdb, db_name);
1150         if (!db) {
1151                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1152                 return -1;
1153         }
1154
1155         /* remember the flags the client has specified */
1156         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1157
1158         outdata->dptr  = (uint8_t *)&db->db_id;
1159         outdata->dsize = sizeof(db->db_id);
1160
1161         /* Try to ensure it's locked in mem */
1162         ctdb_lockdown_memory(ctdb);
1163
1164         /* tell all the other nodes about this database */
1165         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1166                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1167                                                 CTDB_CONTROL_DB_ATTACH,
1168                                  0, CTDB_CTRL_FLAG_NOREPLY,
1169                                  indata, NULL, NULL);
1170
1171         /* success */
1172         return 0;
1173 }
1174
1175
1176 /*
1177   attach to all existing persistent databases
1178  */
1179 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1180                                   const char *unhealthy_reason)
1181 {
1182         DIR *d;
1183         struct dirent *de;
1184
1185         /* open the persistent db directory and scan it for files */
1186         d = opendir(ctdb->db_directory_persistent);
1187         if (d == NULL) {
1188                 return 0;
1189         }
1190
1191         while ((de=readdir(d))) {
1192                 char *p, *s, *q;
1193                 size_t len = strlen(de->d_name);
1194                 uint32_t node;
1195                 int invalid_name = 0;
1196                 
1197                 s = talloc_strdup(ctdb, de->d_name);
1198                 CTDB_NO_MEMORY(ctdb, s);
1199
1200                 /* only accept names ending in .tdb */
1201                 p = strstr(s, ".tdb.");
1202                 if (len < 7 || p == NULL) {
1203                         talloc_free(s);
1204                         continue;
1205                 }
1206
1207                 /* only accept names ending with .tdb. and any number of digits */
1208                 q = p+5;
1209                 while (*q != 0 && invalid_name == 0) {
1210                         if (!isdigit(*q++)) {
1211                                 invalid_name = 1;
1212                         }
1213                 }
1214                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1215                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1216                         talloc_free(s);
1217                         continue;
1218                 }
1219                 p[4] = 0;
1220
1221                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1222                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1223                         closedir(d);
1224                         talloc_free(s);
1225                         return -1;
1226                 }
1227
1228                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1229
1230                 talloc_free(s);
1231         }
1232         closedir(d);
1233         return 0;
1234 }
1235
1236 int ctdb_attach_databases(struct ctdb_context *ctdb)
1237 {
1238         int ret;
1239         char *persistent_health_path = NULL;
1240         char *unhealthy_reason = NULL;
1241         bool first_try = true;
1242
1243         if (ctdb->db_directory == NULL) {
1244                 ctdb->db_directory = VARDIR "/ctdb";
1245         }
1246         if (ctdb->db_directory_persistent == NULL) {
1247                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1248         }
1249         if (ctdb->db_directory_state == NULL) {
1250                 ctdb->db_directory_state = VARDIR "/ctdb/state";
1251         }
1252
1253         /* make sure the db directory exists */
1254         ret = mkdir(ctdb->db_directory, 0700);
1255         if (ret == -1 && errno != EEXIST) {
1256                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1257                          ctdb->db_directory));
1258                 return -1;
1259         }
1260
1261         /* make sure the persistent db directory exists */
1262         ret = mkdir(ctdb->db_directory_persistent, 0700);
1263         if (ret == -1 && errno != EEXIST) {
1264                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1265                          ctdb->db_directory_persistent));
1266                 return -1;
1267         }
1268
1269         /* make sure the internal state db directory exists */
1270         ret = mkdir(ctdb->db_directory_state, 0700);
1271         if (ret == -1 && errno != EEXIST) {
1272                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1273                          ctdb->db_directory_state));
1274                 return -1;
1275         }
1276
1277         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1278                                                  ctdb->db_directory_state,
1279                                                  PERSISTENT_HEALTH_TDB,
1280                                                  ctdb->pnn);
1281         if (persistent_health_path == NULL) {
1282                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1283                 return -1;
1284         }
1285
1286 again:
1287
1288         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1289                                                    0, TDB_DISALLOW_NESTING,
1290                                                    O_CREAT | O_RDWR, 0600);
1291         if (ctdb->db_persistent_health == NULL) {
1292                 struct tdb_wrap *tdb;
1293
1294                 if (!first_try) {
1295                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1296                                           persistent_health_path,
1297                                           errno,
1298                                           strerror(errno)));
1299                         talloc_free(persistent_health_path);
1300                         talloc_free(unhealthy_reason);
1301                         return -1;
1302                 }
1303                 first_try = false;
1304
1305                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1306                                                    persistent_health_path,
1307                                                    "was cleared after a failure",
1308                                                    "manual verification needed");
1309                 if (unhealthy_reason == NULL) {
1310                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1311                         talloc_free(persistent_health_path);
1312                         return -1;
1313                 }
1314
1315                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1316                                   persistent_health_path));
1317                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1318                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1319                                     O_CREAT | O_RDWR, 0600);
1320                 if (tdb) {
1321                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1322                                           persistent_health_path,
1323                                           errno,
1324                                           strerror(errno)));
1325                         talloc_free(persistent_health_path);
1326                         talloc_free(unhealthy_reason);
1327                         return -1;
1328                 }
1329
1330                 talloc_free(tdb);
1331                 goto again;
1332         }
1333         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1334         if (ret != 0) {
1335                 struct tdb_wrap *tdb;
1336
1337                 talloc_free(ctdb->db_persistent_health);
1338                 ctdb->db_persistent_health = NULL;
1339
1340                 if (!first_try) {
1341                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1342                                           persistent_health_path));
1343                         talloc_free(persistent_health_path);
1344                         talloc_free(unhealthy_reason);
1345                         return -1;
1346                 }
1347                 first_try = false;
1348
1349                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1350                                                    persistent_health_path,
1351                                                    "was cleared after a failure",
1352                                                    "manual verification needed");
1353                 if (unhealthy_reason == NULL) {
1354                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1355                         talloc_free(persistent_health_path);
1356                         return -1;
1357                 }
1358
1359                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1360                                   persistent_health_path));
1361                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1362                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1363                                     O_CREAT | O_RDWR, 0600);
1364                 if (tdb) {
1365                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1366                                           persistent_health_path,
1367                                           errno,
1368                                           strerror(errno)));
1369                         talloc_free(persistent_health_path);
1370                         talloc_free(unhealthy_reason);
1371                         return -1;
1372                 }
1373
1374                 talloc_free(tdb);
1375                 goto again;
1376         }
1377         talloc_free(persistent_health_path);
1378
1379         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1380         talloc_free(unhealthy_reason);
1381         if (ret != 0) {
1382                 return ret;
1383         }
1384
1385         return 0;
1386 }
1387
1388 /*
1389   called when a broadcast seqnum update comes in
1390  */
1391 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1392 {
1393         struct ctdb_db_context *ctdb_db;
1394         if (srcnode == ctdb->pnn) {
1395                 /* don't update ourselves! */
1396                 return 0;
1397         }
1398
1399         ctdb_db = find_ctdb_db(ctdb, db_id);
1400         if (!ctdb_db) {
1401                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1402                 return -1;
1403         }
1404
1405         if (ctdb_db->unhealthy_reason) {
1406                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1407                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1408                 return -1;
1409         }
1410
1411         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1412         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1413         return 0;
1414 }
1415
1416 /*
1417   timer to check for seqnum changes in a ltdb and propogate them
1418  */
1419 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1420                                    struct timeval t, void *p)
1421 {
1422         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1423         struct ctdb_context *ctdb = ctdb_db->ctdb;
1424         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1425         if (new_seqnum != ctdb_db->seqnum) {
1426                 /* something has changed - propogate it */
1427                 TDB_DATA data;
1428                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1429                 data.dsize = sizeof(uint32_t);
1430                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1431                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1432                                          data, NULL, NULL);             
1433         }
1434         ctdb_db->seqnum = new_seqnum;
1435
1436         /* setup a new timer */
1437         ctdb_db->seqnum_update =
1438                 event_add_timed(ctdb->ev, ctdb_db, 
1439                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1440                                 ctdb_ltdb_seqnum_check, ctdb_db);
1441 }
1442
1443 /*
1444   enable seqnum handling on this db
1445  */
1446 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1447 {
1448         struct ctdb_db_context *ctdb_db;
1449         ctdb_db = find_ctdb_db(ctdb, db_id);
1450         if (!ctdb_db) {
1451                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1452                 return -1;
1453         }
1454
1455         if (ctdb_db->seqnum_update == NULL) {
1456                 ctdb_db->seqnum_update =
1457                         event_add_timed(ctdb->ev, ctdb_db, 
1458                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1459                                         ctdb_ltdb_seqnum_check, ctdb_db);
1460         }
1461
1462         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1463         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1464         return 0;
1465 }
1466
1467 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1468 {
1469         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1470         struct ctdb_db_context *ctdb_db;
1471
1472         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1473         if (!ctdb_db) {
1474                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1475                 return 0;
1476         }
1477
1478         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1479                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1480                 return 0;
1481         }
1482
1483         ctdb_db->priority = db_prio->priority;
1484         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1485
1486         return 0;
1487 }
1488