ctdb-recovery-helper: Deregister message handler in error paths
[vlendec/samba-autobuild/.git] / ctdb / client / client_db.c
1 /*
2    CTDB client code
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27
28 #include "common/logging.h"
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
39
40 static struct ctdb_db_context *client_db_handle(
41                                         struct ctdb_client_context *client,
42                                         const char *db_name)
43 {
44         struct ctdb_db_context *db;
45
46         for (db = client->db; db != NULL; db = db->next) {
47                 if (strcmp(db_name, db->db_name) == 0) {
48                         return db;
49                 }
50         }
51
52         return NULL;
53 }
54
55 static bool ctdb_db_persistent(struct ctdb_db_context *db)
56 {
57         if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
58                 return true;
59         }
60         return false;
61 }
62
63 static bool ctdb_db_replicated(struct ctdb_db_context *db)
64 {
65         if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
66                 return true;
67         }
68         return false;
69 }
70
71 static bool ctdb_db_volatile(struct ctdb_db_context *db)
72 {
73         if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT ||
74             db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
75                 return false;
76         }
77         return true;
78 }
79
/*
 * Per-request state for the "set db flags" operation, which applies the
 * READONLY and/or STICKY flag of a database on all connected nodes.
 */
struct ctdb_set_db_flags_state {
        /* Event context, client and timeout reused for the follow-up controls */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        /* Database the flags are applied to */
        uint32_t db_id;
        /* Requested flags; only READONLY and STICKY are acted upon */
        uint8_t db_flags;
        /* Set once the respective control completed or was not requested */
        bool readonly_done, sticky_done;
        /* Output of list_of_connected_nodes(): node list and its length */
        uint32_t *pnn_list;
        int count;
};
90
/* Completion callbacks of the set-db-flags request, defined below */
static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
94
95 static struct tevent_req *ctdb_set_db_flags_send(
96                                 TALLOC_CTX *mem_ctx,
97                                 struct tevent_context *ev,
98                                 struct ctdb_client_context *client,
99                                 uint32_t destnode, struct timeval timeout,
100                                 uint32_t db_id, uint8_t db_flags)
101 {
102         struct tevent_req *req, *subreq;
103         struct ctdb_set_db_flags_state *state;
104         struct ctdb_req_control request;
105
106         req = tevent_req_create(mem_ctx, &state,
107                                 struct ctdb_set_db_flags_state);
108         if (req == NULL) {
109                 return NULL;
110         }
111
112         if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
113                 tevent_req_done(req);
114                 return tevent_req_post(req, ev);
115         }
116
117         state->ev = ev;
118         state->client = client;
119         state->timeout = timeout;
120         state->db_id = db_id;
121         state->db_flags = db_flags;
122
123         ctdb_req_control_get_nodemap(&request);
124         subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
125                                           &request);
126         if (tevent_req_nomem(subreq, req)) {
127                 return tevent_req_post(req, ev);
128         }
129         tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
130
131         return req;
132 }
133
134 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
135 {
136         struct tevent_req *req = tevent_req_callback_data(
137                 subreq, struct tevent_req);
138         struct ctdb_set_db_flags_state *state = tevent_req_data(
139                 req, struct ctdb_set_db_flags_state);
140         struct ctdb_req_control request;
141         struct ctdb_reply_control *reply;
142         struct ctdb_node_map *nodemap;
143         int ret;
144         bool status;
145
146         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
147         TALLOC_FREE(subreq);
148         if (! status) {
149                 DEBUG(DEBUG_ERR,
150                       ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
151                        state->db_id, ret));
152                 tevent_req_error(req, ret);
153                 return;
154         }
155
156         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
157         talloc_free(reply);
158         if (ret != 0) {
159                 DEBUG(DEBUG_ERR,
160                       ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
161                       state->db_id, ret));
162                 tevent_req_error(req, ret);
163                 return;
164         }
165
166         state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
167                                                state, &state->pnn_list);
168         talloc_free(nodemap);
169         if (state->count <= 0) {
170                 DEBUG(DEBUG_ERR,
171                       ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
172                        state->db_id, state->count));
173                 tevent_req_error(req, ENOMEM);
174                 return;
175         }
176
177         if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
178                 ctdb_req_control_set_db_readonly(&request, state->db_id);
179                 subreq = ctdb_client_control_multi_send(
180                                         state, state->ev, state->client,
181                                         state->pnn_list, state->count,
182                                         state->timeout, &request);
183                 if (tevent_req_nomem(subreq, req)) {
184                         return;
185                 }
186                 tevent_req_set_callback(subreq,
187                                         ctdb_set_db_flags_readonly_done, req);
188         } else {
189                 state->readonly_done = true;
190         }
191
192         if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
193                 ctdb_req_control_set_db_sticky(&request, state->db_id);
194                 subreq = ctdb_client_control_multi_send(
195                                         state, state->ev, state->client,
196                                         state->pnn_list, state->count,
197                                         state->timeout, &request);
198                 if (tevent_req_nomem(subreq, req)) {
199                         return;
200                 }
201                 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
202                                         req);
203         } else {
204                 state->sticky_done = true;
205         }
206 }
207
208 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
209 {
210         struct tevent_req *req = tevent_req_callback_data(
211                 subreq, struct tevent_req);
212         struct ctdb_set_db_flags_state *state = tevent_req_data(
213                 req, struct ctdb_set_db_flags_state);
214         int ret;
215         bool status;
216
217         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
218                                                 NULL);
219         TALLOC_FREE(subreq);
220         if (! status) {
221                 DEBUG(DEBUG_ERR,
222                       ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
223                        state->db_id, ret));
224                 tevent_req_error(req, ret);
225                 return;
226         }
227
228         state->readonly_done = true;
229
230         if (state->readonly_done && state->sticky_done) {
231                 tevent_req_done(req);
232         }
233 }
234
235 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
236 {
237         struct tevent_req *req = tevent_req_callback_data(
238                 subreq, struct tevent_req);
239         struct ctdb_set_db_flags_state *state = tevent_req_data(
240                 req, struct ctdb_set_db_flags_state);
241         int ret;
242         bool status;
243
244         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
245                                                 NULL);
246         TALLOC_FREE(subreq);
247         if (! status) {
248                 DEBUG(DEBUG_ERR,
249                       ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
250                        state->db_id, ret));
251                 tevent_req_error(req, ret);
252                 return;
253         }
254
255         state->sticky_done = true;
256
257         if (state->readonly_done && state->sticky_done) {
258                 tevent_req_done(req);
259         }
260 }
261
/*
 * Collect the result of a set-db-flags request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the unix error code there.
 */
