888cfbc35925a5f233efb5892055dae5ffe03901
[sfrench/samba-autobuild/.git] / ctdb / client / client_db.c
1 /*
2    CTDB client code
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27
28 #include "common/logging.h"
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
39
40 static struct ctdb_db_context *client_db_handle(
41                                         struct ctdb_client_context *client,
42                                         const char *db_name)
43 {
44         struct ctdb_db_context *db;
45
46         for (db = client->db; db != NULL; db = db->next) {
47                 if (strcmp(db_name, db->db_name) == 0) {
48                         return db;
49                 }
50         }
51
52         return NULL;
53 }
54
/* Per-request state for the asynchronous "set database flags" operation */
struct ctdb_set_db_flags_state {
        /* Event context, client and timeout reused for the follow-up controls */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        /* Database the flags are applied to */
        uint32_t db_id;
        /* Requested CTDB_DB_FLAGS_* bits */
        uint8_t db_flags;
        /* Set once the READONLY control finished or was not requested */
        bool readonly_done;
        /* Set once the STICKY control finished or was not requested */
        bool sticky_done;
        /* Nodes returned by list_of_connected_nodes() and their count */
        uint32_t *pnn_list;
        int count;
};
65
66 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
67 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
68 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
69
70 static struct tevent_req *ctdb_set_db_flags_send(
71                                 TALLOC_CTX *mem_ctx,
72                                 struct tevent_context *ev,
73                                 struct ctdb_client_context *client,
74                                 uint32_t destnode, struct timeval timeout,
75                                 uint32_t db_id, uint8_t db_flags)
76 {
77         struct tevent_req *req, *subreq;
78         struct ctdb_set_db_flags_state *state;
79         struct ctdb_req_control request;
80
81         req = tevent_req_create(mem_ctx, &state,
82                                 struct ctdb_set_db_flags_state);
83         if (req == NULL) {
84                 return NULL;
85         }
86
87         if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
88                 tevent_req_done(req);
89                 return tevent_req_post(req, ev);
90         }
91
92         state->ev = ev;
93         state->client = client;
94         state->timeout = timeout;
95         state->db_id = db_id;
96         state->db_flags = db_flags;
97
98         ctdb_req_control_get_nodemap(&request);
99         subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
100                                           &request);
101         if (tevent_req_nomem(subreq, req)) {
102                 return tevent_req_post(req, ev);
103         }
104         tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
105
106         return req;
107 }
108
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
110 {
111         struct tevent_req *req = tevent_req_callback_data(
112                 subreq, struct tevent_req);
113         struct ctdb_set_db_flags_state *state = tevent_req_data(
114                 req, struct ctdb_set_db_flags_state);
115         struct ctdb_req_control request;
116         struct ctdb_reply_control *reply;
117         struct ctdb_node_map *nodemap;
118         int ret;
119         bool status;
120
121         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
122         TALLOC_FREE(subreq);
123         if (! status) {
124                 DEBUG(DEBUG_ERR,
125                       ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
126                        state->db_id, ret));
127                 tevent_req_error(req, ret);
128                 return;
129         }
130
131         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
132         talloc_free(reply);
133         if (ret != 0) {
134                 DEBUG(DEBUG_ERR,
135                       ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
136                       state->db_id, ret));
137                 tevent_req_error(req, ret);
138                 return;
139         }
140
141         state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
142                                                state, &state->pnn_list);
143         talloc_free(nodemap);
144         if (state->count <= 0) {
145                 DEBUG(DEBUG_ERR,
146                       ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
147                        state->db_id, state->count));
148                 tevent_req_error(req, ENOMEM);
149                 return;
150         }
151
152         if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
153                 ctdb_req_control_set_db_readonly(&request, state->db_id);
154                 subreq = ctdb_client_control_multi_send(
155                                         state, state->ev, state->client,
156                                         state->pnn_list, state->count,
157                                         state->timeout, &request);
158                 if (tevent_req_nomem(subreq, req)) {
159                         return;
160                 }
161                 tevent_req_set_callback(subreq,
162                                         ctdb_set_db_flags_readonly_done, req);
163         } else {
164                 state->readonly_done = true;
165         }
166
167         if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
168                 ctdb_req_control_set_db_sticky(&request, state->db_id);
169                 subreq = ctdb_client_control_multi_send(
170                                         state, state->ev, state->client,
171                                         state->pnn_list, state->count,
172                                         state->timeout, &request);
173                 if (tevent_req_nomem(subreq, req)) {
174                         return;
175                 }
176                 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
177                                         req);
178         } else {
179                 state->sticky_done = true;
180         }
181 }
182
183 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
184 {
185         struct tevent_req *req = tevent_req_callback_data(
186                 subreq, struct tevent_req);
187         struct ctdb_set_db_flags_state *state = tevent_req_data(
188                 req, struct ctdb_set_db_flags_state);
189         int ret;
190         bool status;
191
192         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
193                                                 NULL);
194         TALLOC_FREE(subreq);
195         if (! status) {
196                 DEBUG(DEBUG_ERR,
197                       ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
198                        state->db_id, ret));
199                 tevent_req_error(req, ret);
200                 return;
201         }
202
203         state->readonly_done = true;
204
205         if (state->readonly_done && state->sticky_done) {
206                 tevent_req_done(req);
207         }
208 }
209
210 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
211 {
212         struct tevent_req *req = tevent_req_callback_data(
213                 subreq, struct tevent_req);
214         struct ctdb_set_db_flags_state *state = tevent_req_data(
215                 req, struct ctdb_set_db_flags_state);
216         int ret;
217         bool status;
218
219         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
220                                                 NULL);
221         TALLOC_FREE(subreq);
222         if (! status) {
223                 DEBUG(DEBUG_ERR,
224                       ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
225                        state->db_id, ret));
226                 tevent_req_error(req, ret);
227                 return;
228         }
229
230         state->sticky_done = true;
231
232         if (state->readonly_done && state->sticky_done) {
233                 tevent_req_done(req);
234         }
235 }
236
/*
 * Collect the result of a set-db-flags request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the error code reported by the request in *perr.
 */
