libctdb: "unpack_reply_control" does not need the ctdb_connection parameter
[ctdb.git] / libctdb / ctdb.c
1 /*
2    core of libctdb
3
4    Copyright (C) Rusty Russell 2010
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 #include <ctdb.h>
20 #include <poll.h>
21 #include <errno.h>
22 #include <unistd.h>
23 #include <fcntl.h>
24 #include <stdlib.h>
25 #include <sys/socket.h>
26 #include <sys/un.h>
27 #include <sys/ioctl.h>
28 #include "libctdb_private.h"
29 #include "io_elem.h"
30 #include "local_tdb.h"
31 #include "messages.h"
32 #include <dlinklist.h>
33 #include <ctdb_protocol.h>
34
/*
 * Drop the type-safety wrapper macros of these names so that the real
 * functions can be defined further down in this file.
 */
#undef ctdb_attachdb_send
#undef ctdb_readrecordlock_async
#undef ctdb_connect
39
40 struct ctdb_lock {
41         struct ctdb_lock *next, *prev;
42
43         struct ctdb_db *ctdb_db;
44         TDB_DATA key;
45
46         /* This will always be set by the time user sees this. */
47         unsigned long held_magic;
48         struct ctdb_ltdb_header *hdr;
49
50         /* For convenience, we stash original callback here. */
51         ctdb_rrl_callback_t callback;
52 };
53
54 struct ctdb_db {
55         struct ctdb_connection *ctdb;
56         bool persistent;
57         uint32_t tdb_flags;
58         uint32_t id;
59         struct tdb_context *tdb;
60
61         ctdb_callback_t callback;
62         void *private_data;
63 };
64
65 static void remove_lock(struct ctdb_connection *ctdb, struct ctdb_lock *lock)
66 {
67         DLIST_REMOVE(ctdb->locks, lock);
68 }
69
70 /* FIXME: for thread safety, need tid info too. */
71 static bool holding_lock(struct ctdb_connection *ctdb)
72 {
73         /* For the moment, you can't ever hold more than 1 lock. */
74         return (ctdb->locks != NULL);
75 }
76
77 static void add_lock(struct ctdb_connection *ctdb, struct ctdb_lock *lock)
78 {
79         DLIST_ADD(ctdb->locks, lock);
80 }
81
82 static void cleanup_locks(struct ctdb_connection *ctdb, struct ctdb_db *db)
83 {
84         struct ctdb_lock *i, *next;
85
86         for (i = ctdb->locks; i; i = next) {
87                 /* Grab next pointer, as release_lock will free i */
88                 next = i->next;
89                 if (i->ctdb_db == db) {
90                         ctdb_release_lock(db, i);
91                 }
92         }
93 }
94
/*
 * close() @fd while leaving the caller's errno untouched.
 * FIXME: Could be in shared util code with rest of ctdb
 */
static void close_noerr(int fd)
{
        const int saved_errno = errno;

        close(fd);
        errno = saved_errno;
}
102
/*
 * free() @p while leaving the caller's errno untouched.
 * FIXME: Could be in shared util code with rest of ctdb
 */
static void free_noerr(void *p)
{
        const int saved_errno = errno;

        free(p);
        errno = saved_errno;
}
110
/*
 * Add O_NONBLOCK to the file status flags of @fd.
 *
 * Best effort: there is no return value.  If the current flags cannot be
 * read, nothing is written back (rather than OR-ing O_NONBLOCK into the -1
 * error value and handing that to F_SETFL).
 *
 * FIXME: Could be in shared util code with rest of ctdb
 */