static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
{
        int err;
        bool failed = tevent_req_is_unix_error(req, &err);

        if (failed && perr != NULL) {
                *perr = err;
        }

        return !failed;
}
274
/* Per-request state for attaching to a database */
struct ctdb_attach_state {
        /* Event context, client and timeout reused for every control sent */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        /* Node all controls are sent to; set from ctdb_client_pnn(client) */
        uint32_t destnode;
        /* Flags requested by the caller */
        uint8_t db_flags;
        /* Existing handle found on the client, or the context being built */
        struct ctdb_db_context *db;
};
283
/* Completion callbacks of the attach request, in the order they run */
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
static void ctdb_attach_open_flags_done(struct tevent_req *subreq);
289
290 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
291                                     struct tevent_context *ev,
292                                     struct ctdb_client_context *client,
293                                     struct timeval timeout,
294                                     const char *db_name, uint8_t db_flags)
295 {
296         struct tevent_req *req, *subreq;
297         struct ctdb_attach_state *state;
298         struct ctdb_req_control request;
299
300         req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
301         if (req == NULL) {
302                 return NULL;
303         }
304
305         state->db = client_db_handle(client, db_name);
306         if (state->db != NULL) {
307                 tevent_req_done(req);
308                 return tevent_req_post(req, ev);
309         }
310
311         state->ev = ev;
312         state->client = client;
313         state->timeout = timeout;
314         state->destnode = ctdb_client_pnn(client);
315         state->db_flags = db_flags;
316
317         state->db = talloc_zero(client, struct ctdb_db_context);
318         if (tevent_req_nomem(state->db, req)) {
319                 return tevent_req_post(req, ev);
320         }
321
322         state->db->db_name = talloc_strdup(state->db, db_name);
323         if (tevent_req_nomem(state->db, req)) {
324                 return tevent_req_post(req, ev);
325         }
326
327         state->db->db_flags = db_flags;
328
329         if (ctdb_db_persistent(state->db)) {
330                 ctdb_req_control_db_attach_persistent(&request,
331                                                       state->db->db_name);
332         } else if (ctdb_db_replicated(state->db)) {
333                 ctdb_req_control_db_attach_replicated(&request,
334                                                       state->db->db_name);
335         } else {
336                 ctdb_req_control_db_attach(&request, state->db->db_name);
337         }
338
339         subreq = ctdb_client_control_send(state, state->ev, state->client,
340                                           state->destnode, state->timeout,
341                                           &request);
342         if (tevent_req_nomem(subreq, req)) {
343                 return tevent_req_post(req, ev);
344         }
345         tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
346
347         return req;
348 }
349
350 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
351 {
352         struct tevent_req *req = tevent_req_callback_data(
353                 subreq, struct tevent_req);
354         struct ctdb_attach_state *state = tevent_req_data(
355                 req, struct ctdb_attach_state);
356         struct ctdb_req_control request;
357         struct ctdb_reply_control *reply;
358         bool status;
359         int ret;
360
361         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
362         TALLOC_FREE(subreq);
363         if (! status) {
364                 DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
365                                   state->db->db_name,
366                                   (ctdb_db_persistent(state->db)
367                                         ? "DB_ATTACH_PERSISTENT"
368                                         : (ctdb_db_replicated(state->db)
369                                                 ? "DB_ATTACH_REPLICATED"
370                                                 : "DB_ATTACH")),
371                                   ret));
372                 tevent_req_error(req, ret);
373                 return;
374         }
375
376         if (ctdb_db_persistent(state->db)) {
377                 ret = ctdb_reply_control_db_attach_persistent(
378                                 reply, &state->db->db_id);
379         } else if (ctdb_db_replicated(state->db)) {
380                 ret = ctdb_reply_control_db_attach_replicated(
381                                 reply, &state->db->db_id);
382         } else {
383                 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
384         }
385         talloc_free(reply);
386         if (ret != 0) {
387                 DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
388                                   state->db->db_name, ret));
389                 tevent_req_error(req, ret);
390                 return;
391         }
392
393         ctdb_req_control_getdbpath(&request, state->db->db_id);
394         subreq = ctdb_client_control_send(state, state->ev, state->client,
395                                           state->destnode, state->timeout,
396                                           &request);
397         if (tevent_req_nomem(subreq, req)) {
398                 return;
399         }
400         tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
401 }
402
403 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
404 {
405         struct tevent_req *req = tevent_req_callback_data(
406                 subreq, struct tevent_req);
407         struct ctdb_attach_state *state = tevent_req_data(
408                 req, struct ctdb_attach_state);
409         struct ctdb_reply_control *reply;
410         struct ctdb_req_control request;
411         bool status;
412         int ret;
413
414         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
415         TALLOC_FREE(subreq);
416         if (! status) {
417                 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
418                                   state->db->db_name, ret));
419                 tevent_req_error(req, ret);
420                 return;
421         }
422
423         ret = ctdb_reply_control_getdbpath(reply, state->db,
424                                            &state->db->db_path);
425         talloc_free(reply);
426         if (ret != 0) {
427                 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
428                                   state->db->db_name, ret));
429                 tevent_req_error(req, ret);
430                 return;
431         }
432
433         ctdb_req_control_db_get_health(&request, state->db->db_id);
434         subreq = ctdb_client_control_send(state, state->ev, state->client,
435                                           state->destnode, state->timeout,
436                                           &request);
437         if (tevent_req_nomem(subreq, req)) {
438                 return;
439         }
440         tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
441 }
442
443 static void ctdb_attach_health_done(struct tevent_req *subreq)
444 {
445         struct tevent_req *req = tevent_req_callback_data(
446                 subreq, struct tevent_req);
447         struct ctdb_attach_state *state = tevent_req_data(
448                 req, struct ctdb_attach_state);
449         struct ctdb_reply_control *reply;
450         const char *reason;
451         bool status;
452         int ret;
453
454         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
455         TALLOC_FREE(subreq);
456         if (! status) {
457                 DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
458                                   state->db->db_name, ret));
459                 tevent_req_error(req, ret);
460                 return;
461         }
462
463         ret = ctdb_reply_control_db_get_health(reply, state, &reason);
464         if (ret != 0) {
465                 DEBUG(DEBUG_ERR,
466                       ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
467                        state->db->db_name, ret));
468                 tevent_req_error(req, ret);
469                 return;
470         }
471
472         if (reason != NULL) {
473                 /* Database unhealthy, avoid attach */
474                 DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
475                                   state->db->db_name, reason));
476                 tevent_req_error(req, EIO);
477                 return;
478         }
479
480         subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
481                                         state->destnode, state->timeout,
482                                         state->db->db_id, state->db_flags);
483         if (tevent_req_nomem(subreq, req)) {
484                 return;
485         }
486         tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
487 }
488
489 static void ctdb_attach_flags_done(struct tevent_req *subreq)
490 {
491         struct tevent_req *req = tevent_req_callback_data(
492                 subreq, struct tevent_req);
493         struct ctdb_attach_state *state = tevent_req_data(
494                 req, struct ctdb_attach_state);
495         struct ctdb_req_control request;
496         bool status;
497         int ret;
498
499         status = ctdb_set_db_flags_recv(subreq, &ret);
500         TALLOC_FREE(subreq);
501         if (! status) {
502                 DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
503                                   state->db->db_name, state->db_flags));
504                 tevent_req_error(req, ret);
505                 return;
506         }
507
508         ctdb_req_control_db_open_flags(&request, state->db->db_id);
509         subreq = ctdb_client_control_send(state, state->ev, state->client,
510                                           state->destnode, state->timeout,
511                                           &request);
512         if (tevent_req_nomem(subreq, req)) {
513                 return;
514         }
515         tevent_req_set_callback(subreq, ctdb_attach_open_flags_done, req);
516 }
517
518 static void ctdb_attach_open_flags_done(struct tevent_req *subreq)
519 {
520         struct tevent_req *req = tevent_req_callback_data(
521                 subreq, struct tevent_req);
522         struct ctdb_attach_state *state = tevent_req_data(
523                 req, struct ctdb_attach_state);
524         struct ctdb_reply_control *reply;
525         bool status;
526         int ret, tdb_flags;
527
528         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
529         TALLOC_FREE(subreq);
530         if (! status) {
531                 DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n",
532                                   state->db->db_name, ret));
533                 tevent_req_error(req, ret);
534                 return;
535         }
536
537         ret = ctdb_reply_control_db_open_flags(reply, &tdb_flags);
538         talloc_free(reply);
539         if (ret != 0) {
540                 DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS parse failed,"
541                                   " ret=%d\n", state->db->db_name, ret));
542                 tevent_req_error(req, ret);
543                 return;
544         }
545
546         state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
547                                         tdb_flags, O_RDWR, 0);
548         if (tevent_req_nomem(state->db->ltdb, req)) {
549                 DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
550                                   state->db->db_name));
551                 return;
552         }
553         DLIST_ADD(state->client->db, state->db);
554
555         tevent_req_done(req);
556 }
557
558 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
559                       struct ctdb_db_context **out)
560 {
561         struct ctdb_attach_state *state = tevent_req_data(
562                 req, struct ctdb_attach_state);
563         int err;
564
565         if (tevent_req_is_unix_error(req, &err)) {
566                 if (perr != NULL) {
567                         *perr = err;
568                 }
569                 return false;
570         }
571
572         if (out != NULL) {
573                 *out = state->db;
574         }
575         return true;
576 }
577
578 int ctdb_attach(struct tevent_context *ev,
579                 struct ctdb_client_context *client,
580                 struct timeval timeout,
581                 const char *db_name, uint8_t db_flags,
582                 struct ctdb_db_context **out)
583 {
584         TALLOC_CTX *mem_ctx;
585         struct tevent_req *req;
586         bool status;
587         int ret;
588
589         mem_ctx = talloc_new(client);
590         if (mem_ctx == NULL) {
591                 return ENOMEM;
592         }
593
594         req = ctdb_attach_send(mem_ctx, ev, client, timeout,
595                                db_name, db_flags);
596         if (req == NULL) {
597                 talloc_free(mem_ctx);
598                 return ENOMEM;
599         }
600
601         tevent_req_poll(req, ev);
602
603         status = ctdb_attach_recv(req, &ret, out);
604         if (! status) {
605                 talloc_free(mem_ctx);
606                 return ret;
607         }
608
609         /*
610         ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
611         ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
612         ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
613         */
614
615         talloc_free(mem_ctx);
616         return 0;
617 }
618
/* Per-request state for detaching from a database */
struct ctdb_detach_state {
        /* Client, event context and timeout reused for the DB_DETACH control */
        struct ctdb_client_context *client;
        struct tevent_context *ev;
        struct timeval timeout;
        /* Database to detach from */
        uint32_t db_id;
        /* Name of db_id as returned by GET_DBNAME; used for logging and lookup */
        const char *db_name;
};
626
/* Completion callbacks of the detach request, defined below */
static void ctdb_detach_dbname_done(struct tevent_req *subreq);
static void ctdb_detach_done(struct tevent_req *subreq);
629
630 struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx,
631                                     struct tevent_context *ev,
632                                     struct ctdb_client_context *client,
633                                     struct timeval timeout, uint32_t db_id)
634 {
635         struct tevent_req *req, *subreq;
636         struct ctdb_detach_state *state;
637         struct ctdb_req_control request;
638
639         req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state);
640         if (req == NULL) {
641                 return NULL;
642         }
643
644         state->client = client;
645         state->ev = ev;
646         state->timeout = timeout;
647         state->db_id = db_id;
648
649         ctdb_req_control_get_dbname(&request, db_id);
650         subreq = ctdb_client_control_send(state, ev, client,
651                                           ctdb_client_pnn(client), timeout,
652                                           &request);
653         if (tevent_req_nomem(subreq, req)) {
654                 return tevent_req_post(req, ev);
655         }
656         tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req);
657
658         return req;
659 }
660
661 static void ctdb_detach_dbname_done(struct tevent_req *subreq)
662 {
663         struct tevent_req *req = tevent_req_callback_data(
664                 subreq, struct tevent_req);
665         struct ctdb_detach_state *state = tevent_req_data(
666                 req, struct ctdb_detach_state);
667         struct ctdb_reply_control *reply;
668         struct ctdb_req_control request;
669         int ret;
670         bool status;
671
672         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
673         TALLOC_FREE(subreq);
674         if (! status) {
675                 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
676                                   state->db_id, ret));
677                 tevent_req_error(req, ret);
678                 return;
679         }
680
681         ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
682         if (ret != 0) {
683                 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
684                                   state->db_id, ret));
685                 tevent_req_error(req, ret);
686                 return;
687         }
688
689         ctdb_req_control_db_detach(&request, state->db_id);
690         subreq = ctdb_client_control_send(state, state->ev, state->client,
691                                           ctdb_client_pnn(state->client),
692                                           state->timeout, &request);
693         if (tevent_req_nomem(subreq, req)) {
694                 return;
695         }
696         tevent_req_set_callback(subreq, ctdb_detach_done, req);
697
698 }
699
700 static void ctdb_detach_done(struct tevent_req *subreq)
701 {
702         struct tevent_req *req = tevent_req_callback_data(
703                 subreq, struct tevent_req);
704         struct ctdb_detach_state *state = tevent_req_data(
705                 req, struct ctdb_detach_state);
706         struct ctdb_reply_control *reply;
707         struct ctdb_db_context *db;
708         int ret;
709         bool status;
710
711         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
712         TALLOC_FREE(subreq);
713         if (! status) {
714                 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
715                                   state->db_name, ret));
716                 tevent_req_error(req, ret);
717                 return;
718         }
719
720         ret = ctdb_reply_control_db_detach(reply);
721         if (ret != 0) {
722                 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
723                                   state->db_name, ret));
724                 tevent_req_error(req, ret);
725                 return;
726         }
727
728         db = client_db_handle(state->client, state->db_name);
729         if (db != NULL) {
730                 DLIST_REMOVE(state->client->db, db);
731                 TALLOC_FREE(db);
732         }
733
734         tevent_req_done(req);
735 }
736
/*
 * Collect the result of a detach request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the unix error code there.
 */
