server: add CTDB_CONTROL_DB_SET_HEALTHY and CTDB_CONTROL_DB_GET_HEALTH
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "db_wrap.h"
29 #include "lib/util/dlinklist.h"
30 #include <ctype.h>
31
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
33
/*
  No-op call handler. It is installed under CTDB_NULL_FUNC for every
  database that gets attached; it ignores the call and reports success.
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
	(void)call;	/* intentionally unused */
	return 0;
}
41
42 /*
43   this is a plain fetch procedure that all databases support
44 */
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
46 {
47         call->reply_data = &call->record_data;
48         return 0;
49 }
50
51
52
/*
  state kept for a request that is waiting for a chainlock, see
  ctdb_ltdb_lock_requeue() and lock_fetch_callback()
 */
struct lock_fetch_state {
	/* daemon context, used to read the current vnn map generation */
	struct ctdb_context *ctdb;
	/* function the packet is handed back to once the lock is held */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	/* first argument passed to recv_pkt */
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* vnn map generation at the time the request was queued */
	uint32_t generation;
	/* if true the packet is requeued even if the generation changed */
	bool ignore_generation;
};
61
62 /*
63   called when we should retry the operation
64  */
65 static void lock_fetch_callback(void *p)
66 {
67         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
68         if (!state->ignore_generation &&
69             state->generation != state->ctdb->vnn_map->generation) {
70                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71                 talloc_free(state->hdr);
72                 return;
73         }
74         state->recv_pkt(state->recv_context, state->hdr);
75         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
76 }
77
78
79 /*
80   do a non-blocking ltdb_lock, deferring this ctdb request until we
81   have the chainlock
82
83   It does the following:
84
85    1) tries to get the chainlock. If it succeeds, then it returns 0
86
87    2) if it fails to get a chainlock immediately then it sets up a
88    non-blocking chainlock via ctdb_lockwait, and when it gets the
89    chainlock it re-submits this ctdb request to the main packet
90    receive function
91
92    This effectively queues all ctdb requests that cannot be
93    immediately satisfied until it can get the lock. This means that
94    the main ctdb daemon will not block waiting for a chainlock held by
95    a client
96
97    There are 3 possible return values:
98
99        0:    means that it got the lock immediately.
100       -1:    means that it failed to get the lock, and won't retry
101       -2:    means that it failed to get the lock immediately, but will retry
102  */
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
104                            TDB_DATA key, struct ctdb_req_header *hdr,
105                            void (*recv_pkt)(void *, struct ctdb_req_header *),
106                            void *recv_context, bool ignore_generation)
107 {
108         int ret;
109         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110         struct lockwait_handle *h;
111         struct lock_fetch_state *state;
112         
113         ret = tdb_chainlock_nonblock(tdb, key);
114
115         if (ret != 0 &&
116             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117                 /* a hard failure - don't try again */
118                 return -1;
119         }
120
121         /* when torturing, ensure we test the contended path */
122         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
123             random() % 5 == 0) {
124                 ret = -1;
125                 tdb_chainunlock(tdb, key);
126         }
127
128         /* first the non-contended path */
129         if (ret == 0) {
130                 return 0;
131         }
132
133         state = talloc(hdr, struct lock_fetch_state);
134         state->ctdb = ctdb_db->ctdb;
135         state->hdr = hdr;
136         state->recv_pkt = recv_pkt;
137         state->recv_context = recv_context;
138         state->generation = ctdb_db->ctdb->vnn_map->generation;
139         state->ignore_generation = ignore_generation;
140
141         /* now the contended path */
142         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
143         if (h == NULL) {
144                 tdb_chainunlock(tdb, key);
145                 return -1;
146         }
147
148         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
149            so it won't be freed yet */
150         talloc_steal(state, hdr);
151         talloc_steal(state, h);
152
153         /* now tell the caller than we will retry asynchronously */
154         return -2;
155 }
156
157 /*
158   a varient of ctdb_ltdb_lock_requeue that also fetches the record
159  */
160 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
161                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
162                                  struct ctdb_req_header *hdr, TDB_DATA *data,
163                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
164                                  void *recv_context, bool ignore_generation)
165 {
166         int ret;
167
168         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
169                                      recv_context, ignore_generation);
170         if (ret == 0) {
171                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
172                 if (ret != 0) {
173                         ctdb_ltdb_unlock(ctdb_db, key);
174                 }
175         }
176         return ret;
177 }
178
179
180 /*
181   paraoid check to see if the db is empty
182  */
183 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
184 {
185         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
186         int count = tdb_traverse_read(tdb, NULL, NULL);
187         if (count != 0) {
188                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
189                          ctdb_db->db_path));
190                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
191         }
192 }
193
194 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
195                                 struct ctdb_db_context *ctdb_db)
196 {
197         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
198         char *old;
199         char *reason = NULL;
200         TDB_DATA key;
201         TDB_DATA val;
202
203         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
204         key.dsize = strlen(ctdb_db->db_name);
205
206         old = ctdb_db->unhealthy_reason;
207         ctdb_db->unhealthy_reason = NULL;
208
209         val = tdb_fetch(tdb, key);
210         if (val.dsize > 0) {
211                 reason = talloc_strndup(ctdb_db,
212                                         (const char *)val.dptr,
213                                         val.dsize);
214                 if (reason == NULL) {
215                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
216                                            (int)val.dsize));
217                         ctdb_db->unhealthy_reason = old;
218                         free(val.dptr);
219                         return -1;
220                 }
221         }
222
223         if (val.dptr) {
224                 free(val.dptr);
225         }
226
227         talloc_free(old);
228         ctdb_db->unhealthy_reason = reason;
229         return 0;
230 }
231
232 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
233                                   struct ctdb_db_context *ctdb_db,
234                                   const char *given_reason,/* NULL means healthy */
235                                   int num_healthy_nodes)
236 {
237         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
238         int ret;
239         TDB_DATA key;
240         TDB_DATA val;
241         char *new_reason = NULL;
242         char *old_reason = NULL;
243
244         ret = tdb_transaction_start(tdb);
245         if (ret != 0) {
246                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
247                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
248                 return -1;
249         }
250
251         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
252         if (ret != 0) {
253                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
254                                    ctdb_db->db_name, ret));
255                 return -1;
256         }
257         old_reason = ctdb_db->unhealthy_reason;
258
259         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
260         key.dsize = strlen(ctdb_db->db_name);
261
262         if (given_reason) {
263                 new_reason = talloc_strdup(ctdb_db, given_reason);
264                 if (new_reason == NULL) {
265                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
266                                           given_reason));
267                         return -1;
268                 }
269         } else if (old_reason && num_healthy_nodes == 0) {
270                 /*
271                  * If the reason indicates ok, but there where no healthy nodes
272                  * available, that it means, we have not recovered valid content
273                  * of the db. So if there's an old reason, prefix it with
274                  * "NO-HEALTHY-NODES - "
275                  */
276                 const char *prefix;
277
278 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
279                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
280                 if (ret != 0) {
281                         prefix = _TMP_PREFIX;
282                 } else {
283                         prefix = "";
284                 }
285                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
286                                          prefix, old_reason);
287                 if (new_reason == NULL) {
288                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
289                                           prefix, old_reason));
290                         return -1;
291                 }
292 #undef _TMP_PREFIX
293         }
294
295         if (new_reason) {
296                 val.dptr = discard_const_p(uint8_t, new_reason);
297                 val.dsize = strlen(new_reason);
298
299                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
300                 if (ret != 0) {
301                         tdb_transaction_cancel(tdb);
302                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
303                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
304                                            ret, tdb_errorstr(tdb)));
305                         talloc_free(new_reason);
306                         return -1;
307                 }
308                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
309                                    ctdb_db->db_name, new_reason));
310         } else if (old_reason) {
311                 ret = tdb_delete(tdb, key);
312                 if (ret != 0) {
313                         tdb_transaction_cancel(tdb);
314                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
315                                            tdb_name(tdb), ctdb_db->db_name,
316                                            ret, tdb_errorstr(tdb)));
317                         talloc_free(new_reason);
318                         return -1;
319                 }
320                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
321                                    ctdb_db->db_name));
322         }
323
324         ret = tdb_transaction_commit(tdb);
325         if (ret != TDB_SUCCESS) {
326                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
327                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
328                 talloc_free(new_reason);
329                 return -1;
330         }
331
332         talloc_free(old_reason);
333         ctdb_db->unhealthy_reason = new_reason;
334
335         return 0;
336 }
337
338 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
339                                      struct ctdb_db_context *ctdb_db)
340 {
341         time_t now = time(NULL);
342         char *new_path;
343         char *new_reason;
344         int ret;
345         struct tm *tm;
346
347         tm = gmtime(&now);
348
349         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
350         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
351                                    "%04u%02u%02u%02u%02u%02u.0Z",
352                                    ctdb_db->db_path,
353                                    tm->tm_year+1900, tm->tm_mon+1,
354                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
355                                    tm->tm_sec);
356         if (new_path == NULL) {
357                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
358                 return -1;
359         }
360
361         new_reason = talloc_asprintf(ctdb_db,
362                                      "ERROR - Backup of corrupted TDB in '%s'",
363                                      new_path);
364         if (new_reason == NULL) {
365                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
366                 return -1;
367         }
368         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
369         talloc_free(new_reason);
370         if (ret != 0) {
371                 DEBUG(DEBUG_CRIT,(__location__
372                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
373                                  ctdb_db->db_path));
374                 return -1;
375         }
376
377         ret = rename(ctdb_db->db_path, new_path);
378         if (ret != 0) {
379                 DEBUG(DEBUG_CRIT,(__location__
380                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
381                                   ctdb_db->db_path, new_path,
382                                   errno, strerror(errno)));
383                 talloc_free(new_path);
384                 return -1;
385         }
386
387         DEBUG(DEBUG_CRIT,(__location__
388                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
389                          ctdb_db->db_path, new_path));
390         talloc_free(new_path);
391         return 0;
392 }
393
394 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
395 {
396         struct ctdb_db_context *ctdb_db;
397         int ret;
398         int ok = 0;
399         int fail = 0;
400
401         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
402                 if (!ctdb_db->persistent) {
403                         continue;
404                 }
405
406                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
407                 if (ret != 0) {
408                         DEBUG(DEBUG_ALERT,(__location__
409                                            " load persistent health for '%s' failed\n",
410                                            ctdb_db->db_path));
411                         return -1;
412                 }
413
414                 if (ctdb_db->unhealthy_reason == NULL) {
415                         ok++;
416                         DEBUG(DEBUG_INFO,(__location__
417                                    " persistent db '%s' healthy\n",
418                                    ctdb_db->db_path));
419                         continue;
420                 }
421
422                 fail++;
423                 DEBUG(DEBUG_ALERT,(__location__
424                                    " persistent db '%s' unhealthy: %s\n",
425                                    ctdb_db->db_path,
426                                    ctdb_db->unhealthy_reason));
427         }
428         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
429               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
430                ok, fail));
431
432         if (fail != 0) {
433                 return -1;
434         }
435
436         return 0;
437 }
438
439
440 /*
441   mark a database - as healthy
442  */
443 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
444 {
445         uint32_t db_id = *(uint32_t *)indata.dptr;
446         struct ctdb_db_context *ctdb_db;
447         int ret;
448         bool may_recover = false;
449
450         ctdb_db = find_ctdb_db(ctdb, db_id);
451         if (!ctdb_db) {
452                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
453                 return -1;
454         }
455
456         if (ctdb_db->unhealthy_reason) {
457                 may_recover = true;
458         }
459
460         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
461         if (ret != 0) {
462                 DEBUG(DEBUG_ERR,(__location__
463                                  " ctdb_update_persistent_health(%s) failed\n",
464                                  ctdb_db->db_name));
465                 return -1;
466         }
467
468         if (may_recover && !ctdb->done_startup) {
469                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
470                                   ctdb_db->db_name));
471                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
472         }
473
474         return 0;
475 }
476
477 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
478                                    TDB_DATA indata,
479                                    TDB_DATA *outdata)
480 {
481         uint32_t db_id = *(uint32_t *)indata.dptr;
482         struct ctdb_db_context *ctdb_db;
483         int ret;
484
485         ctdb_db = find_ctdb_db(ctdb, db_id);
486         if (!ctdb_db) {
487                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
488                 return -1;
489         }
490
491         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
492         if (ret != 0) {
493                 DEBUG(DEBUG_ERR,(__location__
494                                  " ctdb_load_persistent_health(%s) failed\n",
495                                  ctdb_db->db_name));
496                 return -1;
497         }
498
499         *outdata = tdb_null;
500         if (ctdb_db->unhealthy_reason) {
501                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
502                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
503         }
504
505         return 0;
506 }
507
508 /*
509   attach to a database, handling both persistent and non-persistent databases
510   return 0 on success, -1 on failure
511  */
512 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
513                              bool persistent, const char *unhealthy_reason)
514 {
515         struct ctdb_db_context *ctdb_db, *tmp_db;
516         int ret;
517         struct TDB_DATA key;
518         unsigned tdb_flags;
519         int mode = 0600;
520         int remaining_tries = 0;
521
522         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
523         CTDB_NO_MEMORY(ctdb, ctdb_db);
524
525         ctdb_db->priority = 1;
526         ctdb_db->ctdb = ctdb;
527         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
528         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
529
530         key.dsize = strlen(db_name)+1;
531         key.dptr  = discard_const(db_name);
532         ctdb_db->db_id = ctdb_hash(&key);
533         ctdb_db->persistent = persistent;
534
535         /* check for hash collisions */
536         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
537                 if (tmp_db->db_id == ctdb_db->db_id) {
538                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
539                                  tmp_db->db_id, db_name, tmp_db->db_name));
540                         talloc_free(ctdb_db);
541                         return -1;
542                 }
543         }
544
545         if (persistent) {
546                 if (unhealthy_reason) {
547                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
548                                                             unhealthy_reason, 0);
549                         if (ret != 0) {
550                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
551                                                    ctdb_db->db_name, unhealthy_reason, ret));
552                                 talloc_free(ctdb_db);
553                                 return -1;
554                         }
555                 }
556
557                 if (ctdb->max_persistent_check_errors > 0) {
558                         remaining_tries = 1;
559                 }
560                 if (ctdb->done_startup) {
561                         remaining_tries = 0;
562                 }
563
564                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
565                 if (ret != 0) {
566                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
567                                    ctdb_db->db_name, ret));
568                         talloc_free(ctdb_db);
569                         return -1;
570                 }
571         }
572
573         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
574                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
575                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
576                 talloc_free(ctdb_db);
577                 return -1;
578         }
579
580         if (ctdb_db->unhealthy_reason) {
581                 /* this is just a warning, but we want that in the log file! */
582                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
583                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
584         }
585
586         /* open the database */
587         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
588                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
589                                            db_name, ctdb->pnn);
590
591         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
592         if (!ctdb->do_setsched) {
593                 tdb_flags |= TDB_NOMMAP;
594         }
595         tdb_flags |= TDB_DISALLOW_NESTING;
596
597 again:
598         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
599                                       ctdb->tunable.database_hash_size, 
600                                       tdb_flags, 
601                                       O_CREAT|O_RDWR, mode);
602         if (ctdb_db->ltdb == NULL) {
603                 struct stat st;
604                 int saved_errno = errno;
605
606                 if (!persistent) {
607                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
608                                           ctdb_db->db_path,
609                                           saved_errno,
610                                           strerror(saved_errno)));
611                         talloc_free(ctdb_db);
612                         return -1;
613                 }
614
615                 if (remaining_tries == 0) {
616                         DEBUG(DEBUG_CRIT,(__location__
617                                           "Failed to open persistent tdb '%s': %d - %s\n",
618                                           ctdb_db->db_path,
619                                           saved_errno,
620                                           strerror(saved_errno)));
621                         talloc_free(ctdb_db);
622                         return -1;
623                 }
624
625                 ret = stat(ctdb_db->db_path, &st);
626                 if (ret != 0) {
627                         DEBUG(DEBUG_CRIT,(__location__
628                                           "Failed to open persistent tdb '%s': %d - %s\n",
629                                           ctdb_db->db_path,
630                                           saved_errno,
631                                           strerror(saved_errno)));
632                         talloc_free(ctdb_db);
633                         return -1;
634                 }
635
636                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
637                 if (ret != 0) {
638                         DEBUG(DEBUG_CRIT,(__location__
639                                           "Failed to open persistent tdb '%s': %d - %s\n",
640                                           ctdb_db->db_path,
641                                           saved_errno,
642                                           strerror(saved_errno)));
643                         talloc_free(ctdb_db);
644                         return -1;
645                 }
646
647                 remaining_tries--;
648                 mode = st.st_mode;
649                 goto again;
650         }
651
652         if (!persistent) {
653                 ctdb_check_db_empty(ctdb_db);
654         } else {
655                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
656                 if (ret != 0) {
657                         int fd;
658                         struct stat st;
659
660                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
661                                           ctdb_db->db_path, ret,
662                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
663                         if (remaining_tries == 0) {
664                                 talloc_free(ctdb_db);
665                                 return -1;
666                         }
667
668                         fd = tdb_fd(ctdb_db->ltdb->tdb);
669                         ret = fstat(fd, &st);
670                         if (ret != 0) {
671                                 DEBUG(DEBUG_CRIT,(__location__
672                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
673                                                   ctdb_db->db_path,
674                                                   errno,
675                                                   strerror(errno)));
676                                 talloc_free(ctdb_db);
677                                 return -1;
678                         }
679
680                         /* close the TDB */
681                         talloc_free(ctdb_db->ltdb);
682                         ctdb_db->ltdb = NULL;
683
684                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
685                         if (ret != 0) {
686                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
687                                                   ctdb_db->db_path));
688                                 talloc_free(ctdb_db);
689                                 return -1;
690                         }
691
692                         remaining_tries--;
693                         mode = st.st_mode;
694                         goto again;
695                 }
696         }
697
698         DLIST_ADD(ctdb->db_list, ctdb_db);
699
700         /* setting this can help some high churn databases */
701         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
702
703         /* 
704            all databases support the "null" function. we need this in
705            order to do forced migration of records
706         */
707         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
708         if (ret != 0) {
709                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
710                 talloc_free(ctdb_db);
711                 return -1;
712         }
713
714         /* 
715            all databases support the "fetch" function. we need this
716            for efficient Samba3 ctdb fetch
717         */
718         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
719         if (ret != 0) {
720                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
721                 talloc_free(ctdb_db);
722                 return -1;
723         }
724
725         ret = ctdb_vacuum_init(ctdb_db);
726         if (ret != 0) {
727                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
728                                   "database '%s'\n", ctdb_db->db_name));
729                 talloc_free(ctdb_db);
730                 return -1;
731         }
732
733
734         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
735         
736         /* success */
737         return 0;
738 }
739
740
741 /*
742   a client has asked to attach a new database
743  */
744 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
745                                TDB_DATA *outdata, uint64_t tdb_flags, 
746                                bool persistent)
747 {
748         const char *db_name = (const char *)indata.dptr;
749         struct ctdb_db_context *db;
750         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
751
752         /* the client can optionally pass additional tdb flags, but we
753            only allow a subset of those on the database in ctdb. Note
754            that tdb_flags is passed in via the (otherwise unused)
755            srvid to the attach control */
756         tdb_flags &= TDB_NOSYNC;
757
758         /* If the node is inactive it is not part of the cluster
759            and we should not allow clients to attach to any
760            databases
761         */
762         if (node->flags & NODE_FLAGS_INACTIVE) {
763                 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
764                 return -1;
765         }
766
767
768         /* see if we already have this name */
769         db = ctdb_db_handle(ctdb, db_name);
770         if (db) {
771                 outdata->dptr  = (uint8_t *)&db->db_id;
772                 outdata->dsize = sizeof(db->db_id);
773                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
774                 return 0;
775         }
776
777         if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
778                 return -1;
779         }
780
781         db = ctdb_db_handle(ctdb, db_name);
782         if (!db) {
783                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
784                 return -1;
785         }
786
787         /* remember the flags the client has specified */
788         tdb_add_flags(db->ltdb->tdb, tdb_flags);
789
790         outdata->dptr  = (uint8_t *)&db->db_id;
791         outdata->dsize = sizeof(db->db_id);
792
793         /* tell all the other nodes about this database */
794         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
795                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
796                                                 CTDB_CONTROL_DB_ATTACH,
797                                  0, CTDB_CTRL_FLAG_NOREPLY,
798                                  indata, NULL, NULL);
799
800         /* success */
801         return 0;
802 }
803
804
805 /*
806   attach to all existing persistent databases
807  */
808 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
809                                   const char *unhealthy_reason)
810 {
811         DIR *d;
812         struct dirent *de;
813
814         /* open the persistent db directory and scan it for files */
815         d = opendir(ctdb->db_directory_persistent);
816         if (d == NULL) {
817                 return 0;
818         }
819
820         while ((de=readdir(d))) {
821                 char *p, *s, *q;
822                 size_t len = strlen(de->d_name);
823                 uint32_t node;
824                 int invalid_name = 0;
825                 
826                 s = talloc_strdup(ctdb, de->d_name);
827                 CTDB_NO_MEMORY(ctdb, s);
828
829                 /* only accept names ending in .tdb */
830                 p = strstr(s, ".tdb.");
831                 if (len < 7 || p == NULL) {
832                         talloc_free(s);
833                         continue;
834                 }
835
836                 /* only accept names ending with .tdb. and any number of digits */
837                 q = p+5;
838                 while (*q != 0 && invalid_name == 0) {
839                         if (!isdigit(*q++)) {
840                                 invalid_name = 1;
841                         }
842                 }
843                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
844                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
845                         talloc_free(s);
846                         continue;
847                 }
848                 p[4] = 0;
849
850                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
851                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
852                         closedir(d);
853                         talloc_free(s);
854                         return -1;
855                 }
856
857                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
858
859                 talloc_free(s);
860         }
861         closedir(d);
862         return 0;
863 }
864
865 int ctdb_attach_databases(struct ctdb_context *ctdb)
866 {
867         int ret;
868         char *persistent_health_path = NULL;
869         char *unhealthy_reason = NULL;
870         bool first_try = true;
871
872         if (ctdb->db_directory == NULL) {
873                 ctdb->db_directory = VARDIR "/ctdb";
874         }
875         if (ctdb->db_directory_persistent == NULL) {
876                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
877         }
878         if (ctdb->db_directory_state == NULL) {
879                 ctdb->db_directory_state = VARDIR "/ctdb/state";
880         }
881
882         /* make sure the db directory exists */
883         ret = mkdir(ctdb->db_directory, 0700);
884         if (ret == -1 && errno != EEXIST) {
885                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
886                          ctdb->db_directory));
887                 return -1;
888         }
889
890         /* make sure the persistent db directory exists */
891         ret = mkdir(ctdb->db_directory_persistent, 0700);
892         if (ret == -1 && errno != EEXIST) {
893                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
894                          ctdb->db_directory_persistent));
895                 return -1;
896         }
897
898         /* make sure the internal state db directory exists */
899         ret = mkdir(ctdb->db_directory_state, 0700);
900         if (ret == -1 && errno != EEXIST) {
901                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
902                          ctdb->db_directory_state));
903                 return -1;
904         }
905
906         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
907                                                  ctdb->db_directory_state,
908                                                  PERSISTENT_HEALTH_TDB,
909                                                  ctdb->pnn);
910         if (persistent_health_path == NULL) {
911                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
912                 return -1;
913         }
914
915 again:
916
917         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
918                                                    0, TDB_DISALLOW_NESTING,
919                                                    O_CREAT | O_RDWR, 0600);
920         if (ctdb->db_persistent_health == NULL) {
921                 struct tdb_wrap *tdb;
922
923                 if (!first_try) {
924                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
925                                           persistent_health_path,
926                                           errno,
927                                           strerror(errno)));
928                         talloc_free(persistent_health_path);
929                         talloc_free(unhealthy_reason);
930                         return -1;
931                 }
932                 first_try = false;
933
934                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
935                                                    persistent_health_path,
936                                                    "was cleared after a failure",
937                                                    "manual verification needed");
938                 if (unhealthy_reason == NULL) {
939                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
940                         talloc_free(persistent_health_path);
941                         return -1;
942                 }
943
944                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
945                                   persistent_health_path));
946                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
947                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
948                                     O_CREAT | O_RDWR, 0600);
949                 if (tdb) {
950                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
951                                           persistent_health_path,
952                                           errno,
953                                           strerror(errno)));
954                         talloc_free(persistent_health_path);
955                         talloc_free(unhealthy_reason);
956                         return -1;
957                 }
958
959                 talloc_free(tdb);
960                 goto again;
961         }
962         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
963         if (ret != 0) {
964                 struct tdb_wrap *tdb;
965
966                 talloc_free(ctdb->db_persistent_health);
967                 ctdb->db_persistent_health = NULL;
968
969                 if (!first_try) {
970                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
971                                           persistent_health_path));
972                         talloc_free(persistent_health_path);
973                         talloc_free(unhealthy_reason);
974                         return -1;
975                 }
976                 first_try = false;
977
978                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
979                                                    persistent_health_path,
980                                                    "was cleared after a failure",
981                                                    "manual verification needed");
982                 if (unhealthy_reason == NULL) {
983                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
984                         talloc_free(persistent_health_path);
985                         return -1;
986                 }
987
988                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
989                                   persistent_health_path));
990                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
991                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
992                                     O_CREAT | O_RDWR, 0600);
993                 if (tdb) {
994                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
995                                           persistent_health_path,
996                                           errno,
997                                           strerror(errno)));
998                         talloc_free(persistent_health_path);
999                         talloc_free(unhealthy_reason);
1000                         return -1;
1001                 }
1002
1003                 talloc_free(tdb);
1004                 goto again;
1005         }
1006         talloc_free(persistent_health_path);
1007
1008         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1009         talloc_free(unhealthy_reason);
1010         if (ret != 0) {
1011                 return ret;
1012         }
1013
1014         return 0;
1015 }
1016
1017 /*
1018   called when a broadcast seqnum update comes in
1019  */
1020 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1021 {
1022         struct ctdb_db_context *ctdb_db;
1023         if (srcnode == ctdb->pnn) {
1024                 /* don't update ourselves! */
1025                 return 0;
1026         }
1027
1028         ctdb_db = find_ctdb_db(ctdb, db_id);
1029         if (!ctdb_db) {
1030                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1031                 return -1;
1032         }
1033
1034         if (ctdb_db->unhealthy_reason) {
1035                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1036                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1037                 return -1;
1038         }
1039
1040         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1041         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1042         return 0;
1043 }
1044
1045 /*
1046   timer to check for seqnum changes in a ltdb and propogate them
1047  */
1048 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1049                                    struct timeval t, void *p)
1050 {
1051         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1052         struct ctdb_context *ctdb = ctdb_db->ctdb;
1053         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1054         if (new_seqnum != ctdb_db->seqnum) {
1055                 /* something has changed - propogate it */
1056                 TDB_DATA data;
1057                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1058                 data.dsize = sizeof(uint32_t);
1059                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1060                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1061                                          data, NULL, NULL);             
1062         }
1063         ctdb_db->seqnum = new_seqnum;
1064
1065         /* setup a new timer */
1066         ctdb_db->seqnum_update =
1067                 event_add_timed(ctdb->ev, ctdb_db, 
1068                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1069                                 ctdb_ltdb_seqnum_check, ctdb_db);
1070 }
1071
1072 /*
1073   enable seqnum handling on this db
1074  */
1075 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1076 {
1077         struct ctdb_db_context *ctdb_db;
1078         ctdb_db = find_ctdb_db(ctdb, db_id);
1079         if (!ctdb_db) {
1080                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1081                 return -1;
1082         }
1083
1084         if (ctdb_db->seqnum_update == NULL) {
1085                 ctdb_db->seqnum_update =
1086                         event_add_timed(ctdb->ev, ctdb_db, 
1087                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1088                                         ctdb_ltdb_seqnum_check, ctdb_db);
1089         }
1090
1091         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1092         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1093         return 0;
1094 }
1095
1096 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1097 {
1098         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1099         struct ctdb_db_context *ctdb_db;
1100
1101         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1102         if (!ctdb_db) {
1103                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1104                 return -1;
1105         }
1106
1107         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1108                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1109                 return -1;
1110         }
1111
1112         ctdb_db->priority = db_prio->priority;
1113         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1114
1115         return 0;
1116 }
1117
1118