static void set_nonblocking(int fd)
{
        int flags = fcntl(fd, F_GETFL, 0);

        if (flags == -1) {
                return;
        }
        fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
118
/*
 * Add FD_CLOEXEC to the descriptor flags of @fd.
 *
 * Best effort: there is no return value.  If the current flags cannot be
 * read, nothing is written back (rather than OR-ing FD_CLOEXEC into the -1
 * error value and handing that to F_SETFD).
 *
 * FIXME: Could be in shared util code with rest of ctdb
 */
static void set_close_on_exec(int fd)
{
        int flags = fcntl(fd, F_GETFD, 0);

        if (flags == -1) {
                return;
        }
        fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
}
126
127 static void set_pnn(struct ctdb_connection *ctdb,
128                     struct ctdb_request *req,
129                     void *unused)
130 {
131         if (!ctdb_getpnn_recv(ctdb, req, &ctdb->pnn)) {
132                 DEBUG(ctdb, LOG_CRIT,
133                       "ctdb_connect(async): failed to get pnn");
134                 ctdb->broken = true;
135         }
136         ctdb_request_free(req);
137 }
138
139 struct ctdb_connection *ctdb_connect(const char *addr,
140                                      ctdb_log_fn_t log_fn, void *log_priv)
141 {
142         struct ctdb_connection *ctdb;
143         struct sockaddr_un sun;
144
145         ctdb = malloc(sizeof(*ctdb));
146         if (!ctdb) {
147                 /* With no format string, we hope it doesn't use ap! */
148                 va_list ap;
149                 memset(&ap, 0, sizeof(ap));
150                 errno = ENOMEM;
151                 log_fn(log_priv, LOG_ERR, "ctdb_connect: no memory", ap);
152                 goto fail;
153         }
154         ctdb->outq = NULL;
155         ctdb->doneq = NULL;
156         ctdb->in = NULL;
157         ctdb->inqueue = NULL;
158         ctdb->message_handlers = NULL;
159         ctdb->next_id = 0;
160         ctdb->broken = false;
161         ctdb->log = log_fn;
162         ctdb->log_priv = log_priv;
163         ctdb->locks = NULL;
164
165         memset(&sun, 0, sizeof(sun));
166         sun.sun_family = AF_UNIX;
167         if (!addr)
168                 addr = CTDB_PATH;
169         strncpy(sun.sun_path, addr, sizeof(sun.sun_path)-1);
170         ctdb->fd = socket(AF_UNIX, SOCK_STREAM, 0);
171         if (ctdb->fd < 0)
172                 goto free_fail;
173
174         set_nonblocking(ctdb->fd);
175         set_close_on_exec(ctdb->fd);
176
177         if (connect(ctdb->fd, (struct sockaddr *)&sun, sizeof(sun)) == -1)
178                 goto close_fail;
179
180         /* Immediately queue a request to get our pnn. */
181         if (!ctdb_getpnn_send(ctdb, CTDB_CURRENT_NODE, set_pnn, NULL))
182                 goto close_fail;
183
184         return ctdb;
185
186 close_fail:
187         close_noerr(ctdb->fd);
188 free_fail:
189         free_noerr(ctdb);
190 fail:
191         return NULL;
192 }
193
194 void ctdb_disconnect(struct ctdb_connection *ctdb)
195 {
196         struct ctdb_request *i;
197
198         DEBUG(ctdb, LOG_DEBUG, "ctdb_disconnect");
199
200         while ((i = ctdb->outq) != NULL) {
201                 DLIST_REMOVE(ctdb->outq, i);
202                 ctdb_request_free(i);
203         }
204
205         while ((i = ctdb->doneq) != NULL) {
206                 DLIST_REMOVE(ctdb->doneq, i);
207                 ctdb_request_free(i);
208         }
209
210         if (ctdb->in)
211                 free_io_elem(ctdb->in);
212
213         remove_message_handlers(ctdb);
214
215         close(ctdb->fd);
216         /* Just in case they try to reuse */
217         ctdb->fd = -1;
218         free(ctdb);
219 }
220
221 int ctdb_get_fd(struct ctdb_connection *ctdb)
222 {
223         return ctdb->fd;
224 }
225
226 int ctdb_which_events(struct ctdb_connection *ctdb)
227 {
228         int events = POLLIN;
229
230         if (ctdb->outq)
231                 events |= POLLOUT;
232         return events;
233 }
234
235 struct ctdb_request *new_ctdb_request(struct ctdb_connection *ctdb, size_t len,
236                                       ctdb_callback_t cb, void *cbdata)
237 {
238         struct ctdb_request *req = malloc(sizeof(*req));
239         if (!req)
240                 return NULL;
241         req->io = new_io_elem(len);
242         if (!req->io) {
243                 free(req);
244                 return NULL;
245         }
246         req->ctdb = ctdb;
247         req->hdr.hdr = io_elem_data(req->io, NULL);
248         req->reply = NULL;
249         req->callback = cb;
250         req->priv_data = cbdata;
251         req->extra = NULL;
252         req->extra_destructor = NULL;
253         return req;
254 }
255
256 void ctdb_request_free(struct ctdb_request *req)
257 {
258         struct ctdb_connection *ctdb = req->ctdb;
259
260         if (req->next || req->prev) {
261                 DEBUG(ctdb, LOG_ALERT,
262                       "ctdb_request_free: request not complete! ctdb_cancel? %p (id %u)",
263                       req, req->hdr.hdr ? req->hdr.hdr->reqid : 0);
264                 ctdb_cancel(ctdb, req);
265                 return;
266         }
267         if (req->extra_destructor) {
268                 req->extra_destructor(ctdb, req);
269         }
270         if (req->reply) {
271                 free_io_elem(req->reply);
272         }
273         free_io_elem(req->io);
274         free(req);
275 }
276
277 /* Sanity-checking wrapper for reply. */
278 static struct ctdb_reply_call *unpack_reply_call(struct ctdb_request *req,
279                                                  uint32_t callid)
280 {
281         size_t len;
282         struct ctdb_reply_call *inhdr = io_elem_data(req->reply, &len);
283
284         /* Library user error if this isn't a reply to a call. */
285         if (req->hdr.hdr->operation != CTDB_REQ_CALL) {
286                 errno = EINVAL;
287                 DEBUG(req->ctdb, LOG_ALERT,
288                       "This was not a ctdbd call request: operation %u",
289                       req->hdr.hdr->operation);
290                 return NULL;
291         }
292
293         if (req->hdr.call->callid != callid) {
294                 errno = EINVAL;
295                 DEBUG(req->ctdb, LOG_ALERT,
296                       "This was not a ctdbd %u call request: %u",
297                       callid, req->hdr.call->callid);
298                 return NULL;
299         }
300
301         /* ctdbd or our error if this isn't a reply call. */
302         if (len < sizeof(*inhdr) || inhdr->hdr.operation != CTDB_REPLY_CALL) {
303                 errno = EIO;
304                 DEBUG(req->ctdb, LOG_CRIT,
305                       "Invalid ctdbd call reply: len %zu, operation %u",
306                       len, inhdr->hdr.operation);
307                 return NULL;
308         }
309
310         return inhdr;
311 }
312
313 /* Sanity-checking wrapper for reply. */
314 struct ctdb_reply_control *unpack_reply_control(struct ctdb_request *req,
315                                                 enum ctdb_controls control)
316 {
317         size_t len;
318         struct ctdb_reply_control *inhdr = io_elem_data(req->reply, &len);
319
320         /* Library user error if this isn't a reply to a call. */
321         if (len < sizeof(*inhdr)) {
322                 errno = EINVAL;
323                 DEBUG(req->ctdb, LOG_ALERT,
324                       "Short ctdbd control reply: %zu bytes", len);
325                 return NULL;
326         }
327         if (req->hdr.hdr->operation != CTDB_REQ_CONTROL) {
328                 errno = EINVAL;
329                 DEBUG(req->ctdb, LOG_ALERT,
330                       "This was not a ctdbd control request: operation %u",
331                       req->hdr.hdr->operation);
332                 return NULL;
333         }
334
335         /* ... or if it was a different control from what we expected. */
336         if (req->hdr.control->opcode != control) {
337                 errno = EINVAL;
338                 DEBUG(req->ctdb, LOG_ALERT,
339                       "This was not an opcode %u ctdbd control request: %u",
340                       control, req->hdr.control->opcode);
341                 return NULL;
342         }
343
344         /* ctdbd or our error if this isn't a reply call. */
345         if (inhdr->hdr.operation != CTDB_REPLY_CONTROL) {
346                 errno = EIO;
347                 DEBUG(req->ctdb, LOG_CRIT,
348                       "Invalid ctdbd control reply: operation %u",
349                       inhdr->hdr.operation);
350                 return NULL;
351         }
352
353         return inhdr;
354 }
355
356 static void handle_incoming(struct ctdb_connection *ctdb, struct io_elem *in)
357 {
358         struct ctdb_req_header *hdr;
359         size_t len;
360         struct ctdb_request *i;
361
362         hdr = io_elem_data(in, &len);
363         /* FIXME: use len to check packet! */
364
365         if (hdr->operation == CTDB_REQ_MESSAGE) {
366                 deliver_message(ctdb, hdr);
367                 return;
368         }
369
370         for (i = ctdb->doneq; i; i = i->next) {
371                 if (i->hdr.hdr->reqid == hdr->reqid) {
372                         DLIST_REMOVE(ctdb->doneq, i);
373                         i->reply = in;
374                         i->callback(ctdb, i, i->priv_data);
375                         return;
376                 }
377         }
378         DEBUG(ctdb, LOG_WARNING,
379               "Unexpected ctdbd request reply: operation %u reqid %u",
380               hdr->operation, hdr->reqid);
381         free_io_elem(in);
382 }
383
/*
 * Filter out "harmless" I/O errors: a negative @ret with errno EINTR or
 * EWOULDBLOCK becomes 0, anything else is passed through unchanged.
 */
static ssize_t real_error(ssize_t ret)
{
        if (ret >= 0) {
                return ret;
        }
        if (errno == EINTR) {
                return 0;
        }
        if (errno == EWOULDBLOCK) {
                return 0;
        }
        return ret;
}
391
392 bool ctdb_service(struct ctdb_connection *ctdb, int revents)
393 {
394         if (ctdb->broken) {
395                 return false;
396         }
397
398         if (holding_lock(ctdb)) {
399                 DEBUG(ctdb, LOG_ALERT, "Do not block while holding lock!");
400         }
401
402         if (revents & POLLOUT) {
403                 while (ctdb->outq) {
404                         if (real_error(write_io_elem(ctdb->fd,
405                                                      ctdb->outq->io)) < 0) {
406                                 DEBUG(ctdb, LOG_ERR,
407                                       "ctdb_service: error writing to ctdbd");
408                                 ctdb->broken = true;
409                                 return false;
410                         }
411                         if (io_elem_finished(ctdb->outq->io)) {
412                                 struct ctdb_request *done = ctdb->outq;
413                                 DLIST_REMOVE(ctdb->outq, done);
414                                 /* We add at the head: any dead ones
415                                  * sit and end. */
416                                 DLIST_ADD(ctdb->doneq, done);
417                         }
418                 }
419         }
420
421         while (revents & POLLIN) {
422                 int ret;
423                 int num_ready = 0;
424
425                 if (ioctl(ctdb->fd, FIONREAD, &num_ready) != 0) {
426                         DEBUG(ctdb, LOG_ERR,
427                               "ctdb_service: ioctl(FIONREAD) %d", errno);
428                         ctdb->broken = true;
429                         return false;
430                 }
431                 if (num_ready == 0) {
432                         /* the descriptor has been closed or we have all our data */
433                         break;
434                 }
435
436
437                 if (!ctdb->in) {
438                         ctdb->in = new_io_elem(sizeof(struct ctdb_req_header));
439                         if (!ctdb->in) {
440                                 DEBUG(ctdb, LOG_ERR,
441                                       "ctdb_service: allocating readbuf");
442                                 ctdb->broken = true;
443                                 return false;
444                         }
445                 }
446
447                 ret = read_io_elem(ctdb->fd, ctdb->in);
448                 if (real_error(ret) < 0 || ret == 0) {
449                         /* They closed fd? */
450                         if (ret == 0)
451                                 errno = EBADF;
452                         DEBUG(ctdb, LOG_ERR,
453                               "ctdb_service: error reading from ctdbd");
454                         ctdb->broken = true;
455                         return false;
456                 } else if (ret < 0) {
457                         /* No progress, stop loop. */
458                         break;
459                 } else if (io_elem_finished(ctdb->in)) {
460                         io_elem_queue(ctdb, ctdb->in);
461                         ctdb->in = NULL;
462                 }
463         }
464
465
466         while (ctdb->inqueue != NULL) {
467                 struct io_elem *io = ctdb->inqueue;
468
469                 io_elem_dequeue(ctdb, io);
470                 handle_incoming(ctdb, io);
471         }
472
473         return true;
474 }
475
476 /* This is inefficient.  We could pull in idtree.c. */
477 static bool reqid_used(const struct ctdb_connection *ctdb, uint32_t reqid)
478 {
479         struct ctdb_request *i;
480
481         for (i = ctdb->outq; i; i = i->next) {
482                 if (i->hdr.hdr->reqid == reqid) {
483                         return true;
484                 }
485         }
486         for (i = ctdb->doneq; i; i = i->next) {
487                 if (i->hdr.hdr->reqid == reqid) {
488                         return true;
489                 }
490         }
491         return false;
492 }
493
494 uint32_t new_reqid(struct ctdb_connection *ctdb)
495 {
496         while (reqid_used(ctdb, ctdb->next_id)) {
497                 ctdb->next_id++;
498         }
499         return ctdb->next_id++;
500 }
501
502 struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
503                                               uint32_t opcode,
504                                               uint32_t destnode,
505                                               const void *extra_data,
506                                               size_t extra,
507                                               ctdb_callback_t callback,
508                                               void *cbdata)
509 {
510         struct ctdb_request *req;
511         struct ctdb_req_control *pkt;
512
513         req = new_ctdb_request(
514                 ctdb, offsetof(struct ctdb_req_control, data) + extra,
515                 callback, cbdata);
516         if (!req)
517                 return NULL;
518
519         io_elem_init_req_header(req->io,
520                                 CTDB_REQ_CONTROL, destnode, new_reqid(ctdb));
521
522         pkt = req->hdr.control;
523         pkt->pad = 0;
524         pkt->opcode = opcode;
525         pkt->srvid = 0;
526         pkt->client_id = 0;
527         pkt->flags = 0;
528         pkt->datalen = extra;
529         memcpy(pkt->data, extra_data, extra);
530         DLIST_ADD(ctdb->outq, req);
531         return req;
532 }
533
/*
 * Callback installed by ctdb_cancel(): the user no longer wants the
 * result, so the request is simply freed.
 */