bool ctdb_detach_recv(struct tevent_req *req, int *perr)
{
        int err;
        bool failed = tevent_req_is_unix_error(req, &err);

        if (failed && perr != NULL) {
                *perr = err;
        }

        return !failed;
}
750
751 int ctdb_detach(struct tevent_context *ev,
752                 struct ctdb_client_context *client,
753                 struct timeval timeout, uint32_t db_id)
754 {
755         TALLOC_CTX *mem_ctx;
756         struct tevent_req *req;
757         int ret;
758         bool status;
759
760         mem_ctx = talloc_new(client);
761         if (mem_ctx == NULL) {
762                 return ENOMEM;
763         }
764
765         req = ctdb_detach_send(mem_ctx, ev, client, timeout, db_id);
766         if (req == NULL) {
767                 talloc_free(mem_ctx);
768                 return ENOMEM;
769         }
770
771         tevent_req_poll(req, ev);
772
773         status = ctdb_detach_recv(req, &ret);
774         if (! status) {
775                 talloc_free(mem_ctx);
776                 return ret;
777         }
778
779         talloc_free(mem_ctx);
780         return 0;
781 }
782
783 uint32_t ctdb_db_id(struct ctdb_db_context *db)
784 {
785         return db->db_id;
786 }
787
/* State handed to the tdb traverse callback of ctdb_db_traverse_local() */
struct ctdb_db_traverse_local_state {
        /* Caller's record callback and its opaque argument */
        ctdb_rec_parser_func_t parser;
        void *private_data;
        /* When true the ltdb header is extracted and passed to the parser */
        bool extract_header;
        /* Non-zero result that stopped the traverse, 0 otherwise */
        int error;
};
794
795 static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
796                                           TDB_DATA key, TDB_DATA data,
797                                           void *private_data)
798 {
799         struct ctdb_db_traverse_local_state *state =
800                 (struct ctdb_db_traverse_local_state *)private_data;
801         int ret;
802
803         if (state->extract_header) {
804                 struct ctdb_ltdb_header header;
805
806                 ret = ctdb_ltdb_header_extract(&data, &header);
807                 if (ret != 0) {
808                         state->error = ret;
809                         return 1;
810                 }
811
812                 ret = state->parser(0, &header, key, data, state->private_data);
813         } else {
814                 ret = state->parser(0, NULL, key, data, state->private_data);
815         }
816
817         if (ret != 0) {
818                 state->error = ret;
819                 return 1;
820         }
821
822         return 0;
823 }
824
825 int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
826                            bool extract_header,
827                            ctdb_rec_parser_func_t parser, void *private_data)
828 {
829         struct ctdb_db_traverse_local_state state;
830         int ret;
831
832         state.parser = parser;
833         state.private_data = private_data;
834         state.extract_header = extract_header;
835         state.error = 0;
836
837         if (readonly) {
838                 ret = tdb_traverse_read(db->ltdb->tdb,
839                                         ctdb_db_traverse_local_handler,
840                                         &state);
841         } else {
842                 ret = tdb_traverse(db->ltdb->tdb,
843                                    ctdb_db_traverse_local_handler, &state);
844         }
845
846         if (ret == -1) {
847                 return EIO;
848         }
849
850         return state.error;
851 }
852
/*
 * State of an asynchronous (cluster) traverse request started by
 * ctdb_db_traverse_send().
 */
struct ctdb_db_traverse_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Database being traversed */
	struct ctdb_db_context *db;
	/* Node the traverse start control is sent to */
	uint32_t destnode;
	/* Message handler id records are delivered on */
	uint64_t srvid;
	/* Timeout used for the traverse start control */
	struct timeval timeout;
	/* Callback invoked for every record received */
	ctdb_rec_parser_func_t parser;
	/* Opaque pointer handed back to parser */
	void *private_data;
	/* Error to report once the message handler has been removed */
	int result;
};
864
865 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
866 static void ctdb_db_traverse_started(struct tevent_req *subreq);
867 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
868                                      void *private_data);
869 static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
870 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
871
872 struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
873                                          struct tevent_context *ev,
874                                          struct ctdb_client_context *client,
875                                          struct ctdb_db_context *db,
876                                          uint32_t destnode,
877                                          struct timeval timeout,
878                                          ctdb_rec_parser_func_t parser,
879                                          void *private_data)
880 {
881         struct tevent_req *req, *subreq;
882         struct ctdb_db_traverse_state *state;
883
884         req = tevent_req_create(mem_ctx, &state,
885                                 struct ctdb_db_traverse_state);
886         if (req == NULL) {
887                 return NULL;
888         }
889
890         state->ev = ev;
891         state->client = client;
892         state->db = db;
893         state->destnode = destnode;
894         state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
895         state->timeout = timeout;
896         state->parser = parser;
897         state->private_data = private_data;
898
899         subreq = ctdb_client_set_message_handler_send(state, ev, client,
900                                                       state->srvid,
901                                                       ctdb_db_traverse_handler,
902                                                       req);
903         if (tevent_req_nomem(subreq, req)) {
904                 return tevent_req_post(req, ev);
905         }
906         tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
907
908         return req;
909 }
910
911 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
912 {
913         struct tevent_req *req = tevent_req_callback_data(
914                 subreq, struct tevent_req);
915         struct ctdb_db_traverse_state *state = tevent_req_data(
916                 req, struct ctdb_db_traverse_state);
917         struct ctdb_traverse_start_ext traverse;
918         struct ctdb_req_control request;
919         int ret = 0;
920         bool status;
921
922         status = ctdb_client_set_message_handler_recv(subreq, &ret);
923         TALLOC_FREE(subreq);
924         if (! status) {
925                 tevent_req_error(req, ret);
926                 return;
927         }
928
929         traverse = (struct ctdb_traverse_start_ext) {
930                 .db_id = ctdb_db_id(state->db),
931                 .reqid = 0,
932                 .srvid = state->srvid,
933                 .withemptyrecords = false,
934         };
935
936         ctdb_req_control_traverse_start_ext(&request, &traverse);
937         subreq = ctdb_client_control_send(state, state->ev, state->client,
938                                           state->destnode, state->timeout,
939                                           &request);
940         if (subreq == NULL) {
941                 state->result = ENOMEM;
942                 ctdb_db_traverse_remove_handler(req);
943                 return;
944         }
945         tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
946 }
947
948 static void ctdb_db_traverse_started(struct tevent_req *subreq)
949 {
950         struct tevent_req *req = tevent_req_callback_data(
951                 subreq, struct tevent_req);
952         struct ctdb_db_traverse_state *state = tevent_req_data(
953                 req, struct ctdb_db_traverse_state);
954         struct ctdb_reply_control *reply;
955         int ret = 0;
956         bool status;
957
958         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
959         TALLOC_FREE(subreq);
960         if (! status) {
961                 DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
962                 state->result = ret;
963                 ctdb_db_traverse_remove_handler(req);
964                 return;
965         }
966
967         ret = ctdb_reply_control_traverse_start_ext(reply);
968         talloc_free(reply);
969         if (ret != 0) {
970                 DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
971                                   ret));
972                 state->result = ret;
973                 ctdb_db_traverse_remove_handler(req);
974                 return;
975         }
976 }
977
978 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
979                                      void *private_data)
980 {
981         struct tevent_req *req = talloc_get_type_abort(
982                 private_data, struct tevent_req);
983         struct ctdb_db_traverse_state *state = tevent_req_data(
984                 req, struct ctdb_db_traverse_state);
985         struct ctdb_rec_data *rec;
986         struct ctdb_ltdb_header header;
987         size_t np;
988         int ret;
989
990         ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec, &np);
991         if (ret != 0) {
992                 return;
993         }
994
995         if (rec->key.dsize == 0 && rec->data.dsize == 0) {
996                 talloc_free(rec);
997                 ctdb_db_traverse_remove_handler(req);
998                 return;
999         }
1000
1001         ret = ctdb_ltdb_header_extract(&rec->data, &header);
1002         if (ret != 0) {
1003                 talloc_free(rec);
1004                 return;
1005         }
1006
1007         if (rec->data.dsize == 0) {
1008                 talloc_free(rec);
1009                 return;
1010         }
1011
1012         ret = state->parser(rec->reqid, &header, rec->key, rec->data,
1013                             state->private_data);
1014         talloc_free(rec);
1015         if (ret != 0) {
1016                 state->result = ret;
1017                 ctdb_db_traverse_remove_handler(req);
1018         }
1019 }
1020
1021 static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
1022 {
1023         struct ctdb_db_traverse_state *state = tevent_req_data(
1024                 req, struct ctdb_db_traverse_state);
1025         struct tevent_req *subreq;
1026
1027         subreq = ctdb_client_remove_message_handler_send(state, state->ev,
1028                                                          state->client,
1029                                                          state->srvid, req);
1030         if (tevent_req_nomem(subreq, req)) {
1031                 return;
1032         }
1033         tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
1034 }
1035
1036 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
1037 {
1038         struct tevent_req *req = tevent_req_callback_data(
1039                 subreq, struct tevent_req);
1040         struct ctdb_db_traverse_state *state = tevent_req_data(
1041                 req, struct ctdb_db_traverse_state);
1042         int ret;
1043         bool status;
1044
1045         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
1046         TALLOC_FREE(subreq);
1047         if (! status) {
1048                 tevent_req_error(req, ret);
1049                 return;
1050         }
1051
1052         if (state->result != 0) {
1053                 tevent_req_error(req, state->result);
1054                 return;
1055         }
1056
1057         tevent_req_done(req);
1058 }
1059
/*
 * Collect the result of a traverse request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the errno-style error code in *perr.
 */
bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
1073
1074 int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1075                      struct ctdb_client_context *client,
1076                      struct ctdb_db_context *db,
1077                      uint32_t destnode, struct timeval timeout,
1078                      ctdb_rec_parser_func_t parser, void *private_data)
1079 {
1080         struct tevent_req *req;
1081         int ret = 0;
1082         bool status;
1083
1084         req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
1085                                     timeout, parser, private_data);
1086         if (req == NULL) {
1087                 return ENOMEM;
1088         }
1089
1090         tevent_req_poll(req, ev);
1091
1092         status = ctdb_db_traverse_recv(req, &ret);
1093         if (! status) {
1094                 return ret;
1095         }
1096
1097         return 0;
1098 }
1099
1100 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
1101                     struct ctdb_ltdb_header *header,
1102                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
1103 {
1104         TDB_DATA rec;
1105         size_t np;
1106         int ret;
1107
1108         rec = tdb_fetch(db->ltdb->tdb, key);
1109         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
1110                 /* No record present */
1111                 if (rec.dptr != NULL) {
1112                         free(rec.dptr);
1113                 }
1114
1115                 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
1116                         return EIO;
1117                 }
1118
1119                 *header = (struct ctdb_ltdb_header) {
1120                         .dmaster = CTDB_UNKNOWN_PNN,
1121                 };
1122
1123                 if (data != NULL) {
1124                         *data = tdb_null;
1125                 }
1126                 return 0;
1127         }
1128
1129         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
1130         if (ret != 0) {
1131                 return ret;
1132         }
1133
1134         ret = 0;
1135         if (data != NULL) {
1136                 data->dsize = rec.dsize - np;
1137                 data->dptr = talloc_memdup(mem_ctx, rec.dptr + np,
1138                                            data->dsize);
1139                 if (data->dptr == NULL) {
1140                         ret = ENOMEM;
1141                 }
1142         }
1143
1144         free(rec.dptr);
1145         return ret;
1146 }
1147
1148 /*
1149  * Fetch a record from volatile database
1150  *
1151  * Steps:
1152  *  1. Get a lock on the hash chain
1153  *  2. If the record does not exist, migrate the record
1154  *  3. If readonly=true and delegations do not exist, migrate the record.
1155  *  4. If readonly=false and delegations exist, migrate the record.
1156  *  5. If the local node is not dmaster, migrate the record.
1157  *  6. Return record
1158  */
1159
/*
 * State of an asynchronous fetch-lock request started by
 * ctdb_fetch_lock_send().
 */
struct ctdb_fetch_lock_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Record handle being built; returned by ctdb_fetch_lock_recv() */
	struct ctdb_record_handle *h;
	/* Whether the caller asked for readonly access */
	bool readonly;
	/* Value of ctdb_client_pnn(client), compared against the dmaster */
	uint32_t pnn;
};
1167
1168 static int ctdb_fetch_lock_check(struct tevent_req *req);
1169 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
1170 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
1171
1172 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
1173                                         struct tevent_context *ev,
1174                                         struct ctdb_client_context *client,
1175                                         struct ctdb_db_context *db,
1176                                         TDB_DATA key, bool readonly)
1177 {
1178         struct ctdb_fetch_lock_state *state;
1179         struct tevent_req *req;
1180         int ret;
1181
1182         req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
1183         if (req == NULL) {
1184                 return NULL;
1185         }
1186
1187         state->ev = ev;
1188         state->client = client;
1189
1190         state->h = talloc_zero(db, struct ctdb_record_handle);
1191         if (tevent_req_nomem(state->h, req)) {
1192                 return tevent_req_post(req, ev);
1193         }
1194         state->h->client = client;
1195         state->h->db = db;
1196         state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
1197         if (tevent_req_nomem(state->h->key.dptr, req)) {
1198                 return tevent_req_post(req, ev);
1199         }
1200         state->h->key.dsize = key.dsize;
1201         state->h->readonly = false;
1202
1203         state->readonly = readonly;
1204         state->pnn = ctdb_client_pnn(client);
1205
1206         /* Check that database is not persistent */
1207         if (! ctdb_db_volatile(db)) {
1208                 DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
1209                                   db->db_name));
1210                 tevent_req_error(req, EINVAL);
1211                 return tevent_req_post(req, ev);
1212         }
1213
1214         ret = ctdb_fetch_lock_check(req);
1215         if (ret == 0) {
1216                 tevent_req_done(req);
1217                 return tevent_req_post(req, ev);
1218         }
1219         if (ret != EAGAIN) {
1220                 tevent_req_error(req, ret);
1221                 return tevent_req_post(req, ev);
1222         }
1223         return req;
1224 }
1225
1226 static int ctdb_fetch_lock_check(struct tevent_req *req)
1227 {
1228         struct ctdb_fetch_lock_state *state = tevent_req_data(
1229                 req, struct ctdb_fetch_lock_state);
1230         struct ctdb_record_handle *h = state->h;
1231         struct ctdb_ltdb_header header;
1232         TDB_DATA data = tdb_null;
1233         size_t np;
1234         int ret, err = 0;
1235         bool do_migrate = false;
1236
1237         ret = tdb_chainlock(h->db->ltdb->tdb, h->key);
1238         if (ret != 0) {
1239                 DEBUG(DEBUG_ERR,
1240                       ("fetch_lock: %s tdb_chainlock failed, %s\n",
1241                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1242                 err = EIO;
1243                 goto failed;
1244         }
1245
1246         data = tdb_fetch(h->db->ltdb->tdb, h->key);
1247         if (data.dptr == NULL) {
1248                 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
1249                         goto migrate;
1250                 } else {
1251                         err = EIO;
1252                         goto failed;
1253                 }
1254         }
1255
1256         /* Got the record */
1257         ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header, &np);
1258         if (ret != 0) {
1259                 err = ret;
1260                 goto failed;
1261         }
1262
1263         if (! state->readonly) {
1264                 /* Read/write access */
1265                 if (header.dmaster == state->pnn &&
1266                     header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1267                         goto migrate;
1268                 }
1269
1270                 if (header.dmaster != state->pnn) {
1271                         goto migrate;
1272                 }
1273         } else {
1274                 /* Readonly access */
1275                 if (header.dmaster != state->pnn &&
1276                     ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1277                                        CTDB_REC_RO_HAVE_DELEGATIONS))) {
1278                         goto migrate;
1279                 }
1280         }
1281
1282         /* We are the dmaster or readonly delegation */
1283         h->header = header;
1284         h->data = data;
1285         if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1286                             CTDB_REC_RO_HAVE_DELEGATIONS)) {
1287                 h->readonly = true;
1288         }
1289         return 0;
1290
1291 migrate:
1292         do_migrate = true;
1293         err = EAGAIN;
1294
1295 failed:
1296         if (data.dptr != NULL) {
1297                 free(data.dptr);
1298         }
1299         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1300         if (ret != 0) {
1301                 DEBUG(DEBUG_ERR,
1302                       ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1303                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1304                 return EIO;
1305         }
1306
1307         if (do_migrate) {
1308                 ctdb_fetch_lock_migrate(req);
1309         }
1310         return err;
1311 }
1312
1313 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
1314 {
1315         struct ctdb_fetch_lock_state *state = tevent_req_data(
1316                 req, struct ctdb_fetch_lock_state);
1317         struct ctdb_req_call request;
1318         struct tevent_req *subreq;
1319
1320         ZERO_STRUCT(request);
1321         request.flags = CTDB_IMMEDIATE_MIGRATION;
1322         if (state->readonly) {
1323                 request.flags |= CTDB_WANT_READONLY;
1324         }
1325         request.db_id = state->h->db->db_id;
1326         request.callid = CTDB_NULL_FUNC;
1327         request.key = state->h->key;
1328         request.calldata = tdb_null;
1329
1330         subreq = ctdb_client_call_send(state, state->ev, state->client,
1331                                        &request);
1332         if (tevent_req_nomem(subreq, req)) {
1333                 return;
1334         }
1335
1336         tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
1337 }
1338
1339 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
1340 {
1341         struct tevent_req *req = tevent_req_callback_data(
1342                 subreq, struct tevent_req);
1343         struct ctdb_fetch_lock_state *state = tevent_req_data(
1344                 req, struct ctdb_fetch_lock_state);
1345         struct ctdb_reply_call *reply;
1346         int ret;
1347         bool status;
1348
1349         status = ctdb_client_call_recv(subreq, state, &reply, &ret);
1350         TALLOC_FREE(subreq);
1351         if (! status) {
1352                 DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
1353                                   state->h->db->db_name, ret));
1354                 tevent_req_error(req, ret);
1355                 return;
1356         }
1357
1358         if (reply->status != 0) {
1359                 tevent_req_error(req, EIO);
1360                 return;
1361         }
1362         talloc_free(reply);
1363
1364         ret = ctdb_fetch_lock_check(req);
1365         if (ret != 0) {
1366                 if (ret != EAGAIN) {
1367                         tevent_req_error(req, ret);
1368                 }
1369                 return;
1370         }
1371
1372         tevent_req_done(req);
1373 }
1374
1375 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
1376 {
1377         int ret;
1378
1379         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1380         if (ret != 0) {
1381                 DEBUG(DEBUG_ERR,
1382                       ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1383                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1384         }
1385         free(h->data.dptr);
1386         return 0;
1387 }
1388
1389 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
1390                                                 struct ctdb_ltdb_header *header,
1391                                                 TALLOC_CTX *mem_ctx,
1392                                                 TDB_DATA *data, int *perr)
1393 {
1394         struct ctdb_fetch_lock_state *state = tevent_req_data(
1395                 req, struct ctdb_fetch_lock_state);
1396         struct ctdb_record_handle *h = state->h;
1397         int err;
1398
1399         if (tevent_req_is_unix_error(req, &err)) {
1400                 if (perr != NULL) {
1401                         TALLOC_FREE(state->h);
1402                         *perr = err;
1403                 }
1404                 return NULL;
1405         }
1406
1407         if (header != NULL) {
1408                 *header = h->header;
1409         }
1410         if (data != NULL) {
1411                 size_t offset;
1412
1413                 offset = ctdb_ltdb_header_len(&h->header);
1414
1415                 data->dsize = h->data.dsize - offset;
1416                 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
1417                                            data->dsize);
1418                 if (data->dptr == NULL) {
1419                         TALLOC_FREE(state->h);
1420                         if (perr != NULL) {
1421                                 *perr = ENOMEM;
1422                         }
1423                         return NULL;
1424                 }
1425         }
1426
1427         talloc_set_destructor(h, ctdb_record_handle_destructor);
1428         return h;
1429 }
1430
1431 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1432                     struct ctdb_client_context *client,
1433                     struct ctdb_db_context *db, TDB_DATA key, bool readonly,
1434                     struct ctdb_record_handle **out,
1435                     struct ctdb_ltdb_header *header, TDB_DATA *data)
1436 {
1437         struct tevent_req *req;
1438         struct ctdb_record_handle *h;
1439         int ret;
1440
1441         req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
1442         if (req == NULL) {
1443                 return ENOMEM;
1444         }
1445
1446         tevent_req_poll(req, ev);
1447
1448         h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
1449         if (h == NULL) {
1450                 return ret;
1451         }
1452
1453         *out = h;
1454         return 0;
1455 }
1456
1457 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
1458 {
1459         uint8_t header[sizeof(struct ctdb_ltdb_header)];
1460         TDB_DATA rec[2];
1461         size_t np;
1462         int ret;
1463
1464         /* Cannot modify the record if it was obtained as a readonly copy */
1465         if (h->readonly) {
1466                 return EINVAL;
1467         }
1468
1469         /* Check if the new data is same */
1470         if (h->data.dsize == data.dsize &&
1471             memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1472                 /* No need to do anything */
1473                 return 0;
1474         }
1475
1476         ctdb_ltdb_header_push(&h->header, header, &np);
1477
1478         rec[0].dsize = np;
1479         rec[0].dptr = header;
1480
1481         rec[1].dsize = data.dsize;
1482         rec[1].dptr = data.dptr;
1483
1484         ret = tdb_storev(h->db->ltdb->tdb, h->key, rec, 2, TDB_REPLACE);
1485         if (ret != 0) {
1486                 DEBUG(DEBUG_ERR,
1487                       ("store_record: %s tdb_storev failed, %s\n",
1488                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1489                 return EIO;
1490         }
1491
1492         return 0;
1493 }
1494
/* State of an asynchronous delete-record request */
struct ctdb_delete_record_state {
	/* Handle of the record being deleted; used for logging on failure */
	struct ctdb_record_handle *h;
};
1498
1499 static void ctdb_delete_record_done(struct tevent_req *subreq);
1500
1501 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1502                                            struct tevent_context *ev,
1503                                            struct ctdb_record_handle *h)
1504 {
1505         struct tevent_req *req, *subreq;
1506         struct ctdb_delete_record_state *state;
1507         struct ctdb_key_data key;
1508         struct ctdb_req_control request;
1509         uint8_t header[sizeof(struct ctdb_ltdb_header)];
1510         TDB_DATA rec;
1511         size_t  np;
1512         int ret;
1513
1514         req = tevent_req_create(mem_ctx, &state,
1515                                 struct ctdb_delete_record_state);
1516         if (req == NULL) {
1517                 return NULL;
1518         }
1519
1520         state->h = h;
1521
1522         /* Cannot delete the record if it was obtained as a readonly copy */
1523         if (h->readonly) {
1524                 DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
1525                                   h->db->db_name));
1526                 tevent_req_error(req, EINVAL);
1527                 return tevent_req_post(req, ev);
1528         }
1529
1530         ctdb_ltdb_header_push(&h->header, header, &np);
1531
1532         rec.dsize = np;
1533         rec.dptr = header;
1534
1535         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1536         if (ret != 0) {
1537                 DEBUG(DEBUG_ERR,
1538                       ("fetch_lock delete: %s tdb_sore failed, %s\n",
1539                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1540                 tevent_req_error(req, EIO);
1541                 return tevent_req_post(req, ev);
1542         }
1543
1544         key.db_id = h->db->db_id;
1545         key.header = h->header;
1546         key.key = h->key;
1547
1548         ctdb_req_control_schedule_for_deletion(&request, &key);
1549         subreq = ctdb_client_control_send(state, ev, h->client,
1550                                           ctdb_client_pnn(h->client),
1551                                           tevent_timeval_zero(),
1552                                           &request);
1553         if (tevent_req_nomem(subreq, req)) {
1554                 return tevent_req_post(req, ev);
1555         }
1556         tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
1557
1558         return req;
1559 }
1560
1561 static void ctdb_delete_record_done(struct tevent_req *subreq)
1562 {
1563         struct tevent_req *req = tevent_req_callback_data(
1564                 subreq, struct tevent_req);
1565         struct ctdb_delete_record_state *state = tevent_req_data(
1566                 req, struct ctdb_delete_record_state);
1567         int ret;
1568         bool status;
1569
1570         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1571         TALLOC_FREE(subreq);
1572         if (! status) {
1573                 DEBUG(DEBUG_ERR,
1574                       ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1575                        "ret=%d\n", state->h->db->db_name, ret));
1576                 tevent_req_error(req, ret);
1577                 return;
1578         }
1579
1580         tevent_req_done(req);
1581 }
1582
/*
 * Collect the result of a delete-record request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the errno-style error code in *perr.
 */
bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
{
	int code;

	if (! tevent_req_is_unix_error(req, &code)) {
		return true;
	}

	if (perr != NULL) {
		*perr = code;
	}
	return false;
}
1596
1597
1598 int ctdb_delete_record(struct ctdb_record_handle *h)
1599 {
1600         struct tevent_context *ev = h->ev;
1601         TALLOC_CTX *mem_ctx;
1602         struct tevent_req *req;
1603         int ret;
1604         bool status;
1605
1606         mem_ctx = talloc_new(NULL);
1607         if (mem_ctx == NULL) {
1608                 return ENOMEM;
1609         }
1610
1611         req = ctdb_delete_record_send(mem_ctx, ev, h);
1612         if (req == NULL) {
1613                 talloc_free(mem_ctx);
1614                 return ENOMEM;
1615         }
1616
1617         tevent_req_poll(req, ev);
1618
1619         status = ctdb_delete_record_recv(req, &ret);
1620         talloc_free(mem_ctx);
1621         if (! status) {
1622                 return ret;
1623         }
1624
1625         return 0;
1626 }
1627
1628 /*
1629  * Global lock functions
1630  */
1631
/* State of an asynchronous g_lock lock request */
struct ctdb_g_lock_lock_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct ctdb_db_context *db;
	/* Lock name including the terminating NUL; points at the caller's string */
	TDB_DATA key;
	/* Server id of the requester */
	struct ctdb_server_id my_sid;
	/* CTDB_G_LOCK_READ or CTDB_G_LOCK_WRITE */
	enum ctdb_g_lock_type lock_type;
	/* Handle of the fetch-locked lock record */
	struct ctdb_record_handle *h;
	/* state for verification of active locks */
	struct ctdb_g_lock_list *lock_list;
	/* Index into lock_list of the entry currently being examined */
	unsigned int current;
};
1644
1645 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1646 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1647 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1648 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1649 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1650
1651 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1652                                   enum ctdb_g_lock_type l2)
1653 {
1654         if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1655                 return false;
1656         }
1657         return true;
1658 }
1659
1660 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1661                                          struct tevent_context *ev,
1662                                          struct ctdb_client_context *client,
1663                                          struct ctdb_db_context *db,
1664                                          const char *keyname,
1665                                          struct ctdb_server_id *sid,
1666                                          bool readonly)
1667 {
1668         struct tevent_req *req, *subreq;
1669         struct ctdb_g_lock_lock_state *state;
1670
1671         req = tevent_req_create(mem_ctx, &state,
1672                                 struct ctdb_g_lock_lock_state);
1673         if (req == NULL) {
1674                 return NULL;
1675         }
1676
1677         state->ev = ev;
1678         state->client = client;
1679         state->db = db;
1680         state->key.dptr = discard_const(keyname);
1681         state->key.dsize = strlen(keyname) + 1;
1682         state->my_sid = *sid;
1683         state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1684
1685         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1686                                       false);
1687         if (tevent_req_nomem(subreq, req)) {
1688                 return tevent_req_post(req, ev);
1689         }
1690         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1691
1692         return req;
1693 }
1694
1695 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1696 {
1697         struct tevent_req *req = tevent_req_callback_data(
1698                 subreq, struct tevent_req);
1699         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1700                 req, struct ctdb_g_lock_lock_state);
1701         TDB_DATA data;
1702         size_t np;
1703         int ret = 0;
1704
1705         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1706         TALLOC_FREE(subreq);
1707         if (state->h == NULL) {
1708                 DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1709                                   (char *)state->key.dptr));
1710                 tevent_req_error(req, ret);
1711                 return;
1712         }
1713
1714         if (state->lock_list != NULL) {
1715                 TALLOC_FREE(state->lock_list);
1716                 state->current = 0;
1717         }
1718
1719         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1720                                     &state->lock_list, &np);
1721         talloc_free(data.dptr);
1722         if (ret != 0) {
1723                 DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1724                                   (char *)state->key.dptr));
1725                 tevent_req_error(req, ret);
1726                 return;
1727         }
1728
1729         ctdb_g_lock_lock_process_locks(req);
1730 }
1731
1732 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1733 {
1734         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1735                 req, struct ctdb_g_lock_lock_state);
1736         struct tevent_req *subreq;
1737         struct ctdb_g_lock *lock;
1738         bool check_server = false;
1739         int ret;
1740
1741         while (state->current < state->lock_list->num) {
1742                 lock = &state->lock_list->lock[state->current];
1743
1744                 /* We should not ask for the same lock more than once */
1745                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1746                         DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
1747                                           (char *)state->key.dptr));
1748                         tevent_req_error(req, EDEADLK);
1749                         return;
1750                 }
1751
1752                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1753                         check_server = true;
1754                         break;
1755                 }
1756
1757                 state->current += 1;
1758         }
1759
1760         if (check_server) {
1761                 struct ctdb_req_control request;
1762
1763                 ctdb_req_control_process_exists(&request, lock->sid.pid);
1764                 subreq = ctdb_client_control_send(state, state->ev,
1765                                                   state->client,
1766                                                   lock->sid.vnn,
1767                                                   tevent_timeval_zero(),
1768                                                   &request);
1769                 if (tevent_req_nomem(subreq, req)) {
1770                         return;
1771                 }
1772                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1773                 return;
1774         }
1775
1776         /* There is no conflict, add ourself to the lock_list */
1777         state->lock_list->lock = talloc_realloc(state->lock_list,
1778                                                 state->lock_list->lock,
1779                                                 struct ctdb_g_lock,
1780                                                 state->lock_list->num + 1);
1781         if (state->lock_list->lock == NULL) {
1782                 tevent_req_error(req, ENOMEM);
1783                 return;
1784         }
1785
1786         lock = &state->lock_list->lock[state->lock_list->num];
1787         lock->type = state->lock_type;
1788         lock->sid = state->my_sid;
1789         state->lock_list->num += 1;
1790
1791         ret = ctdb_g_lock_lock_update(req);
1792         if (ret != 0) {
1793                 tevent_req_error(req, ret);
1794                 return;
1795         }
1796
1797         TALLOC_FREE(state->h);
1798         tevent_req_done(req);
1799 }
1800
1801 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1802 {
1803         struct tevent_req *req = tevent_req_callback_data(
1804                 subreq, struct tevent_req);
1805         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1806                 req, struct ctdb_g_lock_lock_state);
1807         struct ctdb_reply_control *reply;
1808         int ret, value;
1809         bool status;
1810
1811         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1812         TALLOC_FREE(subreq);
1813         if (! status) {
1814                 DEBUG(DEBUG_ERR,
1815                       ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1816                        (char *)state->key.dptr, ret));
1817                 tevent_req_error(req, ret);
1818                 return;
1819         }
1820
1821         ret = ctdb_reply_control_process_exists(reply, &value);
1822         if (ret != 0) {
1823                 tevent_req_error(req, ret);
1824                 return;
1825         }
1826         talloc_free(reply);
1827
1828         if (value == 0) {
1829                 /* server process exists, need to retry */
1830                 TALLOC_FREE(state->h);
1831                 subreq = tevent_wakeup_send(state, state->ev,
1832                                             tevent_timeval_current_ofs(0,1000));
1833                 if (tevent_req_nomem(subreq, req)) {
1834                         return;
1835                 }
1836                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1837                 return;
1838         }
1839
1840         /* server process does not exist, remove conflicting entry */
1841         state->lock_list->lock[state->current] =
1842                 state->lock_list->lock[state->lock_list->num-1];
1843         state->lock_list->num -= 1;
1844
1845         ret = ctdb_g_lock_lock_update(req);
1846         if (ret != 0) {
1847                 tevent_req_error(req, ret);
1848                 return;
1849         }
1850
1851         ctdb_g_lock_lock_process_locks(req);
1852 }
1853
1854 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1855 {
1856         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1857                 req, struct ctdb_g_lock_lock_state);
1858         TDB_DATA data;
1859         size_t np;
1860         int ret;
1861
1862         data.dsize = ctdb_g_lock_list_len(state->lock_list);
1863         data.dptr = talloc_size(state, data.dsize);
1864         if (data.dptr == NULL) {
1865                 return ENOMEM;
1866         }
1867
1868         ctdb_g_lock_list_push(state->lock_list, data.dptr, &np);
1869         ret = ctdb_store_record(state->h, data);
1870         talloc_free(data.dptr);
1871         return ret;
1872 }
1873
1874 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1875 {
1876         struct tevent_req *req = tevent_req_callback_data(
1877                 subreq, struct tevent_req);
1878         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1879                 req, struct ctdb_g_lock_lock_state);
1880         bool success;
1881
1882         success = tevent_wakeup_recv(subreq);
1883         TALLOC_FREE(subreq);
1884         if (! success) {
1885                 tevent_req_error(req, ENOMEM);
1886                 return;
1887         }
1888
1889         subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1890                                       state->db, state->key, false);
1891         if (tevent_req_nomem(subreq, req)) {
1892                 return;
1893         }
1894         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1895 }
1896
1897 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1898 {
1899         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1900                 req, struct ctdb_g_lock_lock_state);
1901         int err;
1902
1903         TALLOC_FREE(state->h);
1904
1905         if (tevent_req_is_unix_error(req, &err)) {
1906                 if (perr != NULL) {
1907                         *perr = err;
1908                 }
1909                 return false;
1910         }
1911
1912         return true;
1913 }
1914
1915 struct ctdb_g_lock_unlock_state {
1916         struct tevent_context *ev;
1917         struct ctdb_client_context *client;
1918         struct ctdb_db_context *db;
1919         TDB_DATA key;
1920         struct ctdb_server_id my_sid;
1921         struct ctdb_record_handle *h;
1922         struct ctdb_g_lock_list *lock_list;
1923 };
1924
1925 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1926 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1927 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
1928
1929 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1930                                            struct tevent_context *ev,
1931                                            struct ctdb_client_context *client,
1932                                            struct ctdb_db_context *db,
1933                                            const char *keyname,
1934                                            struct ctdb_server_id sid)
1935 {
1936         struct tevent_req *req, *subreq;
1937         struct ctdb_g_lock_unlock_state *state;
1938
1939         req = tevent_req_create(mem_ctx, &state,
1940                                 struct ctdb_g_lock_unlock_state);
1941         if (req == NULL) {
1942                 return NULL;
1943         }
1944
1945         state->ev = ev;
1946         state->client = client;
1947         state->db = db;
1948         state->key.dptr = discard_const(keyname);
1949         state->key.dsize = strlen(keyname) + 1;
1950         state->my_sid = sid;
1951
1952         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1953                                       false);
1954         if (tevent_req_nomem(subreq, req)) {
1955                 return tevent_req_post(req, ev);
1956         }
1957         tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1958
1959         return req;
1960 }
1961
1962 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1963 {
1964         struct tevent_req *req = tevent_req_callback_data(
1965                 subreq, struct tevent_req);
1966         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1967                 req, struct ctdb_g_lock_unlock_state);
1968         TDB_DATA data;
1969         size_t np;
1970         int ret = 0;
1971
1972         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1973         TALLOC_FREE(subreq);
1974         if (state->h == NULL) {
1975                 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1976                                   (char *)state->key.dptr));
1977                 tevent_req_error(req, ret);
1978                 return;
1979         }
1980
1981         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1982                                     &state->lock_list, &np);
1983         if (ret != 0) {
1984                 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1985                                   (char *)state->key.dptr));
1986                 tevent_req_error(req, ret);
1987                 return;
1988         }
1989
1990         ret = ctdb_g_lock_unlock_update(req);
1991         if (ret != 0) {
1992                 tevent_req_error(req, ret);
1993                 return;
1994         }
1995
1996         if (state->lock_list->num == 0) {
1997                 subreq = ctdb_delete_record_send(state, state->ev, state->h);
1998                 if (tevent_req_nomem(subreq, req)) {
1999                         return;
2000                 }
2001                 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
2002                                         req);
2003                 return;
2004         }
2005
2006         TALLOC_FREE(state->h);
2007         tevent_req_done(req);
2008 }
2009
2010 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
2011 {
2012         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2013                 req, struct ctdb_g_lock_unlock_state);
2014         struct ctdb_g_lock *lock;
2015         int ret, i;
2016
2017         for (i=0; i<state->lock_list->num; i++) {
2018                 lock = &state->lock_list->lock[i];
2019
2020                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
2021                         break;
2022                 }
2023         }
2024
2025         if (i < state->lock_list->num) {
2026                 state->lock_list->lock[i] =
2027                         state->lock_list->lock[state->lock_list->num-1];
2028                 state->lock_list->num -= 1;
2029         }
2030
2031         if (state->lock_list->num != 0) {
2032                 TDB_DATA data;
2033                 size_t np;
2034
2035                 data.dsize = ctdb_g_lock_list_len(state->lock_list);
2036                 data.dptr = talloc_size(state, data.dsize);
2037                 if (data.dptr == NULL) {
2038                         return ENOMEM;
2039                 }
2040
2041                 ctdb_g_lock_list_push(state->lock_list, data.dptr, &np);
2042                 ret = ctdb_store_record(state->h, data);
2043                 talloc_free(data.dptr);
2044                 if (ret != 0) {
2045                         return ret;
2046                 }
2047         }
2048
2049         return 0;
2050 }
2051
2052 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
2053 {
2054         struct tevent_req *req = tevent_req_callback_data(
2055                 subreq, struct tevent_req);
2056         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2057                 req, struct ctdb_g_lock_unlock_state);
2058         int ret;
2059         bool status;
2060
2061         status = ctdb_delete_record_recv(subreq, &ret);
2062         if (! status) {
2063                 DEBUG(DEBUG_ERR,
2064                       ("g_lock_unlock %s delete record failed, ret=%d\n",
2065                        (char *)state->key.dptr, ret));
2066                 tevent_req_error(req, ret);
2067                 return;
2068         }
2069
2070         TALLOC_FREE(state->h);
2071         tevent_req_done(req);
2072 }
2073
2074 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
2075 {
2076         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2077                 req, struct ctdb_g_lock_unlock_state);
2078         int err;
2079
2080         TALLOC_FREE(state->h);
2081
2082         if (tevent_req_is_unix_error(req, &err)) {
2083                 if (perr != NULL) {
2084                         *perr = err;
2085                 }
2086                 return false;
2087         }
2088
2089         return true;
2090 }
2091
2092 /*
2093  * Persistent database functions
2094  */
/* Per-request state of an asynchronous transaction start */
struct ctdb_transaction_start_state {
        /* Event context used for sub-requests */
        struct tevent_context *ev;
        /* Client connection the transaction runs on */
        struct ctdb_client_context *client;
        /* Timeout slot; NOTE(review): confirm where this is consumed */
        struct timeval timeout;
        /* Transaction handle being set up, returned by the recv function */
        struct ctdb_transaction_handle *h;
        /* PNN of the local client, as returned by ctdb_client_pnn() */
        uint32_t destnode;
};