static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
{
        int err;

        if (! tevent_req_is_unix_error(req, &err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = err;
        }
        return false;
}
249
/* Per-request state for the asynchronous database attach operation */
struct ctdb_attach_state {
        /* Event context, client and timeout reused for every control sent */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        /* Node the attach controls are sent to (the client's own PNN) */
        uint32_t destnode;
        /* CTDB_DB_FLAGS_* bits requested by the caller */
        uint8_t db_flags;
        /* TDB open flags, computed once the mutex tunable is known */
        uint32_t tdb_flags;
        /* Database context being set up, or the existing one if attached */
        struct ctdb_db_context *db;
};
259
260 static void ctdb_attach_mutex_done(struct tevent_req *subreq);
261 static void ctdb_attach_dbid_done(struct tevent_req *subreq);
262 static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
263 static void ctdb_attach_health_done(struct tevent_req *subreq);
264 static void ctdb_attach_flags_done(struct tevent_req *subreq);
265
266 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
267                                     struct tevent_context *ev,
268                                     struct ctdb_client_context *client,
269                                     struct timeval timeout,
270                                     const char *db_name, uint8_t db_flags)
271 {
272         struct tevent_req *req, *subreq;
273         struct ctdb_attach_state *state;
274         struct ctdb_req_control request;
275
276         req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
277         if (req == NULL) {
278                 return NULL;
279         }
280
281         state->db = client_db_handle(client, db_name);
282         if (state->db != NULL) {
283                 tevent_req_done(req);
284                 return tevent_req_post(req, ev);
285         }
286
287         state->ev = ev;
288         state->client = client;
289         state->timeout = timeout;
290         state->destnode = ctdb_client_pnn(client);
291         state->db_flags = db_flags;
292
293         state->db = talloc_zero(client, struct ctdb_db_context);
294         if (tevent_req_nomem(state->db, req)) {
295                 return tevent_req_post(req, ev);
296         }
297
298         state->db->db_name = talloc_strdup(state->db, db_name);
299         if (tevent_req_nomem(state->db, req)) {
300                 return tevent_req_post(req, ev);
301         }
302
303         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
304                 state->db->persistent = true;
305         }
306
307         ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
308         subreq = ctdb_client_control_send(state, ev, client,
309                                           ctdb_client_pnn(client), timeout,
310                                           &request);
311         if (tevent_req_nomem(subreq, req)) {
312                 return tevent_req_post(req, ev);
313         }
314         tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
315
316         return req;
317 }
318
319 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
320 {
321         struct tevent_req *req = tevent_req_callback_data(
322                 subreq, struct tevent_req);
323         struct ctdb_attach_state *state = tevent_req_data(
324                 req, struct ctdb_attach_state);
325         struct ctdb_reply_control *reply;
326         struct ctdb_req_control request;
327         uint32_t mutex_enabled;
328         int ret;
329         bool status;
330
331         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
332         TALLOC_FREE(subreq);
333         if (! status) {
334                 DEBUG(DEBUG_ERR, ("attach: %s GET_TUNABLE failed, ret=%d\n",
335                                   state->db->db_name, ret));
336                 tevent_req_error(req, ret);
337                 return;
338         }
339
340         ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
341         if (ret != 0) {
342                 /* Treat error as mutex support not available */
343                 mutex_enabled = 0;
344         }
345
346         if (state->db->persistent) {
347                 state->tdb_flags = TDB_DEFAULT;
348         } else {
349                 state->tdb_flags = (TDB_NOSYNC | TDB_INCOMPATIBLE_HASH |
350                                     TDB_CLEAR_IF_FIRST);
351                 if (mutex_enabled == 1) {
352                         state->tdb_flags |= TDB_MUTEX_LOCKING;
353                 }
354         }
355
356         if (state->db->persistent) {
357                 ctdb_req_control_db_attach_persistent(&request,
358                                                       state->db->db_name,
359                                                       state->tdb_flags);
360         } else {
361                 ctdb_req_control_db_attach(&request, state->db->db_name,
362                                            state->tdb_flags);
363         }
364
365         subreq = ctdb_client_control_send(state, state->ev, state->client,
366                                           state->destnode, state->timeout,
367                                           &request);
368         if (tevent_req_nomem(subreq, req)) {
369                 return;
370         }
371         tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
372 }
373
374 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
375 {
376         struct tevent_req *req = tevent_req_callback_data(
377                 subreq, struct tevent_req);
378         struct ctdb_attach_state *state = tevent_req_data(
379                 req, struct ctdb_attach_state);
380         struct ctdb_req_control request;
381         struct ctdb_reply_control *reply;
382         bool status;
383         int ret;
384
385         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
386         TALLOC_FREE(subreq);
387         if (! status) {
388                 DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
389                                   state->db->db_name,
390                                   (state->db->persistent
391                                         ? "DB_ATTACH_PERSISTENT"
392                                         : "DB_ATTACH"),
393                                   ret));
394                 tevent_req_error(req, ret);
395                 return;
396         }
397
398         if (state->db->persistent) {
399                 ret = ctdb_reply_control_db_attach_persistent(
400                                 reply, &state->db->db_id);
401         } else {
402                 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
403         }
404         talloc_free(reply);
405         if (ret != 0) {
406                 DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
407                                   state->db->db_name, ret));
408                 tevent_req_error(req, ret);
409                 return;
410         }
411
412         ctdb_req_control_getdbpath(&request, state->db->db_id);
413         subreq = ctdb_client_control_send(state, state->ev, state->client,
414                                           state->destnode, state->timeout,
415                                           &request);
416         if (tevent_req_nomem(subreq, req)) {
417                 return;
418         }
419         tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
420 }
421
422 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
423 {
424         struct tevent_req *req = tevent_req_callback_data(
425                 subreq, struct tevent_req);
426         struct ctdb_attach_state *state = tevent_req_data(
427                 req, struct ctdb_attach_state);
428         struct ctdb_reply_control *reply;
429         struct ctdb_req_control request;
430         bool status;
431         int ret;
432
433         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
434         TALLOC_FREE(subreq);
435         if (! status) {
436                 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
437                                   state->db->db_name, ret));
438                 tevent_req_error(req, ret);
439                 return;
440         }
441
442         ret = ctdb_reply_control_getdbpath(reply, state->db,
443                                            &state->db->db_path);
444         talloc_free(reply);
445         if (ret != 0) {
446                 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
447                                   state->db->db_name, ret));
448                 tevent_req_error(req, ret);
449                 return;
450         }
451
452         ctdb_req_control_db_get_health(&request, state->db->db_id);
453         subreq = ctdb_client_control_send(state, state->ev, state->client,
454                                           state->destnode, state->timeout,
455                                           &request);
456         if (tevent_req_nomem(subreq, req)) {
457                 return;
458         }
459         tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
460 }
461
462 static void ctdb_attach_health_done(struct tevent_req *subreq)
463 {
464         struct tevent_req *req = tevent_req_callback_data(
465                 subreq, struct tevent_req);
466         struct ctdb_attach_state *state = tevent_req_data(
467                 req, struct ctdb_attach_state);
468         struct ctdb_reply_control *reply;
469         const char *reason;
470         bool status;
471         int ret;
472
473         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
474         TALLOC_FREE(subreq);
475         if (! status) {
476                 DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
477                                   state->db->db_name, ret));
478                 tevent_req_error(req, ret);
479                 return;
480         }
481
482         ret = ctdb_reply_control_db_get_health(reply, state, &reason);
483         if (ret != 0) {
484                 DEBUG(DEBUG_ERR,
485                       ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
486                        state->db->db_name, ret));
487                 tevent_req_error(req, ret);
488                 return;
489         }
490
491         if (reason != NULL) {
492                 /* Database unhealthy, avoid attach */
493                 DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
494                                   state->db->db_name, reason));
495                 tevent_req_error(req, EIO);
496                 return;
497         }
498
499         subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
500                                         state->destnode, state->timeout,
501                                         state->db->db_id, state->db_flags);
502         if (tevent_req_nomem(subreq, req)) {
503                 return;
504         }
505         tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
506 }
507
508 static void ctdb_attach_flags_done(struct tevent_req *subreq)
509 {
510         struct tevent_req *req = tevent_req_callback_data(
511                 subreq, struct tevent_req);
512         struct ctdb_attach_state *state = tevent_req_data(
513                 req, struct ctdb_attach_state);
514         bool status;
515         int ret;
516
517         status = ctdb_set_db_flags_recv(subreq, &ret);
518         TALLOC_FREE(subreq);
519         if (! status) {
520                 DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
521                                   state->db->db_name, state->db_flags));
522                 tevent_req_error(req, ret);
523                 return;
524         }
525
526         state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
527                                         state->tdb_flags, O_RDWR, 0);
528         if (tevent_req_nomem(state->db->ltdb, req)) {
529                 DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
530                                   state->db->db_name));
531                 return;
532         }
533         DLIST_ADD(state->client->db, state->db);
534
535         tevent_req_done(req);
536 }
537
538 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
539                       struct ctdb_db_context **out)
540 {
541         struct ctdb_attach_state *state = tevent_req_data(
542                 req, struct ctdb_attach_state);
543         int err;
544
545         if (tevent_req_is_unix_error(req, &err)) {
546                 if (perr != NULL) {
547                         *perr = err;
548                 }
549                 return false;
550         }
551
552         if (out != NULL) {
553                 *out = state->db;
554         }
555         return true;
556 }
557
558 int ctdb_attach(struct tevent_context *ev,
559                 struct ctdb_client_context *client,
560                 struct timeval timeout,
561                 const char *db_name, uint8_t db_flags,
562                 struct ctdb_db_context **out)
563 {
564         TALLOC_CTX *mem_ctx;
565         struct tevent_req *req;
566         bool status;
567         int ret;
568
569         mem_ctx = talloc_new(client);
570         if (mem_ctx == NULL) {
571                 return ENOMEM;
572         }
573
574         req = ctdb_attach_send(mem_ctx, ev, client, timeout,
575                                db_name, db_flags);
576         if (req == NULL) {
577                 talloc_free(mem_ctx);
578                 return ENOMEM;
579         }
580
581         tevent_req_poll(req, ev);
582
583         status = ctdb_attach_recv(req, &ret, out);
584         if (! status) {
585                 talloc_free(mem_ctx);
586                 return ret;
587         }
588
589         /*
590         ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
591         ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
592         ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
593         */
594
595         talloc_free(mem_ctx);
596         return 0;
597 }
598
599 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
600                 struct ctdb_client_context *client,
601                 struct timeval timeout, uint32_t db_id)
602 {
603         struct ctdb_db_context *db;
604         int ret;
605
606         ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
607                                   db_id);
608         if (ret != 0) {
609                 return ret;
610         }
611
612         for (db = client->db; db != NULL; db = db->next) {
613                 if (db->db_id == db_id) {
614                         DLIST_REMOVE(client->db, db);
615                         break;
616                 }
617         }
618
619         return 0;
620 }
621
622 uint32_t ctdb_db_id(struct ctdb_db_context *db)
623 {
624         return db->db_id;
625 }
626
627 struct ctdb_db_traverse_state {
628         ctdb_rec_parser_func_t parser;
629         void *private_data;
630         bool extract_header;
631         int error;
632 };
633
634 static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
635                                     TDB_DATA data, void *private_data)
636 {
637         struct ctdb_db_traverse_state *state =
638                 (struct ctdb_db_traverse_state *)private_data;
639         int ret;
640
641         if (state->extract_header) {
642                 struct ctdb_ltdb_header header;
643
644                 ret = ctdb_ltdb_header_extract(&data, &header);
645                 if (ret != 0) {
646                         state->error = ret;
647                         return 1;
648                 }
649
650                 ret = state->parser(0, &header, key, data, state->private_data);
651         } else {
652                 ret = state->parser(0, NULL, key, data, state->private_data);
653         }
654
655         if (ret != 0) {
656                 state->error = ret;
657                 return 1;
658         }
659
660         return 0;
661 }
662
663 int ctdb_db_traverse(struct ctdb_db_context *db, bool readonly,
664                      bool extract_header,
665                      ctdb_rec_parser_func_t parser, void *private_data)
666 {
667         struct ctdb_db_traverse_state state;
668         int ret;
669
670         state.parser = parser;
671         state.private_data = private_data;
672         state.extract_header = extract_header;
673         state.error = 0;
674
675         if (readonly) {
676                 ret = tdb_traverse_read(db->ltdb->tdb,
677                                         ctdb_db_traverse_handler, &state);
678         } else {
679                 ret = tdb_traverse(db->ltdb->tdb,
680                                    ctdb_db_traverse_handler, &state);
681         }
682
683         if (ret == -1) {
684                 return EIO;
685         }
686
687         return state.error;
688 }
689
690 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
691                     struct ctdb_ltdb_header *header,
692                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
693 {
694         TDB_DATA rec;
695         int ret;
696
697         rec = tdb_fetch(db->ltdb->tdb, key);
698         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
699                 /* No record present */
700                 if (rec.dptr != NULL) {
701                         free(rec.dptr);
702                 }
703
704                 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
705                         return EIO;
706                 }
707
708                 header->rsn = 0;
709                 header->dmaster = CTDB_UNKNOWN_PNN;
710                 header->flags = 0;
711
712                 if (data != NULL) {
713                         *data = tdb_null;
714                 }
715                 return 0;
716         }
717
718         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
719         if (ret != 0) {
720                 return ret;
721         }
722
723         ret = 0;
724         if (data != NULL) {
725                 size_t offset = ctdb_ltdb_header_len(header);
726
727                 data->dsize = rec.dsize - offset;
728                 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
729                                            data->dsize);
730                 if (data->dptr == NULL) {
731                         ret = ENOMEM;
732                 }
733         }
734
735         free(rec.dptr);
736         return ret;
737 }
738
739 /*
740  * Fetch a record from volatile database
741  *
742  * Steps:
743  *  1. Get a lock on the hash chain
744  *  2. If the record does not exist, migrate the record
745  *  3. If readonly=true and delegations do not exist, migrate the record.
746  *  4. If readonly=false and delegations exist, migrate the record.
747  *  5. If the local node is not dmaster, migrate the record.
748  *  6. Return record
749  */
750
/* Per-request state for the asynchronous fetch-lock operation */
struct ctdb_fetch_lock_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Record handle built up by the request */
        struct ctdb_record_handle *h;
        /* True when the caller asked for readonly access */
        bool readonly;
        /* PNN of the client, compared against the record's dmaster */
        uint32_t pnn;
};
758
759 static int ctdb_fetch_lock_check(struct tevent_req *req);
760 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
761 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
762
763 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
764                                         struct tevent_context *ev,
765                                         struct ctdb_client_context *client,
766                                         struct ctdb_db_context *db,
767                                         TDB_DATA key, bool readonly)
768 {
769         struct ctdb_fetch_lock_state *state;
770         struct tevent_req *req;
771         int ret;
772
773         req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
774         if (req == NULL) {
775                 return NULL;
776         }
777
778         state->ev = ev;
779         state->client = client;
780
781         state->h = talloc_zero(db, struct ctdb_record_handle);
782         if (tevent_req_nomem(state->h, req)) {
783                 return tevent_req_post(req, ev);
784         }
785         state->h->client = client;
786         state->h->db = db;
787         state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
788         if (tevent_req_nomem(state->h->key.dptr, req)) {
789                 return tevent_req_post(req, ev);
790         }
791         state->h->key.dsize = key.dsize;
792         state->h->readonly = false;
793
794         state->readonly = readonly;
795         state->pnn = ctdb_client_pnn(client);
796
797         /* Check that database is not persistent */
798         if (db->persistent) {
799                 DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
800                                   db->db_name));
801                 tevent_req_error(req, EINVAL);
802                 return tevent_req_post(req, ev);
803         }
804
805         ret = ctdb_fetch_lock_check(req);
806         if (ret == 0) {
807                 tevent_req_done(req);
808                 return tevent_req_post(req, ev);
809         }
810         if (ret != EAGAIN) {
811                 tevent_req_error(req, ret);
812                 return tevent_req_post(req, ev);
813         }
814         return req;
815 }
816
817 static int ctdb_fetch_lock_check(struct tevent_req *req)
818 {
819         struct ctdb_fetch_lock_state *state = tevent_req_data(
820                 req, struct ctdb_fetch_lock_state);
821         struct ctdb_record_handle *h = state->h;
822         struct ctdb_ltdb_header header;
823         TDB_DATA data = tdb_null;
824         int ret, err = 0;
825         bool do_migrate = false;
826
827         ret = tdb_chainlock(h->db->ltdb->tdb, h->key);
828         if (ret != 0) {
829                 DEBUG(DEBUG_ERR,
830                       ("fetch_lock: %s tdb_chainlock failed, %s\n",
831                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
832                 err = EIO;
833                 goto failed;
834         }
835
836         data = tdb_fetch(h->db->ltdb->tdb, h->key);
837         if (data.dptr == NULL) {
838                 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
839                         goto migrate;
840                 } else {
841                         err = EIO;
842                         goto failed;
843                 }
844         }
845
846         /* Got the record */
847         ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
848         if (ret != 0) {
849                 err = ret;
850                 goto failed;
851         }
852
853         if (! state->readonly) {
854                 /* Read/write access */
855                 if (header.dmaster == state->pnn &&
856                     header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
857                         goto migrate;
858                 }
859
860                 if (header.dmaster != state->pnn) {
861                         goto migrate;
862                 }
863         } else {
864                 /* Readonly access */
865                 if (header.dmaster != state->pnn &&
866                     ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
867                                        CTDB_REC_RO_HAVE_DELEGATIONS))) {
868                         goto migrate;
869                 }
870         }
871
872         /* We are the dmaster or readonly delegation */
873         h->header = header;
874         h->data = data;
875         if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
876                             CTDB_REC_RO_HAVE_DELEGATIONS)) {
877                 h->readonly = true;
878         }
879         return 0;
880
881 migrate:
882         do_migrate = true;
883         err = EAGAIN;
884
885 failed:
886         if (data.dptr != NULL) {
887                 free(data.dptr);
888         }
889         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
890         if (ret != 0) {
891                 DEBUG(DEBUG_ERR,
892                       ("fetch_lock: %s tdb_chainunlock failed, %s\n",
893                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
894                 return EIO;
895         }
896
897         if (do_migrate) {
898                 ctdb_fetch_lock_migrate(req);
899         }
900         return err;
901 }
902
903 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
904 {
905         struct ctdb_fetch_lock_state *state = tevent_req_data(
906                 req, struct ctdb_fetch_lock_state);
907         struct ctdb_req_call request;
908         struct tevent_req *subreq;
909
910         ZERO_STRUCT(request);
911         request.flags = CTDB_IMMEDIATE_MIGRATION;
912         if (state->readonly) {
913                 request.flags |= CTDB_WANT_READONLY;
914         }
915         request.db_id = state->h->db->db_id;
916         request.callid = CTDB_NULL_FUNC;
917         request.key = state->h->key;
918         request.calldata = tdb_null;
919
920         subreq = ctdb_client_call_send(state, state->ev, state->client,
921                                        &request);
922         if (tevent_req_nomem(subreq, req)) {
923                 return;
924         }
925
926         tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
927 }
928
929 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
930 {
931         struct tevent_req *req = tevent_req_callback_data(
932                 subreq, struct tevent_req);
933         struct ctdb_fetch_lock_state *state = tevent_req_data(
934                 req, struct ctdb_fetch_lock_state);
935         struct ctdb_reply_call *reply;
936         int ret;
937         bool status;
938
939         status = ctdb_client_call_recv(subreq, state, &reply, &ret);
940         TALLOC_FREE(subreq);
941         if (! status) {
942                 DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
943                                   state->h->db->db_name, ret));
944                 tevent_req_error(req, ret);
945                 return;
946         }
947
948         if (reply->status != 0) {
949                 tevent_req_error(req, EIO);
950                 return;
951         }
952         talloc_free(reply);
953
954         ret = ctdb_fetch_lock_check(req);
955         if (ret != 0) {
956                 if (ret != EAGAIN) {
957                         tevent_req_error(req, ret);
958                 }
959                 return;
960         }
961
962         tevent_req_done(req);
963 }
964
965 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
966 {
967         int ret;
968
969         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
970         if (ret != 0) {
971                 DEBUG(DEBUG_ERR,
972                       ("fetch_lock: %s tdb_chainunlock failed, %s\n",
973                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
974         }
975         free(h->data.dptr);
976         return 0;
977 }
978
979 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
980                                                 struct ctdb_ltdb_header *header,
981                                                 TALLOC_CTX *mem_ctx,
982                                                 TDB_DATA *data, int *perr)
983 {
984         struct ctdb_fetch_lock_state *state = tevent_req_data(
985                 req, struct ctdb_fetch_lock_state);
986         struct ctdb_record_handle *h = state->h;
987         int err;
988
989         if (tevent_req_is_unix_error(req, &err)) {
990                 if (perr != NULL) {
991                         TALLOC_FREE(state->h);
992                         *perr = err;
993                 }
994                 return NULL;
995         }
996
997         if (header != NULL) {
998                 *header = h->header;
999         }
1000         if (data != NULL) {
1001                 size_t offset;
1002
1003                 offset = ctdb_ltdb_header_len(&h->header);
1004
1005                 data->dsize = h->data.dsize - offset;
1006                 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
1007                                            data->dsize);
1008                 if (data->dptr == NULL) {
1009                         TALLOC_FREE(state->h);
1010                         if (perr != NULL) {
1011                                 *perr = ENOMEM;
1012                         }
1013                         return NULL;
1014                 }
1015         }
1016
1017         talloc_set_destructor(h, ctdb_record_handle_destructor);
1018         return h;
1019 }
1020
1021 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1022                     struct ctdb_client_context *client,
1023                     struct ctdb_db_context *db, TDB_DATA key, bool readonly,
1024                     struct ctdb_record_handle **out,
1025                     struct ctdb_ltdb_header *header, TDB_DATA *data)
1026 {
1027         struct tevent_req *req;
1028         struct ctdb_record_handle *h;
1029         int ret;
1030
1031         req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
1032         if (req == NULL) {
1033                 return ENOMEM;
1034         }
1035
1036         tevent_req_poll(req, ev);
1037
1038         h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
1039         if (h == NULL) {
1040                 return ret;
1041         }
1042
1043         *out = h;
1044         return 0;
1045 }
1046
1047 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
1048 {
1049         uint8_t header[sizeof(struct ctdb_ltdb_header)];
1050         TDB_DATA rec[2];
1051         int ret;
1052
1053         /* Cannot modify the record if it was obtained as a readonly copy */
1054         if (h->readonly) {
1055                 return EINVAL;
1056         }
1057
1058         /* Check if the new data is same */
1059         if (h->data.dsize == data.dsize &&
1060             memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1061                 /* No need to do anything */
1062                 return 0;
1063         }
1064
1065         ctdb_ltdb_header_push(&h->header, header);
1066
1067         rec[0].dsize = ctdb_ltdb_header_len(&h->header);
1068         rec[0].dptr = header;
1069
1070         rec[1].dsize = data.dsize;
1071         rec[1].dptr = data.dptr;
1072
1073         ret = tdb_storev(h->db->ltdb->tdb, h->key, rec, 2, TDB_REPLACE);
1074         if (ret != 0) {
1075                 DEBUG(DEBUG_ERR,
1076                       ("store_record: %s tdb_storev failed, %s\n",
1077                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1078                 return EIO;
1079         }
1080
1081         return 0;
1082 }
1083
/* State of an asynchronous delete-record request */
struct ctdb_delete_record_state {
	/* Handle of the record being deleted, owned by the caller */
	struct ctdb_record_handle *h;
};

/* Completion callback for the SCHEDULE_FOR_DELETION control */
static void ctdb_delete_record_done(struct tevent_req *subreq);
1089
1090 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1091                                            struct tevent_context *ev,
1092                                            struct ctdb_record_handle *h)
1093 {
1094         struct tevent_req *req, *subreq;
1095         struct ctdb_delete_record_state *state;
1096         struct ctdb_key_data key;
1097         struct ctdb_req_control request;
1098         uint8_t header[sizeof(struct ctdb_ltdb_header)];
1099         TDB_DATA rec;
1100         int ret;
1101
1102         req = tevent_req_create(mem_ctx, &state,
1103                                 struct ctdb_delete_record_state);
1104         if (req == NULL) {
1105                 return NULL;
1106         }
1107
1108         state->h = h;
1109
1110         /* Cannot delete the record if it was obtained as a readonly copy */
1111         if (h->readonly) {
1112                 DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
1113                                   h->db->db_name));
1114                 tevent_req_error(req, EINVAL);
1115                 return tevent_req_post(req, ev);
1116         }
1117
1118         ctdb_ltdb_header_push(&h->header, header);
1119
1120         rec.dsize = ctdb_ltdb_header_len(&h->header);
1121         rec.dptr = header;
1122
1123         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1124         if (ret != 0) {
1125                 DEBUG(DEBUG_ERR,
1126                       ("fetch_lock delete: %s tdb_sore failed, %s\n",
1127                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1128                 tevent_req_error(req, EIO);
1129                 return tevent_req_post(req, ev);
1130         }
1131
1132         key.db_id = h->db->db_id;
1133         key.header = h->header;
1134         key.key = h->key;
1135
1136         ctdb_req_control_schedule_for_deletion(&request, &key);
1137         subreq = ctdb_client_control_send(state, ev, h->client,
1138                                           ctdb_client_pnn(h->client),
1139                                           tevent_timeval_zero(),
1140                                           &request);
1141         if (tevent_req_nomem(subreq, req)) {
1142                 return tevent_req_post(req, ev);
1143         }
1144         tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
1145
1146         return req;
1147 }
1148
1149 static void ctdb_delete_record_done(struct tevent_req *subreq)
1150 {
1151         struct tevent_req *req = tevent_req_callback_data(
1152                 subreq, struct tevent_req);
1153         struct ctdb_delete_record_state *state = tevent_req_data(
1154                 req, struct ctdb_delete_record_state);
1155         int ret;
1156         bool status;
1157
1158         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1159         TALLOC_FREE(subreq);
1160         if (! status) {
1161                 DEBUG(DEBUG_ERR,
1162                       ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1163                        "ret=%d\n", state->h->db->db_name, ret));
1164                 tevent_req_error(req, ret);
1165                 return;
1166         }
1167
1168         tevent_req_done(req);
1169 }
1170
/*
 * Collect the result of a delete-record request.
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the error code in *perr.
 */
bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
1184
1185
1186 int ctdb_delete_record(struct ctdb_record_handle *h)
1187 {
1188         struct tevent_context *ev = h->ev;
1189         TALLOC_CTX *mem_ctx;
1190         struct tevent_req *req;
1191         int ret;
1192         bool status;
1193
1194         mem_ctx = talloc_new(NULL);
1195         if (mem_ctx == NULL) {
1196                 return ENOMEM;
1197         }
1198
1199         req = ctdb_delete_record_send(mem_ctx, ev, h);
1200         if (req == NULL) {
1201                 talloc_free(mem_ctx);
1202                 return ENOMEM;
1203         }
1204
1205         tevent_req_poll(req, ev);
1206
1207         status = ctdb_delete_record_recv(req, &ret);
1208         talloc_free(mem_ctx);
1209         if (! status) {
1210                 return ret;
1211         }
1212
1213         return 0;
1214 }
1215
1216 /*
1217  * Global lock functions
1218  */
1219
1220 struct ctdb_g_lock_lock_state {
1221         struct tevent_context *ev;
1222         struct ctdb_client_context *client;
1223         struct ctdb_db_context *db;
1224         TDB_DATA key;
1225         struct ctdb_server_id my_sid;
1226         enum ctdb_g_lock_type lock_type;
1227         struct ctdb_record_handle *h;
1228         /* state for verification of active locks */
1229         struct ctdb_g_lock_list *lock_list;
1230         unsigned int current;
1231 };
1232
1233 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1234 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1235 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1236 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1237 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1238
1239 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1240                                   enum ctdb_g_lock_type l2)
1241 {
1242         if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1243                 return false;
1244         }
1245         return true;
1246 }
1247
1248 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1249                                          struct tevent_context *ev,
1250                                          struct ctdb_client_context *client,
1251                                          struct ctdb_db_context *db,
1252                                          const char *keyname,
1253                                          struct ctdb_server_id *sid,
1254                                          bool readonly)
1255 {
1256         struct tevent_req *req, *subreq;
1257         struct ctdb_g_lock_lock_state *state;
1258
1259         req = tevent_req_create(mem_ctx, &state,
1260                                 struct ctdb_g_lock_lock_state);
1261         if (req == NULL) {
1262                 return NULL;
1263         }
1264
1265         state->ev = ev;
1266         state->client = client;
1267         state->db = db;
1268         state->key.dptr = discard_const(keyname);
1269         state->key.dsize = strlen(keyname) + 1;
1270         state->my_sid = *sid;
1271         state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1272
1273         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1274                                       false);
1275         if (tevent_req_nomem(subreq, req)) {
1276                 return tevent_req_post(req, ev);
1277         }
1278         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1279
1280         return req;
1281 }
1282
1283 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1284 {
1285         struct tevent_req *req = tevent_req_callback_data(
1286                 subreq, struct tevent_req);
1287         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1288                 req, struct ctdb_g_lock_lock_state);
1289         TDB_DATA data;
1290         int ret = 0;
1291
1292         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1293         TALLOC_FREE(subreq);
1294         if (state->h == NULL) {
1295                 DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1296                                   (char *)state->key.dptr));
1297                 tevent_req_error(req, ret);
1298                 return;
1299         }
1300
1301         if (state->lock_list != NULL) {
1302                 TALLOC_FREE(state->lock_list);
1303                 state->current = 0;
1304         }
1305
1306         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1307                                     &state->lock_list);
1308         talloc_free(data.dptr);
1309         if (ret != 0) {
1310                 DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1311                                   (char *)state->key.dptr));
1312                 tevent_req_error(req, ret);
1313                 return;
1314         }
1315
1316         ctdb_g_lock_lock_process_locks(req);
1317 }
1318
1319 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1320 {
1321         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1322                 req, struct ctdb_g_lock_lock_state);
1323         struct tevent_req *subreq;
1324         struct ctdb_g_lock *lock;
1325         bool check_server = false;
1326         int ret;
1327
1328         while (state->current < state->lock_list->num) {
1329                 lock = &state->lock_list->lock[state->current];
1330
1331                 /* We should not ask for the same lock more than once */
1332                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1333                         DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
1334                                           (char *)state->key.dptr));
1335                         tevent_req_error(req, EDEADLK);
1336                         return;
1337                 }
1338
1339                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1340                         check_server = true;
1341                         break;
1342                 }
1343
1344                 state->current += 1;
1345         }
1346
1347         if (check_server) {
1348                 struct ctdb_req_control request;
1349
1350                 ctdb_req_control_process_exists(&request, lock->sid.pid);
1351                 subreq = ctdb_client_control_send(state, state->ev,
1352                                                   state->client,
1353                                                   lock->sid.vnn,
1354                                                   tevent_timeval_zero(),
1355                                                   &request);
1356                 if (tevent_req_nomem(subreq, req)) {
1357                         return;
1358                 }
1359                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1360                 return;
1361         }
1362
1363         /* There is no conflict, add ourself to the lock_list */
1364         state->lock_list->lock = talloc_realloc(state->lock_list,
1365                                                 state->lock_list->lock,
1366                                                 struct ctdb_g_lock,
1367                                                 state->lock_list->num + 1);
1368         if (state->lock_list->lock == NULL) {
1369                 tevent_req_error(req, ENOMEM);
1370                 return;
1371         }
1372
1373         lock = &state->lock_list->lock[state->lock_list->num];
1374         lock->type = state->lock_type;
1375         lock->sid = state->my_sid;
1376         state->lock_list->num += 1;
1377
1378         ret = ctdb_g_lock_lock_update(req);
1379         if (ret != 0) {
1380                 tevent_req_error(req, ret);
1381                 return;
1382         }
1383
1384         TALLOC_FREE(state->h);
1385         tevent_req_done(req);
1386 }
1387
1388 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1389 {
1390         struct tevent_req *req = tevent_req_callback_data(
1391                 subreq, struct tevent_req);
1392         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1393                 req, struct ctdb_g_lock_lock_state);
1394         struct ctdb_reply_control *reply;
1395         int ret, value;
1396         bool status;
1397
1398         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1399         TALLOC_FREE(subreq);
1400         if (! status) {
1401                 DEBUG(DEBUG_ERR,
1402                       ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1403                        (char *)state->key.dptr, ret));
1404                 tevent_req_error(req, ret);
1405                 return;
1406         }
1407
1408         ret = ctdb_reply_control_process_exists(reply, &value);
1409         if (ret != 0) {
1410                 tevent_req_error(req, ret);
1411                 return;
1412         }
1413         talloc_free(reply);
1414
1415         if (value == 0) {
1416                 /* server process exists, need to retry */
1417                 TALLOC_FREE(state->h);
1418                 subreq = tevent_wakeup_send(state, state->ev,
1419                                             tevent_timeval_current_ofs(0,1000));
1420                 if (tevent_req_nomem(subreq, req)) {
1421                         return;
1422                 }
1423                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1424                 return;
1425         }
1426
1427         /* server process does not exist, remove conflicting entry */
1428         state->lock_list->lock[state->current] =
1429                 state->lock_list->lock[state->lock_list->num-1];
1430         state->lock_list->num -= 1;
1431
1432         ret = ctdb_g_lock_lock_update(req);
1433         if (ret != 0) {
1434                 tevent_req_error(req, ret);
1435                 return;
1436         }
1437
1438         ctdb_g_lock_lock_process_locks(req);
1439 }
1440
1441 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1442 {
1443         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1444                 req, struct ctdb_g_lock_lock_state);
1445         TDB_DATA data;
1446         int ret;
1447
1448         data.dsize = ctdb_g_lock_list_len(state->lock_list);
1449         data.dptr = talloc_size(state, data.dsize);
1450         if (data.dptr == NULL) {
1451                 return ENOMEM;
1452         }
1453
1454         ctdb_g_lock_list_push(state->lock_list, data.dptr);
1455         ret = ctdb_store_record(state->h, data);
1456         talloc_free(data.dptr);
1457         return ret;
1458 }
1459
1460 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1461 {
1462         struct tevent_req *req = tevent_req_callback_data(
1463                 subreq, struct tevent_req);
1464         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1465                 req, struct ctdb_g_lock_lock_state);
1466         bool success;
1467
1468         success = tevent_wakeup_recv(subreq);
1469         TALLOC_FREE(subreq);
1470         if (! success) {
1471                 tevent_req_error(req, ENOMEM);
1472                 return;
1473         }
1474
1475         subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1476                                       state->db, state->key, false);
1477         if (tevent_req_nomem(subreq, req)) {
1478                 return;
1479         }
1480         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1481 }
1482
1483 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1484 {
1485         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1486                 req, struct ctdb_g_lock_lock_state);
1487         int err;
1488
1489         TALLOC_FREE(state->h);
1490
1491         if (tevent_req_is_unix_error(req, &err)) {
1492                 if (perr != NULL) {
1493                         *perr = err;
1494                 }
1495                 return false;
1496         }
1497
1498         return true;
1499 }
1500
1501 struct ctdb_g_lock_unlock_state {
1502         struct tevent_context *ev;
1503         struct ctdb_client_context *client;
1504         struct ctdb_db_context *db;
1505         TDB_DATA key;
1506         struct ctdb_server_id my_sid;
1507         struct ctdb_record_handle *h;
1508         struct ctdb_g_lock_list *lock_list;
1509 };
1510
1511 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1512 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1513 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
1514
1515 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1516                                            struct tevent_context *ev,
1517                                            struct ctdb_client_context *client,
1518                                            struct ctdb_db_context *db,
1519                                            const char *keyname,
1520                                            struct ctdb_server_id sid)
1521 {
1522         struct tevent_req *req, *subreq;
1523         struct ctdb_g_lock_unlock_state *state;
1524
1525         req = tevent_req_create(mem_ctx, &state,
1526                                 struct ctdb_g_lock_unlock_state);
1527         if (req == NULL) {
1528                 return NULL;
1529         }
1530
1531         state->ev = ev;
1532         state->client = client;
1533         state->db = db;
1534         state->key.dptr = discard_const(keyname);
1535         state->key.dsize = strlen(keyname) + 1;
1536         state->my_sid = sid;
1537
1538         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1539                                       false);
1540         if (tevent_req_nomem(subreq, req)) {
1541                 return tevent_req_post(req, ev);
1542         }
1543         tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1544
1545         return req;
1546 }
1547
1548 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1549 {
1550         struct tevent_req *req = tevent_req_callback_data(
1551                 subreq, struct tevent_req);
1552         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1553                 req, struct ctdb_g_lock_unlock_state);
1554         TDB_DATA data;
1555         int ret = 0;
1556
1557         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1558         TALLOC_FREE(subreq);
1559         if (state->h == NULL) {
1560                 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1561                                   (char *)state->key.dptr));
1562                 tevent_req_error(req, ret);
1563                 return;
1564         }
1565
1566         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1567                                     &state->lock_list);
1568         if (ret != 0) {
1569                 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1570                                   (char *)state->key.dptr));
1571                 tevent_req_error(req, ret);
1572                 return;
1573         }
1574
1575         ret = ctdb_g_lock_unlock_update(req);
1576         if (ret != 0) {
1577                 tevent_req_error(req, ret);
1578                 return;
1579         }
1580
1581         if (state->lock_list->num == 0) {
1582                 subreq = ctdb_delete_record_send(state, state->ev, state->h);
1583                 if (tevent_req_nomem(subreq, req)) {
1584                         return;
1585                 }
1586                 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
1587                                         req);
1588                 return;
1589         }
1590
1591         TALLOC_FREE(state->h);
1592         tevent_req_done(req);
1593 }
1594
1595 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1596 {
1597         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1598                 req, struct ctdb_g_lock_unlock_state);
1599         struct ctdb_g_lock *lock;
1600         int ret, i;
1601
1602         for (i=0; i<state->lock_list->num; i++) {
1603                 lock = &state->lock_list->lock[i];
1604
1605                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1606                         break;
1607                 }
1608         }
1609
1610         if (i < state->lock_list->num) {
1611                 state->lock_list->lock[i] =
1612                         state->lock_list->lock[state->lock_list->num-1];
1613                 state->lock_list->num -= 1;
1614         }
1615
1616         if (state->lock_list->num != 0) {
1617                 TDB_DATA data;
1618
1619                 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1620                 data.dptr = talloc_size(state, data.dsize);
1621                 if (data.dptr == NULL) {
1622                         return ENOMEM;
1623                 }
1624
1625                 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1626                 ret = ctdb_store_record(state->h, data);
1627                 talloc_free(data.dptr);
1628                 if (ret != 0) {
1629                         return ret;
1630                 }
1631         }
1632
1633         return 0;
1634 }
1635
1636 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
1637 {
1638         struct tevent_req *req = tevent_req_callback_data(
1639                 subreq, struct tevent_req);
1640         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1641                 req, struct ctdb_g_lock_unlock_state);
1642         int ret;
1643         bool status;
1644
1645         status = ctdb_delete_record_recv(subreq, &ret);
1646         if (! status) {
1647                 DEBUG(DEBUG_ERR,
1648                       ("g_lock_unlock %s delete record failed, ret=%d\n",
1649                        (char *)state->key.dptr, ret));
1650                 tevent_req_error(req, ret);
1651                 return;
1652         }
1653
1654         TALLOC_FREE(state->h);
1655         tevent_req_done(req);
1656 }
1657
1658 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1659 {
1660         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1661                 req, struct ctdb_g_lock_unlock_state);
1662         int err;
1663
1664         TALLOC_FREE(state->h);
1665
1666         if (tevent_req_is_unix_error(req, &err)) {
1667                 if (perr != NULL) {
1668                         *perr = err;
1669                 }
1670                 return false;
1671         }
1672
1673         return true;
1674 }
1675
1676 /*
1677  * Persistent database functions
1678  */
/* State of an asynchronous transaction start request */
struct ctdb_transaction_start_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	/* Transaction handle being set up */
	struct ctdb_transaction_handle *h;
	/* PNN of the node this client is connected to */
	uint32_t destnode;
};