void ctdb_cancel_callback(struct ctdb_connection *ctdb,
                          struct ctdb_request *req,
                          void *unused)
{
        (void)ctdb;
        (void)unused;
        ctdb_request_free(req);
}
540
541 void ctdb_cancel(struct ctdb_connection *ctdb, struct ctdb_request *req)
542 {
543         if (!req->next && !req->prev) {
544                 DEBUG(ctdb, LOG_ALERT,
545                       "ctdb_cancel: request completed! ctdb_request_free? %p (id %u)",
546                       req, req->hdr.hdr ? req->hdr.hdr->reqid : 0);
547                 ctdb_request_free(req);
548                 return;
549         }
550
551         DEBUG(ctdb, LOG_DEBUG, "ctdb_cancel: %p (id %u)",
552               req, req->hdr.hdr ? req->hdr.hdr->reqid : 0);
553
554         /* FIXME: If it's not sent, we could just free it right now. */
555         req->callback = ctdb_cancel_callback;
556 }
557
558 void ctdb_detachdb(struct ctdb_connection *ctdb, struct ctdb_db *db)
559 {
560         cleanup_locks(ctdb, db);
561         tdb_close(db->tdb);
562         free(db);
563 }
564
/* Helpers of the attach sequence, defined after ctdb_attachdb_send(). */
static void destroy_req_db(struct ctdb_connection *ctdb,
                           struct ctdb_request *req);