/* Steps of the transaction start request */
static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
2105
2106 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
2107                                                struct tevent_context *ev,
2108                                                struct ctdb_client_context *client,
2109                                                struct timeval timeout,
2110                                                struct ctdb_db_context *db,
2111                                                bool readonly)
2112 {
2113         struct ctdb_transaction_start_state *state;
2114         struct tevent_req *req, *subreq;
2115         struct ctdb_transaction_handle *h;
2116
2117         req = tevent_req_create(mem_ctx, &state,
2118                                 struct ctdb_transaction_start_state);
2119         if (req == NULL) {
2120                 return NULL;
2121         }
2122
2123         if (ctdb_db_volatile(db)) {
2124                 tevent_req_error(req, EINVAL);
2125                 return tevent_req_post(req, ev);
2126         }
2127
2128         state->ev = ev;
2129         state->client = client;
2130         state->destnode = ctdb_client_pnn(client);
2131
2132         h = talloc_zero(db, struct ctdb_transaction_handle);
2133         if (tevent_req_nomem(h, req)) {
2134                 return tevent_req_post(req, ev);
2135         }
2136
2137         h->ev = ev;
2138         h->client = client;
2139         h->db = db;
2140         h->readonly = readonly;
2141         h->updated = false;
2142
2143         /* SRVID is unique for databases, so client can have transactions
2144          * active for multiple databases */
2145         h->sid = ctdb_client_get_server_id(client, db->db_id);
2146
2147         h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
2148         if (tevent_req_nomem(h->recbuf, req)) {
2149                 return tevent_req_post(req, ev);
2150         }
2151
2152         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
2153         if (tevent_req_nomem(h->lock_name, req)) {
2154                 return tevent_req_post(req, ev);
2155         }
2156
2157         state->h = h;
2158
2159         subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
2160         if (tevent_req_nomem(subreq, req)) {
2161                 return tevent_req_post(req, ev);
2162         }
2163         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
2164
2165         return req;
2166 }
2167
2168 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
2169 {
2170         struct tevent_req *req = tevent_req_callback_data(
2171                 subreq, struct tevent_req);
2172         struct ctdb_transaction_start_state *state = tevent_req_data(
2173                 req, struct ctdb_transaction_start_state);
2174         bool status;
2175         int ret;
2176
2177         status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
2178         TALLOC_FREE(subreq);
2179         if (! status) {
2180                 DEBUG(DEBUG_ERR,
2181                       ("transaction_start: %s attach g_lock.tdb failed\n",
2182                        state->h->db->db_name));
2183                 tevent_req_error(req, ret);
2184                 return;
2185         }
2186
2187         subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
2188                                        state->h->db_g_lock,
2189                                        state->h->lock_name,
2190                                        &state->h->sid, state->h->readonly);
2191         if (tevent_req_nomem(subreq, req)) {
2192                 return;
2193         }
2194         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
2195 }
2196
2197 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
2198 {
2199         struct tevent_req *req = tevent_req_callback_data(
2200                 subreq, struct tevent_req);
2201         struct ctdb_transaction_start_state *state = tevent_req_data(
2202                 req, struct ctdb_transaction_start_state);
2203         int ret;
2204         bool status;
2205
2206         status = ctdb_g_lock_lock_recv(subreq, &ret);
2207         TALLOC_FREE(subreq);
2208         if (! status) {
2209                 DEBUG(DEBUG_ERR,
2210                       ("transaction_start: %s g_lock lock failed, ret=%d\n",
2211                        state->h->db->db_name, ret));
2212                 tevent_req_error(req, ret);
2213                 return;
2214         }
2215
2216         tevent_req_done(req);
2217 }
2218
2219 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
2220                                         struct tevent_req *req,
2221                                         int *perr)
2222 {
2223         struct ctdb_transaction_start_state *state = tevent_req_data(
2224                 req, struct ctdb_transaction_start_state);
2225         int err;
2226
2227         if (tevent_req_is_unix_error(req, &err)) {
2228                 if (perr != NULL) {
2229                         *perr = err;
2230                 }
2231                 return NULL;
2232         }
2233
2234         return state->h;
2235 }
2236
2237 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2238                            struct ctdb_client_context *client,
2239                            struct timeval timeout,
2240                            struct ctdb_db_context *db, bool readonly,
2241                            struct ctdb_transaction_handle **out)
2242 {
2243         struct tevent_req *req;
2244         struct ctdb_transaction_handle *h;
2245         int ret;
2246
2247         req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
2248                                           readonly);
2249         if (req == NULL) {
2250                 return ENOMEM;
2251         }
2252
2253         tevent_req_poll(req, ev);
2254
2255         h = ctdb_transaction_start_recv(req, &ret);
2256         if (h == NULL) {
2257                 return ret;
2258         }
2259
2260         *out = h;
2261         return 0;
2262 }
2263
2264 struct ctdb_transaction_record_fetch_state {
2265         TDB_DATA key, data;
2266         struct ctdb_ltdb_header header;
2267         bool found;
2268 };
2269
2270 static int ctdb_transaction_record_fetch_traverse(
2271                                 uint32_t reqid,
2272                                 struct ctdb_ltdb_header *nullheader,
2273                                 TDB_DATA key, TDB_DATA data,
2274                                 void *private_data)
2275 {
2276         struct ctdb_transaction_record_fetch_state *state =
2277                 (struct ctdb_transaction_record_fetch_state *)private_data;
2278
2279         if (state->key.dsize == key.dsize &&
2280             memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
2281                 int ret;
2282
2283                 ret = ctdb_ltdb_header_extract(&data, &state->header);
2284                 if (ret != 0) {
2285                         DEBUG(DEBUG_ERR,
2286                               ("record_fetch: Failed to extract header, "
2287                                "ret=%d\n", ret));
2288                         return 1;
2289                 }
2290
2291                 state->data = data;
2292                 state->found = true;
2293         }
2294
2295         return 0;
2296 }
2297
2298 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
2299                                          TDB_DATA key,
2300                                          struct ctdb_ltdb_header *header,
2301                                          TDB_DATA *data)
2302 {
2303         struct ctdb_transaction_record_fetch_state state;
2304         int ret;
2305
2306         state.key = key;
2307         state.found = false;
2308
2309         ret = ctdb_rec_buffer_traverse(h->recbuf,
2310                                        ctdb_transaction_record_fetch_traverse,
2311                                        &state);
2312         if (ret != 0) {
2313                 return ret;
2314         }
2315
2316         if (state.found) {
2317                 if (header != NULL) {
2318                         *header = state.header;
2319                 }
2320                 if (data != NULL) {
2321                         *data = state.data;
2322                 }
2323                 return 0;
2324         }
2325
2326         return ENOENT;
2327 }
2328
2329 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
2330                                   TDB_DATA key,
2331                                   TALLOC_CTX *mem_ctx, TDB_DATA *data)
2332 {
2333         TDB_DATA tmp_data;
2334         struct ctdb_ltdb_header header;
2335         int ret;
2336
2337         ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
2338         if (ret == 0) {
2339                 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
2340                                            tmp_data.dsize);
2341                 if (data->dptr == NULL) {
2342                         return ENOMEM;
2343                 }
2344                 data->dsize = tmp_data.dsize;
2345                 return 0;
2346         }
2347
2348         ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
2349         if (ret != 0) {
2350                 return ret;
2351         }
2352
2353         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
2354         if (ret != 0) {
2355                 return ret;
2356         }
2357
2358         return 0;
2359 }
2360
2361 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
2362                                   TDB_DATA key, TDB_DATA data)
2363 {
2364         TALLOC_CTX *tmp_ctx;
2365         struct ctdb_ltdb_header header;
2366         TDB_DATA old_data;
2367         int ret;
2368
2369         if (h->readonly) {
2370                 return EINVAL;
2371         }
2372
2373         tmp_ctx = talloc_new(h);
2374         if (tmp_ctx == NULL) {
2375                 return ENOMEM;
2376         }
2377
2378         ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
2379         if (ret != 0) {
2380                 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
2381                 if (ret != 0) {
2382                         return ret;
2383                 }
2384         }
2385
2386         if (old_data.dsize == data.dsize &&
2387             memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
2388                 talloc_free(tmp_ctx);
2389                 return 0;
2390         }
2391
2392         header.dmaster = ctdb_client_pnn(h->client);
2393         header.rsn += 1;
2394
2395         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
2396         talloc_free(tmp_ctx);
2397         if (ret != 0) {
2398                 return ret;
2399         }
2400         h->updated = true;
2401
2402         return 0;
2403 }
2404
2405 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
2406                                    TDB_DATA key)
2407 {
2408         return ctdb_transaction_store_record(h, key, tdb_null);
2409 }
2410
2411 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
2412                                             uint64_t *seqnum)
2413 {
2414         const char *keyname = CTDB_DB_SEQNUM_KEY;
2415         TDB_DATA key, data;
2416         struct ctdb_ltdb_header header;
2417         int ret;
2418
2419         key.dptr = discard_const(keyname);
2420         key.dsize = strlen(keyname) + 1;
2421
2422         ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2423         if (ret != 0) {
2424                 DEBUG(DEBUG_ERR,
2425                       ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2426                        h->db->db_name, ret));
2427                 return ret;
2428         }
2429
2430         if (data.dsize == 0) {
2431                 /* initial data */
2432                 *seqnum = 0;
2433                 return 0;
2434         }
2435
2436         if (data.dsize != sizeof(uint64_t)) {
2437                 talloc_free(data.dptr);
2438                 return EINVAL;
2439         }
2440
2441         *seqnum = *(uint64_t *)data.dptr;
2442
2443         talloc_free(data.dptr);
2444         return 0;
2445 }
2446
2447 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2448                                             uint64_t seqnum)
2449 {
2450         const char *keyname = CTDB_DB_SEQNUM_KEY;
2451         TDB_DATA key, data;
2452
2453         key.dptr = discard_const(keyname);
2454         key.dsize = strlen(keyname) + 1;
2455
2456         data.dptr = (uint8_t *)&seqnum;
2457         data.dsize = sizeof(seqnum);
2458
2459         return ctdb_transaction_store_record(h, key, data);
2460 }
2461
/* Per-request state of an asynchronous transaction commit */
struct ctdb_transaction_commit_state {
        /* Event context used for sub-requests */
        struct tevent_context *ev;
        /* Timeout passed to the TRANS3_COMMIT control */
        struct timeval timeout;
        /* Transaction being committed */
        struct ctdb_transaction_handle *h;
        /* Database sequence number read before the commit was sent */
        uint64_t seqnum;
};