static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
1689
1690 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1691                                                struct tevent_context *ev,
1692                                                struct ctdb_client_context *client,
1693                                                struct timeval timeout,
1694                                                struct ctdb_db_context *db,
1695                                                bool readonly)
1696 {
1697         struct ctdb_transaction_start_state *state;
1698         struct tevent_req *req, *subreq;
1699         struct ctdb_transaction_handle *h;
1700
1701         req = tevent_req_create(mem_ctx, &state,
1702                                 struct ctdb_transaction_start_state);
1703         if (req == NULL) {
1704                 return NULL;
1705         }
1706
1707         if (! db->persistent) {
1708                 tevent_req_error(req, EINVAL);
1709                 return tevent_req_post(req, ev);
1710         }
1711
1712         state->ev = ev;
1713         state->client = client;
1714         state->destnode = ctdb_client_pnn(client);
1715
1716         h = talloc_zero(db, struct ctdb_transaction_handle);
1717         if (tevent_req_nomem(h, req)) {
1718                 return tevent_req_post(req, ev);
1719         }
1720
1721         h->ev = ev;
1722         h->client = client;
1723         h->db = db;
1724         h->readonly = readonly;
1725         h->updated = false;
1726
1727         /* SRVID is unique for databases, so client can have transactions
1728          * active for multiple databases */
1729         h->sid = ctdb_client_get_server_id(client, db->db_id);
1730
1731         h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
1732         if (tevent_req_nomem(h->recbuf, req)) {
1733                 return tevent_req_post(req, ev);
1734         }
1735
1736         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1737         if (tevent_req_nomem(h->lock_name, req)) {
1738                 return tevent_req_post(req, ev);
1739         }
1740
1741         state->h = h;
1742
1743         subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1744         if (tevent_req_nomem(subreq, req)) {
1745                 return tevent_req_post(req, ev);
1746         }
1747         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
1748
1749         return req;
1750 }
1751
1752 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1753 {
1754         struct tevent_req *req = tevent_req_callback_data(
1755                 subreq, struct tevent_req);
1756         struct ctdb_transaction_start_state *state = tevent_req_data(
1757                 req, struct ctdb_transaction_start_state);
1758         bool status;
1759         int ret;
1760
1761         status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1762         TALLOC_FREE(subreq);
1763         if (! status) {
1764                 DEBUG(DEBUG_ERR,
1765                       ("transaction_start: %s attach g_lock.tdb failed\n",
1766                        state->h->db->db_name));
1767                 tevent_req_error(req, ret);
1768                 return;
1769         }
1770
1771         subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1772                                        state->h->db_g_lock,
1773                                        state->h->lock_name,
1774                                        &state->h->sid, state->h->readonly);
1775         if (tevent_req_nomem(subreq, req)) {
1776                 return;
1777         }
1778         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
1779 }
1780
1781 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1782 {
1783         struct tevent_req *req = tevent_req_callback_data(
1784                 subreq, struct tevent_req);
1785         struct ctdb_transaction_start_state *state = tevent_req_data(
1786                 req, struct ctdb_transaction_start_state);
1787         int ret;
1788         bool status;
1789
1790         status = ctdb_g_lock_lock_recv(subreq, &ret);
1791         TALLOC_FREE(subreq);
1792         if (! status) {
1793                 DEBUG(DEBUG_ERR,
1794                       ("transaction_start: %s g_lock lock failed, ret=%d\n",
1795                        state->h->db->db_name, ret));
1796                 tevent_req_error(req, ret);
1797                 return;
1798         }
1799
1800         tevent_req_done(req);
1801 }
1802
1803 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1804                                         struct tevent_req *req,
1805                                         int *perr)
1806 {
1807         struct ctdb_transaction_start_state *state = tevent_req_data(
1808                 req, struct ctdb_transaction_start_state);
1809         int err;
1810
1811         if (tevent_req_is_unix_error(req, &err)) {
1812                 if (perr != NULL) {
1813                         *perr = err;
1814                 }
1815                 return NULL;
1816         }
1817
1818         return state->h;
1819 }
1820
1821 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1822                            struct ctdb_client_context *client,
1823                            struct timeval timeout,
1824                            struct ctdb_db_context *db, bool readonly,
1825                            struct ctdb_transaction_handle **out)
1826 {
1827         struct tevent_req *req;
1828         struct ctdb_transaction_handle *h;
1829         int ret;
1830
1831         req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
1832                                           readonly);
1833         if (req == NULL) {
1834                 return ENOMEM;
1835         }
1836
1837         tevent_req_poll(req, ev);
1838
1839         h = ctdb_transaction_start_recv(req, &ret);
1840         if (h == NULL) {
1841                 return ret;
1842         }
1843
1844         *out = h;
1845         return 0;
1846 }
1847
1848 struct ctdb_transaction_record_fetch_state {
1849         TDB_DATA key, data;
1850         struct ctdb_ltdb_header header;
1851         bool found;
1852 };
1853
1854 static int ctdb_transaction_record_fetch_traverse(
1855                                 uint32_t reqid,
1856                                 struct ctdb_ltdb_header *nullheader,
1857                                 TDB_DATA key, TDB_DATA data,
1858                                 void *private_data)
1859 {
1860         struct ctdb_transaction_record_fetch_state *state =
1861                 (struct ctdb_transaction_record_fetch_state *)private_data;
1862
1863         if (state->key.dsize == key.dsize &&
1864             memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
1865                 int ret;
1866
1867                 ret = ctdb_ltdb_header_extract(&data, &state->header);
1868                 if (ret != 0) {
1869                         DEBUG(DEBUG_ERR,
1870                               ("record_fetch: Failed to extract header, "
1871                                "ret=%d\n", ret));
1872                         return 1;
1873                 }
1874
1875                 state->data = data;
1876                 state->found = true;
1877         }
1878
1879         return 0;
1880 }
1881
1882 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1883                                          TDB_DATA key,
1884                                          struct ctdb_ltdb_header *header,
1885                                          TDB_DATA *data)
1886 {
1887         struct ctdb_transaction_record_fetch_state state;
1888         int ret;
1889
1890         state.key = key;
1891         state.found = false;
1892
1893         ret = ctdb_rec_buffer_traverse(h->recbuf,
1894                                        ctdb_transaction_record_fetch_traverse,
1895                                        &state);
1896         if (ret != 0) {
1897                 return ret;
1898         }
1899
1900         if (state.found) {
1901                 if (header != NULL) {
1902                         *header = state.header;
1903                 }
1904                 if (data != NULL) {
1905                         *data = state.data;
1906                 }
1907                 return 0;
1908         }
1909
1910         return ENOENT;
1911 }
1912
1913 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1914                                   TDB_DATA key,
1915                                   TALLOC_CTX *mem_ctx, TDB_DATA *data)
1916 {
1917         TDB_DATA tmp_data;
1918         struct ctdb_ltdb_header header;
1919         int ret;
1920
1921         ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
1922         if (ret == 0) {
1923                 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1924                                            tmp_data.dsize);
1925                 if (data->dptr == NULL) {
1926                         return ENOMEM;
1927                 }
1928                 data->dsize = tmp_data.dsize;
1929                 return 0;
1930         }
1931
1932         ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
1933         if (ret != 0) {
1934                 return ret;
1935         }
1936
1937         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
1938         if (ret != 0) {
1939                 return ret;
1940         }
1941
1942         return 0;
1943 }
1944
1945 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
1946                                   TDB_DATA key, TDB_DATA data)
1947 {
1948         TALLOC_CTX *tmp_ctx;
1949         struct ctdb_ltdb_header header;
1950         TDB_DATA old_data;
1951         int ret;
1952
1953         if (h->readonly) {
1954                 return EINVAL;
1955         }
1956
1957         tmp_ctx = talloc_new(h);
1958         if (tmp_ctx == NULL) {
1959                 return ENOMEM;
1960         }
1961
1962         ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
1963         if (ret != 0) {
1964                 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
1965                 if (ret != 0) {
1966                         return ret;
1967                 }
1968         }
1969
1970         if (old_data.dsize == data.dsize &&
1971             memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
1972                 talloc_free(tmp_ctx);
1973                 return 0;
1974         }
1975
1976         header.dmaster = ctdb_client_pnn(h->client);
1977         header.rsn += 1;
1978
1979         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
1980         talloc_free(tmp_ctx);
1981         if (ret != 0) {
1982                 return ret;
1983         }
1984         h->updated = true;
1985
1986         return 0;
1987 }
1988
1989 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
1990                                    TDB_DATA key)
1991 {
1992         return ctdb_transaction_store_record(h, key, tdb_null);
1993 }
1994
1995 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
1996                                             uint64_t *seqnum)
1997 {
1998         const char *keyname = CTDB_DB_SEQNUM_KEY;
1999         TDB_DATA key, data;
2000         struct ctdb_ltdb_header header;
2001         int ret;
2002
2003         key.dptr = discard_const(keyname);
2004         key.dsize = strlen(keyname) + 1;
2005
2006         ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2007         if (ret != 0) {
2008                 DEBUG(DEBUG_ERR,
2009                       ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2010                        h->db->db_name, ret));
2011                 return ret;
2012         }
2013
2014         if (data.dsize == 0) {
2015                 /* initial data */
2016                 *seqnum = 0;
2017                 return 0;
2018         }
2019
2020         if (data.dsize != sizeof(uint64_t)) {
2021                 talloc_free(data.dptr);
2022                 return EINVAL;
2023         }
2024
2025         *seqnum = *(uint64_t *)data.dptr;
2026
2027         talloc_free(data.dptr);
2028         return 0;
2029 }
2030
2031 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2032                                             uint64_t seqnum)
2033 {
2034         const char *keyname = CTDB_DB_SEQNUM_KEY;
2035         TDB_DATA key, data;
2036
2037         key.dptr = discard_const(keyname);
2038         key.dsize = strlen(keyname) + 1;
2039
2040         data.dptr = (uint8_t *)&seqnum;
2041         data.dsize = sizeof(seqnum);
2042
2043         return ctdb_transaction_store_record(h, key, data);
2044 }
2045
/*
 * State carried by a ctdb_transaction_commit_send() request.
 */