static void attachdb_done(struct ctdb_connection *ctdb,
                          struct ctdb_request *req,
                          void *_db);

static void attachdb_getdbpath_done(struct ctdb_connection *ctdb,
                                    struct ctdb_request *req,
                                    void *_db);
573
574 struct ctdb_request *
575 ctdb_attachdb_send(struct ctdb_connection *ctdb,
576                    const char *name, bool persistent, uint32_t tdb_flags,
577                    ctdb_callback_t callback, void *private_data)
578 {
579         struct ctdb_request *req;
580         struct ctdb_db *db;
581         uint32_t opcode;
582
583         /* FIXME: Search if db already open. */
584         db = malloc(sizeof(*db));
585         if (!db) {
586                 return NULL;
587         }
588
589         if (persistent) {
590                 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
591         } else {
592                 opcode = CTDB_CONTROL_DB_ATTACH;
593         }
594
595         req = new_ctdb_control_request(ctdb, opcode, CTDB_CURRENT_NODE, name,
596                                        strlen(name) + 1, attachdb_done, db);
597         if (!req) {
598                 DEBUG(ctdb, LOG_ERR,
599                       "ctdb_attachdb_send: failed allocating DB_ATTACH");
600                 free(db);
601                 return NULL;
602         }
603
604         db->ctdb = ctdb;
605         db->tdb_flags = tdb_flags;
606         db->persistent = persistent;
607         db->callback = callback;
608         db->private_data = private_data;
609
610         req->extra_destructor = destroy_req_db;
611         /* This is set non-NULL when we succeed, see ctdb_attachdb_recv */
612         req->extra = NULL;
613
614         /* Flags get overloaded into srvid. */
615         req->hdr.control->srvid = tdb_flags;
616         DEBUG(db->ctdb, LOG_DEBUG,
617               "ctdb_attachdb_send: DB_ATTACH request %p", req);
618         return req;
619 }
620
621 static void destroy_req_db(struct ctdb_connection *ctdb,
622                            struct ctdb_request *req)
623 {
624         /* Incomplete db is in priv_data. */
625         free(req->priv_data);
626         /* second request is chained off this one. */
627         if (req->extra) {
628                 ctdb_request_free(req->extra);
629         }
630 }
631
632 static void attachdb_done(struct ctdb_connection *ctdb,
633                           struct ctdb_request *req,
634                           void *_db)
635 {
636         struct ctdb_db *db = _db;
637         struct ctdb_request *req2;
638         struct ctdb_reply_control *reply;
639         enum ctdb_controls control = CTDB_CONTROL_DB_ATTACH;
640
641         if (db->persistent) {
642                 control = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
643         }
644
645         reply = unpack_reply_control(req, control);
646         if (!reply || reply->status != 0) {
647                 if (reply) {
648                         DEBUG(ctdb, LOG_ERR,
649                               "ctdb_attachdb_send(async): DB_ATTACH status %i",
650                               reply->status);
651                 }
652                 /* We failed.  Hand request to user and have them discover it
653                  * via ctdb_attachdb_recv. */
654                 db->callback(ctdb, req, db->private_data);
655                 return;
656         }
657         db->id = *(uint32_t *)reply->data;
658
659         /* Now we do another call, to get the dbpath. */
660         req2 = new_ctdb_control_request(db->ctdb, CTDB_CONTROL_GETDBPATH,
661                                         CTDB_CURRENT_NODE,
662                                         &db->id, sizeof(db->id),
663                                         attachdb_getdbpath_done, db);
664         if (!req2) {
665                 DEBUG(db->ctdb, LOG_ERR,
666                       "ctdb_attachdb_send(async): failed to allocate");
667                 db->callback(ctdb, req, db->private_data);
668                 return;
669         }
670         req->extra = req2;
671         req2->extra = req;
672         DEBUG(db->ctdb, LOG_DEBUG,
673               "ctdb_attachdb_send(async): created getdbpath request");
674 }
675
676 static void attachdb_getdbpath_done(struct ctdb_connection *ctdb,
677                                     struct ctdb_request *req,
678                                     void *_db)
679 {
680         struct ctdb_db *db = _db;
681
682         /* Do callback on original request. */
683         db->callback(ctdb, req->extra, db->private_data);
684 }
685
686 struct ctdb_db *ctdb_attachdb_recv(struct ctdb_connection *ctdb,
687                                    struct ctdb_request *req)
688 {
689         struct ctdb_request *dbpath_req = req->extra;
690         struct ctdb_reply_control *reply;
691         struct ctdb_db *db = req->priv_data;
692         uint32_t tdb_flags = db->tdb_flags;
693         struct tdb_logging_context log;
694
695         /* Never sent the dbpath request?  We've failed. */
696         if (!dbpath_req) {
697                 /* FIXME: Save errno? */
698                 errno = EINVAL;
699                 return NULL;
700         }
701
702         reply = unpack_reply_control(dbpath_req, CTDB_CONTROL_GETDBPATH);
703         if (!reply) {
704                 return NULL;
705         }
706         if (reply->status != 0) {
707                 DEBUG(db->ctdb, LOG_ERR,
708                       "ctdb_attachdb_recv: reply status %i", reply->status);
709                 return NULL;
710         }
711
712         tdb_flags = db->persistent ? TDB_DEFAULT : TDB_NOSYNC;
713         tdb_flags |= TDB_DISALLOW_NESTING;
714
715         log.log_fn = ctdb_tdb_log_bridge;
716         log.log_private = ctdb;
717         db->tdb = tdb_open_ex((char *)reply->data, 0, tdb_flags, O_RDWR, 0,
718                               &log, NULL);
719         if (db->tdb == NULL) {
720                 DEBUG(db->ctdb, LOG_ERR,
721                       "ctdb_attachdb_recv: failed to tdb_open %s",
722                       (char *)reply->data);
723                 return NULL;
724         }
725
726         /* Finally, separate the db from the request (see destroy_req_db). */
727         req->priv_data = NULL;
728         DEBUG(db->ctdb, LOG_DEBUG,
729               "ctdb_attachdb_recv: db %p, tdb %s", db, (char *)reply->data);
730         return db;
731 }
732
733 static unsigned long lock_magic(struct ctdb_lock *lock)
734 {
735         /* A non-zero magic specific to this structure. */
736         return ((unsigned long)lock->key.dptr
737                 ^ (((unsigned long)lock->key.dptr) << 16)
738                 ^ 0xBADC0FFEEBADC0DEULL)
739                 | 1;
740 }
741
742 /* This is only called on locks before they're held. */
743 static void free_lock(struct ctdb_lock *lock)
744 {
745         if (lock->held_magic) {
746                 DEBUG(lock->ctdb_db->ctdb, LOG_ALERT,
747                       "free_lock invalid lock %p", lock);
748         }
749         free(lock->hdr);
750         free(lock);
751 }
752
753
754 void ctdb_release_lock(struct ctdb_db *ctdb_db, struct ctdb_lock *lock)
755 {
756         if (lock->held_magic != lock_magic(lock)) {
757                 DEBUG(lock->ctdb_db->ctdb, LOG_ALERT,
758                       "ctdb_release_lock invalid lock %p", lock);
759         } else if (lock->ctdb_db != ctdb_db) {
760                 errno = EBADF;
761                 DEBUG(ctdb_db->ctdb, LOG_ALERT,
762                       "ctdb_release_lock: wrong ctdb_db.");
763         } else {
764                 tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
765                 DEBUG(lock->ctdb_db->ctdb, LOG_DEBUG,
766                       "ctdb_release_lock %p", lock);
767                 remove_lock(lock->ctdb_db->ctdb, lock);
768         }
769         lock->held_magic = 0;
770         free_lock(lock);
771 }
772
773
774 /* We keep the lock if local node is the dmaster. */
775 static bool try_readrecordlock(struct ctdb_lock *lock, TDB_DATA *data)
776 {
777         struct ctdb_ltdb_header *hdr;
778
779         if (tdb_chainlock(lock->ctdb_db->tdb, lock->key) != 0) {
780                 DEBUG(lock->ctdb_db->ctdb, LOG_WARNING,
781                       "ctdb_readrecordlock_async: failed to chainlock");
782                 return NULL;
783         }
784
785         hdr = ctdb_local_fetch(lock->ctdb_db->tdb, lock->key, data);
786         if (hdr && hdr->dmaster == lock->ctdb_db->ctdb->pnn) {
787                 DEBUG(lock->ctdb_db->ctdb, LOG_DEBUG,
788                       "ctdb_readrecordlock_async: got local lock");
789                 lock->held_magic = lock_magic(lock);
790                 lock->hdr = hdr;
791                 add_lock(lock->ctdb_db->ctdb, lock);
792                 return true;
793         }
794
795         tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
796         free(hdr);
797         return NULL;
798 }
799
800 /* If they shutdown before we hand them the lock, we free it here. */
801 static void destroy_lock(struct ctdb_connection *ctdb,
802                          struct ctdb_request *req)
803 {
804         free_lock(req->extra);
805 }
806
807 static void readrecordlock_retry(struct ctdb_connection *ctdb,
808                                  struct ctdb_request *req, void *private)
809 {
810         struct ctdb_lock *lock = req->extra;
811         struct ctdb_reply_call *reply;
812         TDB_DATA data;
813
814         /* OK, we've received reply to noop migration */
815         reply = unpack_reply_call(req, CTDB_NULL_FUNC);
816         if (!reply || reply->status != 0) {
817                 if (reply) {
818                         DEBUG(ctdb, LOG_ERR,
819                               "ctdb_readrecordlock_async(async):"
820                               " NULL_FUNC returned %i", reply->status);
821                 }
822                 lock->callback(lock->ctdb_db, NULL, tdb_null, private);
823                 ctdb_request_free(req); /* Also frees lock. */
824                 return;
825         }
826
827         /* Can we get lock now? */
828         if (try_readrecordlock(lock, &data)) {
829                 /* Now it's their responsibility to free lock & request! */
830                 req->extra_destructor = NULL;
831                 lock->callback(lock->ctdb_db, lock, data, private);
832                 ctdb_request_free(req);
833                 return;
834         }
835
836         /* Retransmit the same request again (we lost race). */
837         io_elem_reset(req->io);
838         DLIST_ADD(ctdb->outq, req);
839 }
840
841 bool
842 ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
843                           ctdb_rrl_callback_t callback, void *cbdata)
844 {
845         struct ctdb_request *req;
846         struct ctdb_lock *lock;
847         TDB_DATA data;
848
849         if (holding_lock(ctdb_db->ctdb)) {
850                 DEBUG(ctdb_db->ctdb, LOG_ALERT,
851                       "ctdb_readrecordlock_async: already holding lock");
852                 return false;
853         }
854
855         /* Setup lock */
856         lock = malloc(sizeof(*lock) + key.dsize);
857         if (!lock) {
858                 DEBUG(ctdb_db->ctdb, LOG_ERR,
859                       "ctdb_readrecordlock_async: lock allocation failed");
860                 return false;
861         }
862         lock->key.dptr = (void *)(lock + 1);
863         memcpy(lock->key.dptr, key.dptr, key.dsize);
864         lock->key.dsize = key.dsize;
865         lock->ctdb_db = ctdb_db;
866         lock->hdr = NULL;
867         lock->held_magic = 0;
868
869         /* Fast path. */
870         if (try_readrecordlock(lock, &data)) {
871                 callback(ctdb_db, lock, data, cbdata);
872                 return true;
873         }
874
875         /* Slow path: create request. */
876         req = new_ctdb_request(
877                 ctdb_db->ctdb,
878                 offsetof(struct ctdb_req_call, data) + key.dsize,
879                 readrecordlock_retry, cbdata);
880         if (!req) {
881                 DEBUG(ctdb_db->ctdb, LOG_ERR,
882                       "ctdb_readrecordlock_async: allocation failed");
883                 free_lock(lock);
884                 return NULL;
885         }
886         req->extra = lock;
887         req->extra_destructor = destroy_lock;
888         /* We store the original callback in the lock, and use our own. */
889         lock->callback = callback;
890
891         io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE,
892                                 new_reqid(ctdb_db->ctdb));
893
894         req->hdr.call->flags = CTDB_IMMEDIATE_MIGRATION;
895         req->hdr.call->db_id = ctdb_db->id;
896         req->hdr.call->callid = CTDB_NULL_FUNC;
897         req->hdr.call->hopcount = 0;
898         req->hdr.call->keylen = key.dsize;
899         req->hdr.call->calldatalen = 0;
900         memcpy(req->hdr.call->data, key.dptr, key.dsize);
901         DLIST_ADD(ctdb_db->ctdb->outq, req);
902         return true;
903 }
904
905 bool ctdb_writerecord(struct ctdb_db *ctdb_db,
906                       struct ctdb_lock *lock, TDB_DATA data)
907 {
908         if (lock->ctdb_db != ctdb_db) {
909                 errno = EBADF;
910                 DEBUG(ctdb_db->ctdb, LOG_ALERT,
911                       "ctdb_writerecord: Can not write, wrong ctdb_db.");
912                 return false;
913         }
914
915         if (lock->held_magic != lock_magic(lock)) {
916                 errno = EBADF;
917                 DEBUG(ctdb_db->ctdb, LOG_ALERT,
918                       "ctdb_writerecord: Can not write. Lock has been released.");
919                 return false;
920         }
921                 
922         if (ctdb_db->persistent) {
923                 errno = EINVAL;
924                 DEBUG(ctdb_db->ctdb, LOG_ALERT,
925                       "ctdb_writerecord: cannot write to persistent db");
926                 return false;
927         }
928
929         switch (ctdb_local_store(ctdb_db->tdb, lock->key, lock->hdr, data)) {
930         case 0:
931                 DEBUG(ctdb_db->ctdb, LOG_DEBUG,
932                       "ctdb_writerecord: optimized away noop write.");
933                 /* fall thru */
934         case 1:
935                 return true;
936
937         default:
938                 switch (errno) {
939                 case ENOMEM:
940                         DEBUG(ctdb_db->ctdb, LOG_CRIT,
941                               "ctdb_writerecord: out of memory.");
942                         break;
943                 case EINVAL:
944                         DEBUG(ctdb_db->ctdb, LOG_ALERT,
945                               "ctdb_writerecord: record changed under lock?");
946                         break;
947                 default: /* TDB already logged. */
948                         break;
949                 }
950                 return false;
951         }
952 }
953
954
955 struct ctdb_traverse_state {
956         struct ctdb_request *handle;
957         struct ctdb_db *ctdb_db;
958         uint64_t srvid;
959
960         ctdb_traverse_callback_t callback;
961         void *cbdata;
962 };
963
964 static void traverse_remhnd_cb(struct ctdb_connection *ctdb,
965                         struct ctdb_request *req, void *private_data)
966 {
967         struct ctdb_traverse_state *state = private_data;
968
969         if (!ctdb_remove_message_handler_recv(ctdb, state->handle)) {
970                 DEBUG(ctdb, LOG_ERR,
971                                 "Failed to remove message handler for"
972                                 " traverse.");
973                 state->callback(state->ctdb_db->ctdb, state->ctdb_db,
974                                 TRAVERSE_STATUS_ERROR,
975                                 tdb_null, tdb_null,
976                                 state->cbdata);
977         }
978         ctdb_request_free(state->handle);
979         state->handle = NULL;
980         free(state);
981 }
982         
983 static void msg_h(struct ctdb_connection *ctdb, uint64_t srvid,
984            TDB_DATA data, void *private_data)
985 {
986         struct ctdb_traverse_state *state = private_data;
987         struct ctdb_db *ctdb_db = state->ctdb_db;
988         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
989         TDB_DATA key;
990
991         if (data.dsize < sizeof(uint32_t) ||
992             d->length != data.dsize) {
993                 DEBUG(ctdb, LOG_ERR,
994                         "Bad data size %u in traverse_handler",
995                         (unsigned)data.dsize);
996                 state->callback(state->ctdb_db->ctdb, state->ctdb_db,
997                                 TRAVERSE_STATUS_ERROR,
998                                 tdb_null, tdb_null,
999                                 state->cbdata);
1000                 state->handle = ctdb_remove_message_handler_send(
1001                                 state->ctdb_db->ctdb, state->srvid,
1002                                 msg_h, state,
1003                                 traverse_remhnd_cb, state);
1004                 return;
1005         }
1006
1007         key.dsize = d->keylen;
1008         key.dptr  = &d->data[0];
1009         data.dsize = d->datalen;
1010         data.dptr = &d->data[d->keylen];
1011
1012         if (key.dsize == 0 && data.dsize == 0) {
1013                 state->callback(state->ctdb_db->ctdb, state->ctdb_db,
1014                                 TRAVERSE_STATUS_FINISHED,
1015                                 tdb_null, tdb_null,
1016                                 state->cbdata);
1017                 state->handle = ctdb_remove_message_handler_send(
1018                                 state->ctdb_db->ctdb, state->srvid,
1019                                 msg_h, state,
1020                                 traverse_remhnd_cb, state);
1021                 return;
1022         }
1023
1024         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1025                 /* empty records are deleted records in ctdb */
1026                 return;
1027         }
1028
1029         data.dsize -= sizeof(struct ctdb_ltdb_header);
1030         data.dptr  += sizeof(struct ctdb_ltdb_header);
1031
1032         if (state->callback(ctdb, ctdb_db,
1033                         TRAVERSE_STATUS_RECORD,
1034                         key, data, state->cbdata) != 0) {
1035                 state->handle = ctdb_remove_message_handler_send(
1036                                 state->ctdb_db->ctdb, state->srvid,
1037                                 msg_h, state,
1038                                 traverse_remhnd_cb, state);
1039                 return;
1040         }
1041 }
1042
1043 static void traverse_start_cb(struct ctdb_connection *ctdb,
1044                         struct ctdb_request *req, void *private_data)
1045 {
1046         struct ctdb_traverse_state *state = private_data;
1047
1048         ctdb_request_free(state->handle);
1049         state->handle = NULL;
1050 }
1051
1052 static void traverse_msghnd_cb(struct ctdb_connection *ctdb,
1053                         struct ctdb_request *req, void *private_data)
1054 {
1055         struct ctdb_traverse_state *state = private_data;
1056         struct ctdb_db *ctdb_db = state->ctdb_db;
1057         struct ctdb_traverse_start t;
1058
1059         if (!ctdb_set_message_handler_recv(ctdb, state->handle)) {
1060                 DEBUG(ctdb, LOG_ERR,
1061                                 "Failed to register message handler for"
1062                                 " traverse.");
1063                 state->callback(state->ctdb_db->ctdb, state->ctdb_db,
1064                                 TRAVERSE_STATUS_ERROR,
1065                                 tdb_null, tdb_null,
1066                                 state->cbdata);
1067                 ctdb_request_free(state->handle);
1068                 state->handle = NULL;
1069                 free(state);
1070                 return;
1071         }
1072         ctdb_request_free(state->handle);
1073         state->handle = NULL;
1074
1075         t.db_id = ctdb_db->id;
1076         t.srvid = state->srvid;
1077         t.reqid = 0;
1078
1079         state->handle = new_ctdb_control_request(ctdb,
1080                                 CTDB_CONTROL_TRAVERSE_START,
1081                                 CTDB_CURRENT_NODE,
1082                                 &t, sizeof(t),
1083                                 traverse_start_cb, state);
1084         if (state->handle == NULL) {
1085                 DEBUG(ctdb, LOG_ERR,
1086                                 "ctdb_traverse_async:"
1087                                 " failed to send traverse_start control");
1088                 state->callback(state->ctdb_db->ctdb, state->ctdb_db,
1089                                 TRAVERSE_STATUS_ERROR,
1090                                 tdb_null, tdb_null,
1091                                 state->cbdata);
1092                 state->handle = ctdb_remove_message_handler_send(
1093                                 state->ctdb_db->ctdb, state->srvid,
1094                                 msg_h, state,
1095                                 traverse_remhnd_cb, state);
1096                 return;
1097         }
1098 }
1099
1100 bool ctdb_traverse_async(struct ctdb_db *ctdb_db,
1101                          ctdb_traverse_callback_t callback, void *cbdata)
1102 {
1103         struct ctdb_connection *ctdb = ctdb_db->ctdb;
1104         struct ctdb_traverse_state *state;
1105         static uint32_t tid = 0;
1106
1107         state = malloc(sizeof(struct ctdb_traverse_state));
1108         if (state == NULL) {
1109                 DEBUG(ctdb, LOG_ERR,
1110                                 "ctdb_traverse_async: no memory."
1111                                 " allocate state failed");
1112                 return false;
1113         }
1114
1115         tid++;
1116         state->srvid = CTDB_SRVID_TRAVERSE_RANGE|tid;
1117
1118         state->callback = callback;
1119         state->cbdata   = cbdata;
1120         state->ctdb_db  = ctdb_db;
1121
1122         state->handle = ctdb_set_message_handler_send(ctdb_db->ctdb,
1123                                 state->srvid,
1124                                 msg_h, state,
1125                                 traverse_msghnd_cb, state);
1126         if (state->handle == NULL) {
1127                 DEBUG(ctdb, LOG_ERR,
1128                         "ctdb_traverse_async:"
1129                         " failed ctdb_set_message_handler_send");
1130                 free(state);
1131                 return false;
1132         }
1133
1134         return true;
1135 }