/* Steps of the commit request */
static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
static void ctdb_transaction_commit_done(struct tevent_req *subreq);
2471
2472 struct tevent_req *ctdb_transaction_commit_send(
2473                                         TALLOC_CTX *mem_ctx,
2474                                         struct tevent_context *ev,
2475                                         struct timeval timeout,
2476                                         struct ctdb_transaction_handle *h)
2477 {
2478         struct tevent_req *req, *subreq;
2479         struct ctdb_transaction_commit_state *state;
2480         struct ctdb_req_control request;
2481         int ret;
2482
2483         req = tevent_req_create(mem_ctx, &state,
2484                                 struct ctdb_transaction_commit_state);
2485         if (req == NULL) {
2486                 return NULL;
2487         }
2488
2489         state->ev = ev;
2490         state->timeout = timeout;
2491         state->h = h;
2492
2493         ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
2494         if (ret != 0) {
2495                 tevent_req_error(req, ret);
2496                 return tevent_req_post(req, ev);
2497         }
2498
2499         ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
2500         if (ret != 0) {
2501                 tevent_req_error(req, ret);
2502                 return tevent_req_post(req, ev);
2503         }
2504
2505         ctdb_req_control_trans3_commit(&request, h->recbuf);
2506         subreq = ctdb_client_control_send(state, ev, h->client,
2507                                           ctdb_client_pnn(h->client),
2508                                           timeout, &request);
2509         if (tevent_req_nomem(subreq, req)) {
2510                 return tevent_req_post(req, ev);
2511         }
2512         tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2513
2514         return req;
2515 }
2516
2517 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2518 {
2519         struct tevent_req *req = tevent_req_callback_data(
2520                 subreq, struct tevent_req);
2521         struct ctdb_transaction_commit_state *state = tevent_req_data(
2522                 req, struct ctdb_transaction_commit_state);
2523         struct ctdb_transaction_handle *h = state->h;
2524         struct ctdb_reply_control *reply;
2525         uint64_t seqnum;
2526         int ret;
2527         bool status;
2528
2529         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2530         TALLOC_FREE(subreq);
2531         if (! status) {
2532                 DEBUG(DEBUG_ERR,
2533                       ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2534                        h->db->db_name, ret));
2535                 tevent_req_error(req, ret);
2536                 return;
2537         }
2538
2539         ret = ctdb_reply_control_trans3_commit(reply);
2540         talloc_free(reply);
2541
2542         if (ret != 0) {
2543                 /* Control failed due to recovery */
2544
2545                 ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2546                 if (ret != 0) {
2547                         tevent_req_error(req, ret);
2548                         return;
2549                 }
2550
2551                 if (seqnum == state->seqnum) {
2552                         struct ctdb_req_control request;
2553
2554                         /* try again */
2555                         ctdb_req_control_trans3_commit(&request,
2556                                                        state->h->recbuf);
2557                         subreq = ctdb_client_control_send(
2558                                         state, state->ev, state->h->client,
2559                                         ctdb_client_pnn(state->h->client),
2560                                         state->timeout, &request);
2561                         if (tevent_req_nomem(subreq, req)) {
2562                                 return;
2563                         }
2564                         tevent_req_set_callback(subreq,
2565                                                 ctdb_transaction_commit_done,
2566                                                 req);
2567                         return;
2568                 }
2569
2570                 if (seqnum != state->seqnum + 1) {
2571                         DEBUG(DEBUG_ERR,
2572                               ("transaction_commit: %s seqnum mismatch "
2573                                "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2574                                state->h->db->db_name, seqnum, state->seqnum));
2575                         tevent_req_error(req, EIO);
2576                         return;
2577                 }
2578         }
2579
2580         /* trans3_commit successful */
2581         subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2582                                          h->db_g_lock, h->lock_name, h->sid);
2583         if (tevent_req_nomem(subreq, req)) {
2584                 return;
2585         }
2586         tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
2587                                 req);
2588 }
2589
2590 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2591 {
2592         struct tevent_req *req = tevent_req_callback_data(
2593                 subreq, struct tevent_req);
2594         struct ctdb_transaction_commit_state *state = tevent_req_data(
2595                 req, struct ctdb_transaction_commit_state);
2596         int ret;
2597         bool status;
2598
2599         status = ctdb_g_lock_unlock_recv(subreq, &ret);
2600         TALLOC_FREE(subreq);
2601         if (! status) {
2602                 DEBUG(DEBUG_ERR,
2603                       ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2604                        state->h->db->db_name, ret));
2605                 tevent_req_error(req, ret);
2606                 return;
2607         }
2608
2609         talloc_free(state->h);
2610         tevent_req_done(req);
2611 }
2612
/*
 * Collect the result of a commit request.
 *
 * Returns true if the request did not finish with an error.  Otherwise
 * returns false and, when perr is not NULL, stores the unix error code
 * in *perr.
 */
bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
{
        int unix_err;
        bool failed;

        failed = tevent_req_is_unix_error(req, &unix_err);
        if (failed && perr != NULL) {
                *perr = unix_err;
        }

        return ! failed;
}
2626
2627 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2628 {
2629         struct tevent_context *ev = h->ev;
2630         TALLOC_CTX *mem_ctx;
2631         struct tevent_req *req;
2632         int ret;
2633         bool status;
2634
2635         if (h->readonly || ! h->updated) {
2636                 return ctdb_transaction_cancel(h);
2637         }
2638
2639         mem_ctx = talloc_new(NULL);
2640         if (mem_ctx == NULL) {
2641                 return ENOMEM;
2642         }
2643
2644         req = ctdb_transaction_commit_send(mem_ctx, ev,
2645                                            tevent_timeval_zero(), h);
2646         if (req == NULL) {
2647                 talloc_free(mem_ctx);
2648                 return ENOMEM;
2649         }
2650
2651         tevent_req_poll(req, ev);
2652
2653         status = ctdb_transaction_commit_recv(req, &ret);
2654         if (! status) {
2655                 talloc_free(mem_ctx);
2656                 return ret;
2657         }
2658
2659         talloc_free(mem_ctx);
2660         return 0;
2661 }
2662
/*
 * Per-request state for ctdb_transaction_cancel_send().
 */
struct ctdb_transaction_cancel_state {
        /* Event context passed to ctdb_transaction_cancel_send() */
        struct tevent_context *ev;
        /* Transaction handle being cancelled; freed by the done callback */
        struct ctdb_transaction_handle *h;
        /* Timeout passed to ctdb_transaction_cancel_send() */
        struct timeval timeout;
};

/* Callback for the g_lock unlock subrequest of a cancel request */
static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
2670
2671 struct tevent_req *ctdb_transaction_cancel_send(
2672                                         TALLOC_CTX *mem_ctx,
2673                                         struct tevent_context *ev,
2674                                         struct timeval timeout,
2675                                         struct ctdb_transaction_handle *h)
2676 {
2677         struct tevent_req *req, *subreq;
2678         struct ctdb_transaction_cancel_state *state;
2679
2680         req = tevent_req_create(mem_ctx, &state,
2681                                 struct ctdb_transaction_cancel_state);
2682         if (req == NULL) {
2683                 return NULL;
2684         }
2685
2686         state->ev = ev;
2687         state->h = h;
2688         state->timeout = timeout;
2689
2690         subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
2691                                          state->h->db_g_lock,
2692                                          state->h->lock_name, state->h->sid);
2693         if (tevent_req_nomem(subreq, req)) {
2694                 return tevent_req_post(req, ev);
2695         }
2696         tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
2697                                 req);
2698
2699         return req;
2700 }
2701
2702 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2703 {
2704         struct tevent_req *req = tevent_req_callback_data(
2705                 subreq, struct tevent_req);
2706         struct ctdb_transaction_cancel_state *state = tevent_req_data(
2707                 req, struct ctdb_transaction_cancel_state);
2708         int ret;
2709         bool status;
2710
2711         status = ctdb_g_lock_unlock_recv(subreq, &ret);
2712         TALLOC_FREE(subreq);
2713         if (! status) {
2714                 DEBUG(DEBUG_ERR,
2715                       ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2716                        state->h->db->db_name, ret));
2717                 talloc_free(state->h);
2718                 tevent_req_error(req, ret);
2719                 return;
2720         }
2721
2722         talloc_free(state->h);
2723         tevent_req_done(req);
2724 }
2725
/*
 * Collect the result of a cancel request.
 *
 * Returns true if the request did not finish with an error.  Otherwise
 * returns false and, when perr is not NULL, stores the unix error code
 * in *perr.
 */
bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
{
        int unix_err;
        bool failed;

        failed = tevent_req_is_unix_error(req, &unix_err);
        if (failed && perr != NULL) {
                *perr = unix_err;
        }

        return ! failed;
}
2739
2740 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2741 {
2742         struct tevent_context *ev = h->ev;
2743         struct tevent_req *req;
2744         TALLOC_CTX *mem_ctx;
2745         int ret;
2746         bool status;
2747
2748         mem_ctx = talloc_new(NULL);
2749         if (mem_ctx == NULL) {
2750                 talloc_free(h);
2751                 return ENOMEM;
2752         }
2753
2754         req = ctdb_transaction_cancel_send(mem_ctx, ev,
2755                                            tevent_timeval_zero(), h);
2756         if (req == NULL) {
2757                 talloc_free(mem_ctx);
2758                 talloc_free(h);
2759                 return ENOMEM;
2760         }
2761
2762         tevent_req_poll(req, ev);
2763
2764         status = ctdb_transaction_cancel_recv(req, &ret);
2765         if (! status) {
2766                 talloc_free(mem_ctx);
2767                 return ret;
2768         }
2769
2770         talloc_free(mem_ctx);
2771         return 0;
2772 }
2773
/*
 * TODO:
 *
 * In the future, Samba should register SERVER_ID.
 * Make that structure the same as struct srvid {}.
 */