struct ctdb_transaction_commit_state {
        /* Event context used to issue the sub-requests */
        struct tevent_context *ev;
        /* Timeout passed to the commit control (also used for a resend) */
        struct timeval timeout;
        /* Transaction being committed */
        struct ctdb_transaction_handle *h;
        /* Database sequence number read before the commit was sent */
        uint64_t seqnum;
};

/* Completion callbacks: commit control reply, then release of the g_lock */
static void ctdb_transaction_commit_done(struct tevent_req *subreq);
static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
2055
2056 struct tevent_req *ctdb_transaction_commit_send(
2057                                         TALLOC_CTX *mem_ctx,
2058                                         struct tevent_context *ev,
2059                                         struct timeval timeout,
2060                                         struct ctdb_transaction_handle *h)
2061 {
2062         struct tevent_req *req, *subreq;
2063         struct ctdb_transaction_commit_state *state;
2064         struct ctdb_req_control request;
2065         int ret;
2066
2067         req = tevent_req_create(mem_ctx, &state,
2068                                 struct ctdb_transaction_commit_state);
2069         if (req == NULL) {
2070                 return NULL;
2071         }
2072
2073         state->ev = ev;
2074         state->timeout = timeout;
2075         state->h = h;
2076
2077         ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
2078         if (ret != 0) {
2079                 tevent_req_error(req, ret);
2080                 return tevent_req_post(req, ev);
2081         }
2082
2083         ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
2084         if (ret != 0) {
2085                 tevent_req_error(req, ret);
2086                 return tevent_req_post(req, ev);
2087         }
2088
2089         ctdb_req_control_trans3_commit(&request, h->recbuf);
2090         subreq = ctdb_client_control_send(state, ev, h->client,
2091                                           ctdb_client_pnn(h->client),
2092                                           timeout, &request);
2093         if (tevent_req_nomem(subreq, req)) {
2094                 return tevent_req_post(req, ev);
2095         }
2096         tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2097
2098         return req;
2099 }
2100
2101 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2102 {
2103         struct tevent_req *req = tevent_req_callback_data(
2104                 subreq, struct tevent_req);
2105         struct ctdb_transaction_commit_state *state = tevent_req_data(
2106                 req, struct ctdb_transaction_commit_state);
2107         struct ctdb_transaction_handle *h = state->h;
2108         struct ctdb_reply_control *reply;
2109         uint64_t seqnum;
2110         int ret;
2111         bool status;
2112
2113         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2114         TALLOC_FREE(subreq);
2115         if (! status) {
2116                 DEBUG(DEBUG_ERR,
2117                       ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2118                        h->db->db_name, ret));
2119                 tevent_req_error(req, ret);
2120                 return;
2121         }
2122
2123         ret = ctdb_reply_control_trans3_commit(reply);
2124         talloc_free(reply);
2125
2126         if (ret != 0) {
2127                 /* Control failed due to recovery */
2128
2129                 ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2130                 if (ret != 0) {
2131                         tevent_req_error(req, ret);
2132                         return;
2133                 }
2134
2135                 if (seqnum == state->seqnum) {
2136                         struct ctdb_req_control request;
2137
2138                         /* try again */
2139                         ctdb_req_control_trans3_commit(&request,
2140                                                        state->h->recbuf);
2141                         subreq = ctdb_client_control_send(
2142                                         state, state->ev, state->h->client,
2143                                         ctdb_client_pnn(state->h->client),
2144                                         state->timeout, &request);
2145                         if (tevent_req_nomem(subreq, req)) {
2146                                 return;
2147                         }
2148                         tevent_req_set_callback(subreq,
2149                                                 ctdb_transaction_commit_done,
2150                                                 req);
2151                         return;
2152                 }
2153
2154                 if (seqnum != state->seqnum + 1) {
2155                         DEBUG(DEBUG_ERR,
2156                               ("transaction_commit: %s seqnum mismatch "
2157                                "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2158                                state->h->db->db_name, seqnum, state->seqnum));
2159                         tevent_req_error(req, EIO);
2160                         return;
2161                 }
2162         }
2163
2164         /* trans3_commit successful */
2165         subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2166                                          h->db_g_lock, h->lock_name, h->sid);
2167         if (tevent_req_nomem(subreq, req)) {
2168                 return;
2169         }
2170         tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
2171                                 req);
2172 }
2173
2174 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2175 {
2176         struct tevent_req *req = tevent_req_callback_data(
2177                 subreq, struct tevent_req);
2178         struct ctdb_transaction_commit_state *state = tevent_req_data(
2179                 req, struct ctdb_transaction_commit_state);
2180         int ret;
2181         bool status;
2182
2183         status = ctdb_g_lock_unlock_recv(subreq, &ret);
2184         TALLOC_FREE(subreq);
2185         if (! status) {
2186                 DEBUG(DEBUG_ERR,
2187                       ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2188                        state->h->db->db_name, ret));
2189                 tevent_req_error(req, ret);
2190                 return;
2191         }
2192
2193         talloc_free(state->h);
2194         tevent_req_done(req);
2195 }
2196
/*
 * Collect the result of ctdb_transaction_commit_send().
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the unix error of the request in *perr.
 */
bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
{
        int unix_err;
        bool failed;

        failed = tevent_req_is_unix_error(req, &unix_err);
        if (failed && perr != NULL) {
                *perr = unix_err;
        }

        return !failed;
}
2210
2211 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2212 {
2213         struct tevent_context *ev = h->ev;
2214         TALLOC_CTX *mem_ctx;
2215         struct tevent_req *req;
2216         int ret;
2217         bool status;
2218
2219         if (h->readonly || ! h->updated) {
2220                 return ctdb_transaction_cancel(h);
2221         }
2222
2223         mem_ctx = talloc_new(NULL);
2224         if (mem_ctx == NULL) {
2225                 return ENOMEM;
2226         }
2227
2228         req = ctdb_transaction_commit_send(mem_ctx, ev,
2229                                            tevent_timeval_zero(), h);
2230         if (req == NULL) {
2231                 talloc_free(mem_ctx);
2232                 return ENOMEM;
2233         }
2234
2235         tevent_req_poll(req, ev);
2236
2237         status = ctdb_transaction_commit_recv(req, &ret);
2238         if (! status) {
2239                 talloc_free(mem_ctx);
2240                 return ret;
2241         }
2242
2243         talloc_free(mem_ctx);
2244         return 0;
2245 }
2246
/*
 * State carried by a ctdb_transaction_cancel_send() request.
 */
struct ctdb_transaction_cancel_state {
        /* Event context used to issue the unlock sub-request */
        struct tevent_context *ev;
        /* Transaction being cancelled; freed when the unlock completes */
        struct ctdb_transaction_handle *h;
        /* Timeout value supplied by the caller of the send function */
        struct timeval timeout;
};

/* Completion callback for the g_lock unlock sub-request */
static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
2254
2255 struct tevent_req *ctdb_transaction_cancel_send(
2256                                         TALLOC_CTX *mem_ctx,
2257                                         struct tevent_context *ev,
2258                                         struct timeval timeout,
2259                                         struct ctdb_transaction_handle *h)
2260 {
2261         struct tevent_req *req, *subreq;
2262         struct ctdb_transaction_cancel_state *state;
2263
2264         req = tevent_req_create(mem_ctx, &state,
2265                                 struct ctdb_transaction_cancel_state);
2266         if (req == NULL) {
2267                 return NULL;
2268         }
2269
2270         state->ev = ev;
2271         state->h = h;
2272         state->timeout = timeout;
2273
2274         subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
2275                                          state->h->db_g_lock,
2276                                          state->h->lock_name, state->h->sid);
2277         if (tevent_req_nomem(subreq, req)) {
2278                 return tevent_req_post(req, ev);
2279         }
2280         tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
2281                                 req);
2282
2283         return req;
2284 }
2285
2286 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2287 {
2288         struct tevent_req *req = tevent_req_callback_data(
2289                 subreq, struct tevent_req);
2290         struct ctdb_transaction_cancel_state *state = tevent_req_data(
2291                 req, struct ctdb_transaction_cancel_state);
2292         int ret;
2293         bool status;
2294
2295         status = ctdb_g_lock_unlock_recv(subreq, &ret);
2296         TALLOC_FREE(subreq);
2297         if (! status) {
2298                 DEBUG(DEBUG_ERR,
2299                       ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2300                        state->h->db->db_name, ret));
2301                 talloc_free(state->h);
2302                 tevent_req_error(req, ret);
2303                 return;
2304         }
2305
2306         talloc_free(state->h);
2307         tevent_req_done(req);
2308 }
2309
/*
 * Collect the result of ctdb_transaction_cancel_send().
 *
 * Returns true on success.  On failure returns false and, if perr is not
 * NULL, stores the unix error of the request in *perr.
 */
bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
{
        int unix_err;
        bool failed;

        failed = tevent_req_is_unix_error(req, &unix_err);
        if (failed && perr != NULL) {
                *perr = unix_err;
        }

        return !failed;
}
2323
2324 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2325 {
2326         struct tevent_context *ev = h->ev;
2327         struct tevent_req *req;
2328         TALLOC_CTX *mem_ctx;
2329         int ret;
2330         bool status;
2331
2332         mem_ctx = talloc_new(NULL);
2333         if (mem_ctx == NULL) {
2334                 talloc_free(h);
2335                 return ENOMEM;
2336         }
2337
2338         req = ctdb_transaction_cancel_send(mem_ctx, ev,
2339                                            tevent_timeval_zero(), h);
2340         if (req == NULL) {
2341                 talloc_free(mem_ctx);
2342                 talloc_free(h);
2343                 return ENOMEM;
2344         }
2345
2346         tevent_req_poll(req, ev);
2347
2348         status = ctdb_transaction_cancel_recv(req, &ret);
2349         if (! status) {
2350                 talloc_free(mem_ctx);
2351                 return ret;
2352         }
2353
2354         talloc_free(mem_ctx);
2355         return 0;
2356 }
2357
2358 /*
2359  * TODO:
2360  *
2361  * In future Samba should register SERVER_ID.
2362  * Make that structure same as struct srvid {}.
2363  */