ctdb-client: Rename ctdb_db_traverse to ctdb_db_traverse_local
[sfrench/samba-autobuild/.git] / ctdb / client / client_db.c
1 /*
2    CTDB client code
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27
28 #include "common/logging.h"
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
39
40 static struct ctdb_db_context *client_db_handle(
41                                         struct ctdb_client_context *client,
42                                         const char *db_name)
43 {
44         struct ctdb_db_context *db;
45
46         for (db = client->db; db != NULL; db = db->next) {
47                 if (strcmp(db_name, db->db_name) == 0) {
48                         return db;
49                 }
50         }
51
52         return NULL;
53 }
54
/* State shared by the steps of a set_db_flags request */
struct ctdb_set_db_flags_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        uint32_t db_id;
        uint8_t db_flags;
        /*
         * Each flag becomes true once the matching control has completed,
         * or straight away when that database flag was not requested.
         */
        bool readonly_done;
        bool sticky_done;
        /* Output of list_of_connected_nodes(): node list and its length */
        uint32_t *pnn_list;
        int count;
};

/* Completion callbacks, in the order the request runs through them */
static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
69
70 static struct tevent_req *ctdb_set_db_flags_send(
71                                 TALLOC_CTX *mem_ctx,
72                                 struct tevent_context *ev,
73                                 struct ctdb_client_context *client,
74                                 uint32_t destnode, struct timeval timeout,
75                                 uint32_t db_id, uint8_t db_flags)
76 {
77         struct tevent_req *req, *subreq;
78         struct ctdb_set_db_flags_state *state;
79         struct ctdb_req_control request;
80
81         req = tevent_req_create(mem_ctx, &state,
82                                 struct ctdb_set_db_flags_state);
83         if (req == NULL) {
84                 return NULL;
85         }
86
87         if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
88                 tevent_req_done(req);
89                 return tevent_req_post(req, ev);
90         }
91
92         state->ev = ev;
93         state->client = client;
94         state->timeout = timeout;
95         state->db_id = db_id;
96         state->db_flags = db_flags;
97
98         ctdb_req_control_get_nodemap(&request);
99         subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
100                                           &request);
101         if (tevent_req_nomem(subreq, req)) {
102                 return tevent_req_post(req, ev);
103         }
104         tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
105
106         return req;
107 }
108
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
110 {
111         struct tevent_req *req = tevent_req_callback_data(
112                 subreq, struct tevent_req);
113         struct ctdb_set_db_flags_state *state = tevent_req_data(
114                 req, struct ctdb_set_db_flags_state);
115         struct ctdb_req_control request;
116         struct ctdb_reply_control *reply;
117         struct ctdb_node_map *nodemap;
118         int ret;
119         bool status;
120
121         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
122         TALLOC_FREE(subreq);
123         if (! status) {
124                 DEBUG(DEBUG_ERR,
125                       ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
126                        state->db_id, ret));
127                 tevent_req_error(req, ret);
128                 return;
129         }
130
131         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
132         talloc_free(reply);
133         if (ret != 0) {
134                 DEBUG(DEBUG_ERR,
135                       ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
136                       state->db_id, ret));
137                 tevent_req_error(req, ret);
138                 return;
139         }
140
141         state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
142                                                state, &state->pnn_list);
143         talloc_free(nodemap);
144         if (state->count <= 0) {
145                 DEBUG(DEBUG_ERR,
146                       ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
147                        state->db_id, state->count));
148                 tevent_req_error(req, ENOMEM);
149                 return;
150         }
151
152         if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
153                 ctdb_req_control_set_db_readonly(&request, state->db_id);
154                 subreq = ctdb_client_control_multi_send(
155                                         state, state->ev, state->client,
156                                         state->pnn_list, state->count,
157                                         state->timeout, &request);
158                 if (tevent_req_nomem(subreq, req)) {
159                         return;
160                 }
161                 tevent_req_set_callback(subreq,
162                                         ctdb_set_db_flags_readonly_done, req);
163         } else {
164                 state->readonly_done = true;
165         }
166
167         if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
168                 ctdb_req_control_set_db_sticky(&request, state->db_id);
169                 subreq = ctdb_client_control_multi_send(
170                                         state, state->ev, state->client,
171                                         state->pnn_list, state->count,
172                                         state->timeout, &request);
173                 if (tevent_req_nomem(subreq, req)) {
174                         return;
175                 }
176                 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
177                                         req);
178         } else {
179                 state->sticky_done = true;
180         }
181 }
182
183 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
184 {
185         struct tevent_req *req = tevent_req_callback_data(
186                 subreq, struct tevent_req);
187         struct ctdb_set_db_flags_state *state = tevent_req_data(
188                 req, struct ctdb_set_db_flags_state);
189         int ret;
190         bool status;
191
192         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
193                                                 NULL);
194         TALLOC_FREE(subreq);
195         if (! status) {
196                 DEBUG(DEBUG_ERR,
197                       ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
198                        state->db_id, ret));
199                 tevent_req_error(req, ret);
200                 return;
201         }
202
203         state->readonly_done = true;
204
205         if (state->readonly_done && state->sticky_done) {
206                 tevent_req_done(req);
207         }
208 }
209
210 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
211 {
212         struct tevent_req *req = tevent_req_callback_data(
213                 subreq, struct tevent_req);
214         struct ctdb_set_db_flags_state *state = tevent_req_data(
215                 req, struct ctdb_set_db_flags_state);
216         int ret;
217         bool status;
218
219         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
220                                                 NULL);
221         TALLOC_FREE(subreq);
222         if (! status) {
223                 DEBUG(DEBUG_ERR,
224                       ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
225                        state->db_id, ret));
226                 tevent_req_error(req, ret);
227                 return;
228         }
229
230         state->sticky_done = true;
231
232         if (state->readonly_done && state->sticky_done) {
233                 tevent_req_done(req);
234         }
235 }
236
/*
 * Collect the result of a set_db_flags request.
 *
 * Returns true on success.  On failure returns false and, when perr is
 * not NULL, stores the unix error code in *perr.
 */
static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
{
        int err;

        if (! tevent_req_is_unix_error(req, &err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = err;
        }
        return false;
}
249
/* State shared by the steps of an attach request */
struct ctdb_attach_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        /* Node that receives every control of the attach sequence */
        uint32_t destnode;
        /* CTDB_DB_FLAGS_* requested by the caller */
        uint8_t db_flags;
        /* TDB_* flags, chosen once the mutex tunable is known */
        uint32_t tdb_flags;
        /* Database context being built, or the existing one if attached */
        struct ctdb_db_context *db;
};

/* Completion callbacks, in the order the attach sequence runs them */
static void ctdb_attach_mutex_done(struct tevent_req *subreq);
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
265
266 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
267                                     struct tevent_context *ev,
268                                     struct ctdb_client_context *client,
269                                     struct timeval timeout,
270                                     const char *db_name, uint8_t db_flags)
271 {
272         struct tevent_req *req, *subreq;
273         struct ctdb_attach_state *state;
274         struct ctdb_req_control request;
275
276         req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
277         if (req == NULL) {
278                 return NULL;
279         }
280
281         state->db = client_db_handle(client, db_name);
282         if (state->db != NULL) {
283                 tevent_req_done(req);
284                 return tevent_req_post(req, ev);
285         }
286
287         state->ev = ev;
288         state->client = client;
289         state->timeout = timeout;
290         state->destnode = ctdb_client_pnn(client);
291         state->db_flags = db_flags;
292
293         state->db = talloc_zero(client, struct ctdb_db_context);
294         if (tevent_req_nomem(state->db, req)) {
295                 return tevent_req_post(req, ev);
296         }
297
298         state->db->db_name = talloc_strdup(state->db, db_name);
299         if (tevent_req_nomem(state->db, req)) {
300                 return tevent_req_post(req, ev);
301         }
302
303         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
304                 state->db->persistent = true;
305         }
306
307         ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
308         subreq = ctdb_client_control_send(state, ev, client,
309                                           ctdb_client_pnn(client), timeout,
310                                           &request);
311         if (tevent_req_nomem(subreq, req)) {
312                 return tevent_req_post(req, ev);
313         }
314         tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
315
316         return req;
317 }
318
319 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
320 {
321         struct tevent_req *req = tevent_req_callback_data(
322                 subreq, struct tevent_req);
323         struct ctdb_attach_state *state = tevent_req_data(
324                 req, struct ctdb_attach_state);
325         struct ctdb_reply_control *reply;
326         struct ctdb_req_control request;
327         uint32_t mutex_enabled;
328         int ret;
329         bool status;
330
331         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
332         TALLOC_FREE(subreq);
333         if (! status) {
334                 DEBUG(DEBUG_ERR, ("attach: %s GET_TUNABLE failed, ret=%d\n",
335                                   state->db->db_name, ret));
336                 tevent_req_error(req, ret);
337                 return;
338         }
339
340         ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
341         if (ret != 0) {
342                 /* Treat error as mutex support not available */
343                 mutex_enabled = 0;
344         }
345
346         if (state->db->persistent) {
347                 state->tdb_flags = TDB_DEFAULT;
348         } else {
349                 state->tdb_flags = (TDB_NOSYNC | TDB_INCOMPATIBLE_HASH |
350                                     TDB_CLEAR_IF_FIRST);
351                 if (mutex_enabled == 1) {
352                         state->tdb_flags |= TDB_MUTEX_LOCKING;
353                 }
354         }
355
356         if (state->db->persistent) {
357                 ctdb_req_control_db_attach_persistent(&request,
358                                                       state->db->db_name,
359                                                       state->tdb_flags);
360         } else {
361                 ctdb_req_control_db_attach(&request, state->db->db_name,
362                                            state->tdb_flags);
363         }
364
365         subreq = ctdb_client_control_send(state, state->ev, state->client,
366                                           state->destnode, state->timeout,
367                                           &request);
368         if (tevent_req_nomem(subreq, req)) {
369                 return;
370         }
371         tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
372 }
373
374 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
375 {
376         struct tevent_req *req = tevent_req_callback_data(
377                 subreq, struct tevent_req);
378         struct ctdb_attach_state *state = tevent_req_data(
379                 req, struct ctdb_attach_state);
380         struct ctdb_req_control request;
381         struct ctdb_reply_control *reply;
382         bool status;
383         int ret;
384
385         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
386         TALLOC_FREE(subreq);
387         if (! status) {
388                 DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
389                                   state->db->db_name,
390                                   (state->db->persistent
391                                         ? "DB_ATTACH_PERSISTENT"
392                                         : "DB_ATTACH"),
393                                   ret));
394                 tevent_req_error(req, ret);
395                 return;
396         }
397
398         if (state->db->persistent) {
399                 ret = ctdb_reply_control_db_attach_persistent(
400                                 reply, &state->db->db_id);
401         } else {
402                 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
403         }
404         talloc_free(reply);
405         if (ret != 0) {
406                 DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
407                                   state->db->db_name, ret));
408                 tevent_req_error(req, ret);
409                 return;
410         }
411
412         ctdb_req_control_getdbpath(&request, state->db->db_id);
413         subreq = ctdb_client_control_send(state, state->ev, state->client,
414                                           state->destnode, state->timeout,
415                                           &request);
416         if (tevent_req_nomem(subreq, req)) {
417                 return;
418         }
419         tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
420 }
421
422 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
423 {
424         struct tevent_req *req = tevent_req_callback_data(
425                 subreq, struct tevent_req);
426         struct ctdb_attach_state *state = tevent_req_data(
427                 req, struct ctdb_attach_state);
428         struct ctdb_reply_control *reply;
429         struct ctdb_req_control request;
430         bool status;
431         int ret;
432
433         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
434         TALLOC_FREE(subreq);
435         if (! status) {
436                 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
437                                   state->db->db_name, ret));
438                 tevent_req_error(req, ret);
439                 return;
440         }
441
442         ret = ctdb_reply_control_getdbpath(reply, state->db,
443                                            &state->db->db_path);
444         talloc_free(reply);
445         if (ret != 0) {
446                 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
447                                   state->db->db_name, ret));
448                 tevent_req_error(req, ret);
449                 return;
450         }
451
452         ctdb_req_control_db_get_health(&request, state->db->db_id);
453         subreq = ctdb_client_control_send(state, state->ev, state->client,
454                                           state->destnode, state->timeout,
455                                           &request);
456         if (tevent_req_nomem(subreq, req)) {
457                 return;
458         }
459         tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
460 }
461
462 static void ctdb_attach_health_done(struct tevent_req *subreq)
463 {
464         struct tevent_req *req = tevent_req_callback_data(
465                 subreq, struct tevent_req);
466         struct ctdb_attach_state *state = tevent_req_data(
467                 req, struct ctdb_attach_state);
468         struct ctdb_reply_control *reply;
469         const char *reason;
470         bool status;
471         int ret;
472
473         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
474         TALLOC_FREE(subreq);
475         if (! status) {
476                 DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
477                                   state->db->db_name, ret));
478                 tevent_req_error(req, ret);
479                 return;
480         }
481
482         ret = ctdb_reply_control_db_get_health(reply, state, &reason);
483         if (ret != 0) {
484                 DEBUG(DEBUG_ERR,
485                       ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
486                        state->db->db_name, ret));
487                 tevent_req_error(req, ret);
488                 return;
489         }
490
491         if (reason != NULL) {
492                 /* Database unhealthy, avoid attach */
493                 DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
494                                   state->db->db_name, reason));
495                 tevent_req_error(req, EIO);
496                 return;
497         }
498
499         subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
500                                         state->destnode, state->timeout,
501                                         state->db->db_id, state->db_flags);
502         if (tevent_req_nomem(subreq, req)) {
503                 return;
504         }
505         tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
506 }
507
508 static void ctdb_attach_flags_done(struct tevent_req *subreq)
509 {
510         struct tevent_req *req = tevent_req_callback_data(
511                 subreq, struct tevent_req);
512         struct ctdb_attach_state *state = tevent_req_data(
513                 req, struct ctdb_attach_state);
514         bool status;
515         int ret;
516
517         status = ctdb_set_db_flags_recv(subreq, &ret);
518         TALLOC_FREE(subreq);
519         if (! status) {
520                 DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
521                                   state->db->db_name, state->db_flags));
522                 tevent_req_error(req, ret);
523                 return;
524         }
525
526         state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
527                                         state->tdb_flags, O_RDWR, 0);
528         if (tevent_req_nomem(state->db->ltdb, req)) {
529                 DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
530                                   state->db->db_name));
531                 return;
532         }
533         DLIST_ADD(state->client->db, state->db);
534
535         tevent_req_done(req);
536 }
537
538 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
539                       struct ctdb_db_context **out)
540 {
541         struct ctdb_attach_state *state = tevent_req_data(
542                 req, struct ctdb_attach_state);
543         int err;
544
545         if (tevent_req_is_unix_error(req, &err)) {
546                 if (perr != NULL) {
547                         *perr = err;
548                 }
549                 return false;
550         }
551
552         if (out != NULL) {
553                 *out = state->db;
554         }
555         return true;
556 }
557
558 int ctdb_attach(struct tevent_context *ev,
559                 struct ctdb_client_context *client,
560                 struct timeval timeout,
561                 const char *db_name, uint8_t db_flags,
562                 struct ctdb_db_context **out)
563 {
564         TALLOC_CTX *mem_ctx;
565         struct tevent_req *req;
566         bool status;
567         int ret;
568
569         mem_ctx = talloc_new(client);
570         if (mem_ctx == NULL) {
571                 return ENOMEM;
572         }
573
574         req = ctdb_attach_send(mem_ctx, ev, client, timeout,
575                                db_name, db_flags);
576         if (req == NULL) {
577                 talloc_free(mem_ctx);
578                 return ENOMEM;
579         }
580
581         tevent_req_poll(req, ev);
582
583         status = ctdb_attach_recv(req, &ret, out);
584         if (! status) {
585                 talloc_free(mem_ctx);
586                 return ret;
587         }
588
589         /*
590         ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
591         ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
592         ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
593         */
594
595         talloc_free(mem_ctx);
596         return 0;
597 }
598
599 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
600                 struct ctdb_client_context *client,
601                 struct timeval timeout, uint32_t db_id)
602 {
603         struct ctdb_db_context *db;
604         int ret;
605
606         ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
607                                   db_id);
608         if (ret != 0) {
609                 return ret;
610         }
611
612         for (db = client->db; db != NULL; db = db->next) {
613                 if (db->db_id == db_id) {
614                         DLIST_REMOVE(client->db, db);
615                         break;
616                 }
617         }
618
619         return 0;
620 }
621
622 uint32_t ctdb_db_id(struct ctdb_db_context *db)
623 {
624         return db->db_id;
625 }
626
627 struct ctdb_db_traverse_local_state {
628         ctdb_rec_parser_func_t parser;
629         void *private_data;
630         bool extract_header;
631         int error;
632 };
633
634 static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
635                                           TDB_DATA key, TDB_DATA data,
636                                           void *private_data)
637 {
638         struct ctdb_db_traverse_local_state *state =
639                 (struct ctdb_db_traverse_local_state *)private_data;
640         int ret;
641
642         if (state->extract_header) {
643                 struct ctdb_ltdb_header header;
644
645                 ret = ctdb_ltdb_header_extract(&data, &header);
646                 if (ret != 0) {
647                         state->error = ret;
648                         return 1;
649                 }
650
651                 ret = state->parser(0, &header, key, data, state->private_data);
652         } else {
653                 ret = state->parser(0, NULL, key, data, state->private_data);
654         }
655
656         if (ret != 0) {
657                 state->error = ret;
658                 return 1;
659         }
660
661         return 0;
662 }
663
664 int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
665                            bool extract_header,
666                            ctdb_rec_parser_func_t parser, void *private_data)
667 {
668         struct ctdb_db_traverse_local_state state;
669         int ret;
670
671         state.parser = parser;
672         state.private_data = private_data;
673         state.extract_header = extract_header;
674         state.error = 0;
675
676         if (readonly) {
677                 ret = tdb_traverse_read(db->ltdb->tdb,
678                                         ctdb_db_traverse_local_handler,
679                                         &state);
680         } else {
681                 ret = tdb_traverse(db->ltdb->tdb,
682                                    ctdb_db_traverse_local_handler, &state);
683         }
684
685         if (ret == -1) {
686                 return EIO;
687         }
688
689         return state.error;
690 }
691
692 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
693                     struct ctdb_ltdb_header *header,
694                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
695 {
696         TDB_DATA rec;
697         int ret;
698
699         rec = tdb_fetch(db->ltdb->tdb, key);
700         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
701                 /* No record present */
702                 if (rec.dptr != NULL) {
703                         free(rec.dptr);
704                 }
705
706                 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
707                         return EIO;
708                 }
709
710                 header->rsn = 0;
711                 header->dmaster = CTDB_UNKNOWN_PNN;
712                 header->flags = 0;
713
714                 if (data != NULL) {
715                         *data = tdb_null;
716                 }
717                 return 0;
718         }
719
720         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
721         if (ret != 0) {
722                 return ret;
723         }
724
725         ret = 0;
726         if (data != NULL) {
727                 size_t offset = ctdb_ltdb_header_len(header);
728
729                 data->dsize = rec.dsize - offset;
730                 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
731                                            data->dsize);
732                 if (data->dptr == NULL) {
733                         ret = ENOMEM;
734                 }
735         }
736
737         free(rec.dptr);
738         return ret;
739 }
740
741 /*
742  * Fetch a record from volatile database
743  *
744  * Steps:
745  *  1. Get a lock on the hash chain
746  *  2. If the record does not exist, migrate the record
747  *  3. If readonly=true and delegations do not exist, migrate the record.
748  *  4. If readonly=false and delegations exist, migrate the record.
749  *  5. If the local node is not dmaster, migrate the record.
750  *  6. Return record
751  */
752
/* State of a fetch_lock request */
struct ctdb_fetch_lock_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Record handle under construction, allocated on the database */
        struct ctdb_record_handle *h;
        /* Whether the caller asked for readonly access */
        bool readonly;
        /* PNN of the local node, from ctdb_client_pnn() */
        uint32_t pnn;
};

/* Helpers and callback used while fetching and migrating the record */
static int ctdb_fetch_lock_check(struct tevent_req *req);
static void ctdb_fetch_lock_migrate(struct tevent_req *req);
static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
764
765 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
766                                         struct tevent_context *ev,
767                                         struct ctdb_client_context *client,
768                                         struct ctdb_db_context *db,
769                                         TDB_DATA key, bool readonly)
770 {
771         struct ctdb_fetch_lock_state *state;
772         struct tevent_req *req;
773         int ret;
774
775         req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
776         if (req == NULL) {
777                 return NULL;
778         }
779
780         state->ev = ev;
781         state->client = client;
782
783         state->h = talloc_zero(db, struct ctdb_record_handle);
784         if (tevent_req_nomem(state->h, req)) {
785                 return tevent_req_post(req, ev);
786         }
787         state->h->client = client;
788         state->h->db = db;
789         state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
790         if (tevent_req_nomem(state->h->key.dptr, req)) {
791                 return tevent_req_post(req, ev);
792         }
793         state->h->key.dsize = key.dsize;
794         state->h->readonly = false;
795
796         state->readonly = readonly;
797         state->pnn = ctdb_client_pnn(client);
798
799         /* Check that database is not persistent */
800         if (db->persistent) {
801                 DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
802                                   db->db_name));
803                 tevent_req_error(req, EINVAL);
804                 return tevent_req_post(req, ev);
805         }
806
807         ret = ctdb_fetch_lock_check(req);
808         if (ret == 0) {
809                 tevent_req_done(req);
810                 return tevent_req_post(req, ev);
811         }
812         if (ret != EAGAIN) {
813                 tevent_req_error(req, ret);
814                 return tevent_req_post(req, ev);
815         }
816         return req;
817 }
818
819 static int ctdb_fetch_lock_check(struct tevent_req *req)
820 {
821         struct ctdb_fetch_lock_state *state = tevent_req_data(
822                 req, struct ctdb_fetch_lock_state);
823         struct ctdb_record_handle *h = state->h;
824         struct ctdb_ltdb_header header;
825         TDB_DATA data = tdb_null;
826         int ret, err = 0;
827         bool do_migrate = false;
828
829         ret = tdb_chainlock(h->db->ltdb->tdb, h->key);
830         if (ret != 0) {
831                 DEBUG(DEBUG_ERR,
832                       ("fetch_lock: %s tdb_chainlock failed, %s\n",
833                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
834                 err = EIO;
835                 goto failed;
836         }
837
838         data = tdb_fetch(h->db->ltdb->tdb, h->key);
839         if (data.dptr == NULL) {
840                 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
841                         goto migrate;
842                 } else {
843                         err = EIO;
844                         goto failed;
845                 }
846         }
847
848         /* Got the record */
849         ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
850         if (ret != 0) {
851                 err = ret;
852                 goto failed;
853         }
854
855         if (! state->readonly) {
856                 /* Read/write access */
857                 if (header.dmaster == state->pnn &&
858                     header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
859                         goto migrate;
860                 }
861
862                 if (header.dmaster != state->pnn) {
863                         goto migrate;
864                 }
865         } else {
866                 /* Readonly access */
867                 if (header.dmaster != state->pnn &&
868                     ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
869                                        CTDB_REC_RO_HAVE_DELEGATIONS))) {
870                         goto migrate;
871                 }
872         }
873
874         /* We are the dmaster or readonly delegation */
875         h->header = header;
876         h->data = data;
877         if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
878                             CTDB_REC_RO_HAVE_DELEGATIONS)) {
879                 h->readonly = true;
880         }
881         return 0;
882
883 migrate:
884         do_migrate = true;
885         err = EAGAIN;
886
887 failed:
888         if (data.dptr != NULL) {
889                 free(data.dptr);
890         }
891         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
892         if (ret != 0) {
893                 DEBUG(DEBUG_ERR,
894                       ("fetch_lock: %s tdb_chainunlock failed, %s\n",
895                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
896                 return EIO;
897         }
898
899         if (do_migrate) {
900                 ctdb_fetch_lock_migrate(req);
901         }
902         return err;
903 }
904
905 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
906 {
907         struct ctdb_fetch_lock_state *state = tevent_req_data(
908                 req, struct ctdb_fetch_lock_state);
909         struct ctdb_req_call request;
910         struct tevent_req *subreq;
911
912         ZERO_STRUCT(request);
913         request.flags = CTDB_IMMEDIATE_MIGRATION;
914         if (state->readonly) {
915                 request.flags |= CTDB_WANT_READONLY;
916         }
917         request.db_id = state->h->db->db_id;
918         request.callid = CTDB_NULL_FUNC;
919         request.key = state->h->key;
920         request.calldata = tdb_null;
921
922         subreq = ctdb_client_call_send(state, state->ev, state->client,
923                                        &request);
924         if (tevent_req_nomem(subreq, req)) {
925                 return;
926         }
927
928         tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
929 }
930
931 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
932 {
933         struct tevent_req *req = tevent_req_callback_data(
934                 subreq, struct tevent_req);
935         struct ctdb_fetch_lock_state *state = tevent_req_data(
936                 req, struct ctdb_fetch_lock_state);
937         struct ctdb_reply_call *reply;
938         int ret;
939         bool status;
940
941         status = ctdb_client_call_recv(subreq, state, &reply, &ret);
942         TALLOC_FREE(subreq);
943         if (! status) {
944                 DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
945                                   state->h->db->db_name, ret));
946                 tevent_req_error(req, ret);
947                 return;
948         }
949
950         if (reply->status != 0) {
951                 tevent_req_error(req, EIO);
952                 return;
953         }
954         talloc_free(reply);
955
956         ret = ctdb_fetch_lock_check(req);
957         if (ret != 0) {
958                 if (ret != EAGAIN) {
959                         tevent_req_error(req, ret);
960                 }
961                 return;
962         }
963
964         tevent_req_done(req);
965 }
966
967 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
968 {
969         int ret;
970
971         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
972         if (ret != 0) {
973                 DEBUG(DEBUG_ERR,
974                       ("fetch_lock: %s tdb_chainunlock failed, %s\n",
975                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
976         }
977         free(h->data.dptr);
978         return 0;
979 }
980
981 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
982                                                 struct ctdb_ltdb_header *header,
983                                                 TALLOC_CTX *mem_ctx,
984                                                 TDB_DATA *data, int *perr)
985 {
986         struct ctdb_fetch_lock_state *state = tevent_req_data(
987                 req, struct ctdb_fetch_lock_state);
988         struct ctdb_record_handle *h = state->h;
989         int err;
990
991         if (tevent_req_is_unix_error(req, &err)) {
992                 if (perr != NULL) {
993                         TALLOC_FREE(state->h);
994                         *perr = err;
995                 }
996                 return NULL;
997         }
998
999         if (header != NULL) {
1000                 *header = h->header;
1001         }
1002         if (data != NULL) {
1003                 size_t offset;
1004
1005                 offset = ctdb_ltdb_header_len(&h->header);
1006
1007                 data->dsize = h->data.dsize - offset;
1008                 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
1009                                            data->dsize);
1010                 if (data->dptr == NULL) {
1011                         TALLOC_FREE(state->h);
1012                         if (perr != NULL) {
1013                                 *perr = ENOMEM;
1014                         }
1015                         return NULL;
1016                 }
1017         }
1018
1019         talloc_set_destructor(h, ctdb_record_handle_destructor);
1020         return h;
1021 }
1022
1023 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1024                     struct ctdb_client_context *client,
1025                     struct ctdb_db_context *db, TDB_DATA key, bool readonly,
1026                     struct ctdb_record_handle **out,
1027                     struct ctdb_ltdb_header *header, TDB_DATA *data)
1028 {
1029         struct tevent_req *req;
1030         struct ctdb_record_handle *h;
1031         int ret;
1032
1033         req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
1034         if (req == NULL) {
1035                 return ENOMEM;
1036         }
1037
1038         tevent_req_poll(req, ev);
1039
1040         h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
1041         if (h == NULL) {
1042                 return ret;
1043         }
1044
1045         *out = h;
1046         return 0;
1047 }
1048
1049 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
1050 {
1051         uint8_t header[sizeof(struct ctdb_ltdb_header)];
1052         TDB_DATA rec[2];
1053         int ret;
1054
1055         /* Cannot modify the record if it was obtained as a readonly copy */
1056         if (h->readonly) {
1057                 return EINVAL;
1058         }
1059
1060         /* Check if the new data is same */
1061         if (h->data.dsize == data.dsize &&
1062             memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1063                 /* No need to do anything */
1064                 return 0;
1065         }
1066
1067         ctdb_ltdb_header_push(&h->header, header);
1068
1069         rec[0].dsize = ctdb_ltdb_header_len(&h->header);
1070         rec[0].dptr = header;
1071
1072         rec[1].dsize = data.dsize;
1073         rec[1].dptr = data.dptr;
1074
1075         ret = tdb_storev(h->db->ltdb->tdb, h->key, rec, 2, TDB_REPLACE);
1076         if (ret != 0) {
1077                 DEBUG(DEBUG_ERR,
1078                       ("store_record: %s tdb_storev failed, %s\n",
1079                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1080                 return EIO;
1081         }
1082
1083         return 0;
1084 }
1085
/* State for an asynchronous ctdb_delete_record_send() request */
struct ctdb_delete_record_state {
        /* Handle of the record being deleted; not owned by the request */
        struct ctdb_record_handle *h;
};

/* Completion callback for the control sent by ctdb_delete_record_send() */
static void ctdb_delete_record_done(struct tevent_req *subreq);
1091
1092 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1093                                            struct tevent_context *ev,
1094                                            struct ctdb_record_handle *h)
1095 {
1096         struct tevent_req *req, *subreq;
1097         struct ctdb_delete_record_state *state;
1098         struct ctdb_key_data key;
1099         struct ctdb_req_control request;
1100         uint8_t header[sizeof(struct ctdb_ltdb_header)];
1101         TDB_DATA rec;
1102         int ret;
1103
1104         req = tevent_req_create(mem_ctx, &state,
1105                                 struct ctdb_delete_record_state);
1106         if (req == NULL) {
1107                 return NULL;
1108         }
1109
1110         state->h = h;
1111
1112         /* Cannot delete the record if it was obtained as a readonly copy */
1113         if (h->readonly) {
1114                 DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
1115                                   h->db->db_name));
1116                 tevent_req_error(req, EINVAL);
1117                 return tevent_req_post(req, ev);
1118         }
1119
1120         ctdb_ltdb_header_push(&h->header, header);
1121
1122         rec.dsize = ctdb_ltdb_header_len(&h->header);
1123         rec.dptr = header;
1124
1125         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1126         if (ret != 0) {
1127                 DEBUG(DEBUG_ERR,
1128                       ("fetch_lock delete: %s tdb_sore failed, %s\n",
1129                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1130                 tevent_req_error(req, EIO);
1131                 return tevent_req_post(req, ev);
1132         }
1133
1134         key.db_id = h->db->db_id;
1135         key.header = h->header;
1136         key.key = h->key;
1137
1138         ctdb_req_control_schedule_for_deletion(&request, &key);
1139         subreq = ctdb_client_control_send(state, ev, h->client,
1140                                           ctdb_client_pnn(h->client),
1141                                           tevent_timeval_zero(),
1142                                           &request);
1143         if (tevent_req_nomem(subreq, req)) {
1144                 return tevent_req_post(req, ev);
1145         }
1146         tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
1147
1148         return req;
1149 }
1150
1151 static void ctdb_delete_record_done(struct tevent_req *subreq)
1152 {
1153         struct tevent_req *req = tevent_req_callback_data(
1154                 subreq, struct tevent_req);
1155         struct ctdb_delete_record_state *state = tevent_req_data(
1156                 req, struct ctdb_delete_record_state);
1157         int ret;
1158         bool status;
1159
1160         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1161         TALLOC_FREE(subreq);
1162         if (! status) {
1163                 DEBUG(DEBUG_ERR,
1164                       ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1165                        "ret=%d\n", state->h->db->db_name, ret));
1166                 tevent_req_error(req, ret);
1167                 return;
1168         }
1169
1170         tevent_req_done(req);
1171 }
1172
/*
 * Collect the result of a delete-record request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the errno-style error in *perr.
 */
bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
{
        int err;
        bool failed;

        failed = tevent_req_is_unix_error(req, &err);
        if (failed && perr != NULL) {
                *perr = err;
        }

        return ! failed;
}
1186
1187
1188 int ctdb_delete_record(struct ctdb_record_handle *h)
1189 {
1190         struct tevent_context *ev = h->ev;
1191         TALLOC_CTX *mem_ctx;
1192         struct tevent_req *req;
1193         int ret;
1194         bool status;
1195
1196         mem_ctx = talloc_new(NULL);
1197         if (mem_ctx == NULL) {
1198                 return ENOMEM;
1199         }
1200
1201         req = ctdb_delete_record_send(mem_ctx, ev, h);
1202         if (req == NULL) {
1203                 talloc_free(mem_ctx);
1204                 return ENOMEM;
1205         }
1206
1207         tevent_req_poll(req, ev);
1208
1209         status = ctdb_delete_record_recv(req, &ret);
1210         talloc_free(mem_ctx);
1211         if (! status) {
1212                 return ret;
1213         }
1214
1215         return 0;
1216 }
1217
1218 /*
1219  * Global lock functions
1220  */
1221
1222 struct ctdb_g_lock_lock_state {
1223         struct tevent_context *ev;
1224         struct ctdb_client_context *client;
1225         struct ctdb_db_context *db;
1226         TDB_DATA key;
1227         struct ctdb_server_id my_sid;
1228         enum ctdb_g_lock_type lock_type;
1229         struct ctdb_record_handle *h;
1230         /* state for verification of active locks */
1231         struct ctdb_g_lock_list *lock_list;
1232         unsigned int current;
1233 };
1234
1235 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1236 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1237 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1238 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1239 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1240
1241 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1242                                   enum ctdb_g_lock_type l2)
1243 {
1244         if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1245                 return false;
1246         }
1247         return true;
1248 }
1249
1250 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1251                                          struct tevent_context *ev,
1252                                          struct ctdb_client_context *client,
1253                                          struct ctdb_db_context *db,
1254                                          const char *keyname,
1255                                          struct ctdb_server_id *sid,
1256                                          bool readonly)
1257 {
1258         struct tevent_req *req, *subreq;
1259         struct ctdb_g_lock_lock_state *state;
1260
1261         req = tevent_req_create(mem_ctx, &state,
1262                                 struct ctdb_g_lock_lock_state);
1263         if (req == NULL) {
1264                 return NULL;
1265         }
1266
1267         state->ev = ev;
1268         state->client = client;
1269         state->db = db;
1270         state->key.dptr = discard_const(keyname);
1271         state->key.dsize = strlen(keyname) + 1;
1272         state->my_sid = *sid;
1273         state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1274
1275         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1276                                       false);
1277         if (tevent_req_nomem(subreq, req)) {
1278                 return tevent_req_post(req, ev);
1279         }
1280         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1281
1282         return req;
1283 }
1284
1285 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1286 {
1287         struct tevent_req *req = tevent_req_callback_data(
1288                 subreq, struct tevent_req);
1289         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1290                 req, struct ctdb_g_lock_lock_state);
1291         TDB_DATA data;
1292         int ret = 0;
1293
1294         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1295         TALLOC_FREE(subreq);
1296         if (state->h == NULL) {
1297                 DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1298                                   (char *)state->key.dptr));
1299                 tevent_req_error(req, ret);
1300                 return;
1301         }
1302
1303         if (state->lock_list != NULL) {
1304                 TALLOC_FREE(state->lock_list);
1305                 state->current = 0;
1306         }
1307
1308         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1309                                     &state->lock_list);
1310         talloc_free(data.dptr);
1311         if (ret != 0) {
1312                 DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1313                                   (char *)state->key.dptr));
1314                 tevent_req_error(req, ret);
1315                 return;
1316         }
1317
1318         ctdb_g_lock_lock_process_locks(req);
1319 }
1320
1321 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1322 {
1323         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1324                 req, struct ctdb_g_lock_lock_state);
1325         struct tevent_req *subreq;
1326         struct ctdb_g_lock *lock;
1327         bool check_server = false;
1328         int ret;
1329
1330         while (state->current < state->lock_list->num) {
1331                 lock = &state->lock_list->lock[state->current];
1332
1333                 /* We should not ask for the same lock more than once */
1334                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1335                         DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
1336                                           (char *)state->key.dptr));
1337                         tevent_req_error(req, EDEADLK);
1338                         return;
1339                 }
1340
1341                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1342                         check_server = true;
1343                         break;
1344                 }
1345
1346                 state->current += 1;
1347         }
1348
1349         if (check_server) {
1350                 struct ctdb_req_control request;
1351
1352                 ctdb_req_control_process_exists(&request, lock->sid.pid);
1353                 subreq = ctdb_client_control_send(state, state->ev,
1354                                                   state->client,
1355                                                   lock->sid.vnn,
1356                                                   tevent_timeval_zero(),
1357                                                   &request);
1358                 if (tevent_req_nomem(subreq, req)) {
1359                         return;
1360                 }
1361                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1362                 return;
1363         }
1364
1365         /* There is no conflict, add ourself to the lock_list */
1366         state->lock_list->lock = talloc_realloc(state->lock_list,
1367                                                 state->lock_list->lock,
1368                                                 struct ctdb_g_lock,
1369                                                 state->lock_list->num + 1);
1370         if (state->lock_list->lock == NULL) {
1371                 tevent_req_error(req, ENOMEM);
1372                 return;
1373         }
1374
1375         lock = &state->lock_list->lock[state->lock_list->num];
1376         lock->type = state->lock_type;
1377         lock->sid = state->my_sid;
1378         state->lock_list->num += 1;
1379
1380         ret = ctdb_g_lock_lock_update(req);
1381         if (ret != 0) {
1382                 tevent_req_error(req, ret);
1383                 return;
1384         }
1385
1386         TALLOC_FREE(state->h);
1387         tevent_req_done(req);
1388 }
1389
1390 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1391 {
1392         struct tevent_req *req = tevent_req_callback_data(
1393                 subreq, struct tevent_req);
1394         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1395                 req, struct ctdb_g_lock_lock_state);
1396         struct ctdb_reply_control *reply;
1397         int ret, value;
1398         bool status;
1399
1400         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1401         TALLOC_FREE(subreq);
1402         if (! status) {
1403                 DEBUG(DEBUG_ERR,
1404                       ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1405                        (char *)state->key.dptr, ret));
1406                 tevent_req_error(req, ret);
1407                 return;
1408         }
1409
1410         ret = ctdb_reply_control_process_exists(reply, &value);
1411         if (ret != 0) {
1412                 tevent_req_error(req, ret);
1413                 return;
1414         }
1415         talloc_free(reply);
1416
1417         if (value == 0) {
1418                 /* server process exists, need to retry */
1419                 TALLOC_FREE(state->h);
1420                 subreq = tevent_wakeup_send(state, state->ev,
1421                                             tevent_timeval_current_ofs(0,1000));
1422                 if (tevent_req_nomem(subreq, req)) {
1423                         return;
1424                 }
1425                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1426                 return;
1427         }
1428
1429         /* server process does not exist, remove conflicting entry */
1430         state->lock_list->lock[state->current] =
1431                 state->lock_list->lock[state->lock_list->num-1];
1432         state->lock_list->num -= 1;
1433
1434         ret = ctdb_g_lock_lock_update(req);
1435         if (ret != 0) {
1436                 tevent_req_error(req, ret);
1437                 return;
1438         }
1439
1440         ctdb_g_lock_lock_process_locks(req);
1441 }
1442
1443 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1444 {
1445         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1446                 req, struct ctdb_g_lock_lock_state);
1447         TDB_DATA data;
1448         int ret;
1449
1450         data.dsize = ctdb_g_lock_list_len(state->lock_list);
1451         data.dptr = talloc_size(state, data.dsize);
1452         if (data.dptr == NULL) {
1453                 return ENOMEM;
1454         }
1455
1456         ctdb_g_lock_list_push(state->lock_list, data.dptr);
1457         ret = ctdb_store_record(state->h, data);
1458         talloc_free(data.dptr);
1459         return ret;
1460 }
1461
1462 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1463 {
1464         struct tevent_req *req = tevent_req_callback_data(
1465                 subreq, struct tevent_req);
1466         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1467                 req, struct ctdb_g_lock_lock_state);
1468         bool success;
1469
1470         success = tevent_wakeup_recv(subreq);
1471         TALLOC_FREE(subreq);
1472         if (! success) {
1473                 tevent_req_error(req, ENOMEM);
1474                 return;
1475         }
1476
1477         subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1478                                       state->db, state->key, false);
1479         if (tevent_req_nomem(subreq, req)) {
1480                 return;
1481         }
1482         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1483 }
1484
1485 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1486 {
1487         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1488                 req, struct ctdb_g_lock_lock_state);
1489         int err;
1490
1491         TALLOC_FREE(state->h);
1492
1493         if (tevent_req_is_unix_error(req, &err)) {
1494                 if (perr != NULL) {
1495                         *perr = err;
1496                 }
1497                 return false;
1498         }
1499
1500         return true;
1501 }
1502
1503 struct ctdb_g_lock_unlock_state {
1504         struct tevent_context *ev;
1505         struct ctdb_client_context *client;
1506         struct ctdb_db_context *db;
1507         TDB_DATA key;
1508         struct ctdb_server_id my_sid;
1509         struct ctdb_record_handle *h;
1510         struct ctdb_g_lock_list *lock_list;
1511 };
1512
1513 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1514 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1515 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
1516
1517 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1518                                            struct tevent_context *ev,
1519                                            struct ctdb_client_context *client,
1520                                            struct ctdb_db_context *db,
1521                                            const char *keyname,
1522                                            struct ctdb_server_id sid)
1523 {
1524         struct tevent_req *req, *subreq;
1525         struct ctdb_g_lock_unlock_state *state;
1526
1527         req = tevent_req_create(mem_ctx, &state,
1528                                 struct ctdb_g_lock_unlock_state);
1529         if (req == NULL) {
1530                 return NULL;
1531         }
1532
1533         state->ev = ev;
1534         state->client = client;
1535         state->db = db;
1536         state->key.dptr = discard_const(keyname);
1537         state->key.dsize = strlen(keyname) + 1;
1538         state->my_sid = sid;
1539
1540         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1541                                       false);
1542         if (tevent_req_nomem(subreq, req)) {
1543                 return tevent_req_post(req, ev);
1544         }
1545         tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1546
1547         return req;
1548 }
1549
1550 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1551 {
1552         struct tevent_req *req = tevent_req_callback_data(
1553                 subreq, struct tevent_req);
1554         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1555                 req, struct ctdb_g_lock_unlock_state);
1556         TDB_DATA data;
1557         int ret = 0;
1558
1559         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1560         TALLOC_FREE(subreq);
1561         if (state->h == NULL) {
1562                 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1563                                   (char *)state->key.dptr));
1564                 tevent_req_error(req, ret);
1565                 return;
1566         }
1567
1568         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1569                                     &state->lock_list);
1570         if (ret != 0) {
1571                 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1572                                   (char *)state->key.dptr));
1573                 tevent_req_error(req, ret);
1574                 return;
1575         }
1576
1577         ret = ctdb_g_lock_unlock_update(req);
1578         if (ret != 0) {
1579                 tevent_req_error(req, ret);
1580                 return;
1581         }
1582
1583         if (state->lock_list->num == 0) {
1584                 subreq = ctdb_delete_record_send(state, state->ev, state->h);
1585                 if (tevent_req_nomem(subreq, req)) {
1586                         return;
1587                 }
1588                 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
1589                                         req);
1590                 return;
1591         }
1592
1593         TALLOC_FREE(state->h);
1594         tevent_req_done(req);
1595 }
1596
1597 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1598 {
1599         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1600                 req, struct ctdb_g_lock_unlock_state);
1601         struct ctdb_g_lock *lock;
1602         int ret, i;
1603
1604         for (i=0; i<state->lock_list->num; i++) {
1605                 lock = &state->lock_list->lock[i];
1606
1607                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1608                         break;
1609                 }
1610         }
1611
1612         if (i < state->lock_list->num) {
1613                 state->lock_list->lock[i] =
1614                         state->lock_list->lock[state->lock_list->num-1];
1615                 state->lock_list->num -= 1;
1616         }
1617
1618         if (state->lock_list->num != 0) {
1619                 TDB_DATA data;
1620
1621                 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1622                 data.dptr = talloc_size(state, data.dsize);
1623                 if (data.dptr == NULL) {
1624                         return ENOMEM;
1625                 }
1626
1627                 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1628                 ret = ctdb_store_record(state->h, data);
1629                 talloc_free(data.dptr);
1630                 if (ret != 0) {
1631                         return ret;
1632                 }
1633         }
1634
1635         return 0;
1636 }
1637
1638 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
1639 {
1640         struct tevent_req *req = tevent_req_callback_data(
1641                 subreq, struct tevent_req);
1642         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1643                 req, struct ctdb_g_lock_unlock_state);
1644         int ret;
1645         bool status;
1646
1647         status = ctdb_delete_record_recv(subreq, &ret);
1648         if (! status) {
1649                 DEBUG(DEBUG_ERR,
1650                       ("g_lock_unlock %s delete record failed, ret=%d\n",
1651                        (char *)state->key.dptr, ret));
1652                 tevent_req_error(req, ret);
1653                 return;
1654         }
1655
1656         TALLOC_FREE(state->h);
1657         tevent_req_done(req);
1658 }
1659
1660 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1661 {
1662         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1663                 req, struct ctdb_g_lock_unlock_state);
1664         int err;
1665
1666         TALLOC_FREE(state->h);
1667
1668         if (tevent_req_is_unix_error(req, &err)) {
1669                 if (perr != NULL) {
1670                         *perr = err;
1671                 }
1672                 return false;
1673         }
1674
1675         return true;
1676 }
1677
1678 /*
1679  * Persistent database functions
1680  */
/* State for an asynchronous ctdb_transaction_start_send() request */
struct ctdb_transaction_start_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        /* Transaction handle being set up */
        struct ctdb_transaction_handle *h;
        /* PNN of the client's own node */
        uint32_t destnode;
};

/* Steps of the transaction start request */
static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
1691
1692 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1693                                                struct tevent_context *ev,
1694                                                struct ctdb_client_context *client,
1695                                                struct timeval timeout,
1696                                                struct ctdb_db_context *db,
1697                                                bool readonly)
1698 {
1699         struct ctdb_transaction_start_state *state;
1700         struct tevent_req *req, *subreq;
1701         struct ctdb_transaction_handle *h;
1702
1703         req = tevent_req_create(mem_ctx, &state,
1704                                 struct ctdb_transaction_start_state);
1705         if (req == NULL) {
1706                 return NULL;
1707         }
1708
1709         if (! db->persistent) {
1710                 tevent_req_error(req, EINVAL);
1711                 return tevent_req_post(req, ev);
1712         }
1713
1714         state->ev = ev;
1715         state->client = client;
1716         state->destnode = ctdb_client_pnn(client);
1717
1718         h = talloc_zero(db, struct ctdb_transaction_handle);
1719         if (tevent_req_nomem(h, req)) {
1720                 return tevent_req_post(req, ev);
1721         }
1722
1723         h->ev = ev;
1724         h->client = client;
1725         h->db = db;
1726         h->readonly = readonly;
1727         h->updated = false;
1728
1729         /* SRVID is unique for databases, so client can have transactions
1730          * active for multiple databases */
1731         h->sid = ctdb_client_get_server_id(client, db->db_id);
1732
1733         h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
1734         if (tevent_req_nomem(h->recbuf, req)) {
1735                 return tevent_req_post(req, ev);
1736         }
1737
1738         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1739         if (tevent_req_nomem(h->lock_name, req)) {
1740                 return tevent_req_post(req, ev);
1741         }
1742
1743         state->h = h;
1744
1745         subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1746         if (tevent_req_nomem(subreq, req)) {
1747                 return tevent_req_post(req, ev);
1748         }
1749         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
1750
1751         return req;
1752 }
1753
1754 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1755 {
1756         struct tevent_req *req = tevent_req_callback_data(
1757                 subreq, struct tevent_req);
1758         struct ctdb_transaction_start_state *state = tevent_req_data(
1759                 req, struct ctdb_transaction_start_state);
1760         bool status;
1761         int ret;
1762
1763         status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1764         TALLOC_FREE(subreq);
1765         if (! status) {
1766                 DEBUG(DEBUG_ERR,
1767                       ("transaction_start: %s attach g_lock.tdb failed\n",
1768                        state->h->db->db_name));
1769                 tevent_req_error(req, ret);
1770                 return;
1771         }
1772
1773         subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1774                                        state->h->db_g_lock,
1775                                        state->h->lock_name,
1776                                        &state->h->sid, state->h->readonly);
1777         if (tevent_req_nomem(subreq, req)) {
1778                 return;
1779         }
1780         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
1781 }
1782
1783 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1784 {
1785         struct tevent_req *req = tevent_req_callback_data(
1786                 subreq, struct tevent_req);
1787         struct ctdb_transaction_start_state *state = tevent_req_data(
1788                 req, struct ctdb_transaction_start_state);
1789         int ret;
1790         bool status;
1791
1792         status = ctdb_g_lock_lock_recv(subreq, &ret);
1793         TALLOC_FREE(subreq);
1794         if (! status) {
1795                 DEBUG(DEBUG_ERR,
1796                       ("transaction_start: %s g_lock lock failed, ret=%d\n",
1797                        state->h->db->db_name, ret));
1798                 tevent_req_error(req, ret);
1799                 return;
1800         }
1801
1802         tevent_req_done(req);
1803 }
1804
1805 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1806                                         struct tevent_req *req,
1807                                         int *perr)
1808 {
1809         struct ctdb_transaction_start_state *state = tevent_req_data(
1810                 req, struct ctdb_transaction_start_state);
1811         int err;
1812
1813         if (tevent_req_is_unix_error(req, &err)) {
1814                 if (perr != NULL) {
1815                         *perr = err;
1816                 }
1817                 return NULL;
1818         }
1819
1820         return state->h;
1821 }
1822
1823 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1824                            struct ctdb_client_context *client,
1825                            struct timeval timeout,
1826                            struct ctdb_db_context *db, bool readonly,
1827                            struct ctdb_transaction_handle **out)
1828 {
1829         struct tevent_req *req;
1830         struct ctdb_transaction_handle *h;
1831         int ret;
1832
1833         req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
1834                                           readonly);
1835         if (req == NULL) {
1836                 return ENOMEM;
1837         }
1838
1839         tevent_req_poll(req, ev);
1840
1841         h = ctdb_transaction_start_recv(req, &ret);
1842         if (h == NULL) {
1843                 return ret;
1844         }
1845
1846         *out = h;
1847         return 0;
1848 }
1849
/* Search state for looking up one key in a transaction's record buffer */
struct ctdb_transaction_record_fetch_state {
        /* key: the key searched for; data: payload of the matching
         * record, valid only when found is true */
        TDB_DATA key, data;
        /* Header extracted from the matching record */
        struct ctdb_ltdb_header header;
        /* Set to true once a record with a matching key has been seen */
        bool found;
};
1855
1856 static int ctdb_transaction_record_fetch_traverse(
1857                                 uint32_t reqid,
1858                                 struct ctdb_ltdb_header *nullheader,
1859                                 TDB_DATA key, TDB_DATA data,
1860                                 void *private_data)
1861 {
1862         struct ctdb_transaction_record_fetch_state *state =
1863                 (struct ctdb_transaction_record_fetch_state *)private_data;
1864
1865         if (state->key.dsize == key.dsize &&
1866             memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
1867                 int ret;
1868
1869                 ret = ctdb_ltdb_header_extract(&data, &state->header);
1870                 if (ret != 0) {
1871                         DEBUG(DEBUG_ERR,
1872                               ("record_fetch: Failed to extract header, "
1873                                "ret=%d\n", ret));
1874                         return 1;
1875                 }
1876
1877                 state->data = data;
1878                 state->found = true;
1879         }
1880
1881         return 0;
1882 }
1883
1884 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1885                                          TDB_DATA key,
1886                                          struct ctdb_ltdb_header *header,
1887                                          TDB_DATA *data)
1888 {
1889         struct ctdb_transaction_record_fetch_state state;
1890         int ret;
1891
1892         state.key = key;
1893         state.found = false;
1894
1895         ret = ctdb_rec_buffer_traverse(h->recbuf,
1896                                        ctdb_transaction_record_fetch_traverse,
1897                                        &state);
1898         if (ret != 0) {
1899                 return ret;
1900         }
1901
1902         if (state.found) {
1903                 if (header != NULL) {
1904                         *header = state.header;
1905                 }
1906                 if (data != NULL) {
1907                         *data = state.data;
1908                 }
1909                 return 0;
1910         }
1911
1912         return ENOENT;
1913 }
1914
1915 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1916                                   TDB_DATA key,
1917                                   TALLOC_CTX *mem_ctx, TDB_DATA *data)
1918 {
1919         TDB_DATA tmp_data;
1920         struct ctdb_ltdb_header header;
1921         int ret;
1922
1923         ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
1924         if (ret == 0) {
1925                 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1926                                            tmp_data.dsize);
1927                 if (data->dptr == NULL) {
1928                         return ENOMEM;
1929                 }
1930                 data->dsize = tmp_data.dsize;
1931                 return 0;
1932         }
1933
1934         ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
1935         if (ret != 0) {
1936                 return ret;
1937         }
1938
1939         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
1940         if (ret != 0) {
1941                 return ret;
1942         }
1943
1944         return 0;
1945 }
1946
1947 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
1948                                   TDB_DATA key, TDB_DATA data)
1949 {
1950         TALLOC_CTX *tmp_ctx;
1951         struct ctdb_ltdb_header header;
1952         TDB_DATA old_data;
1953         int ret;
1954
1955         if (h->readonly) {
1956                 return EINVAL;
1957         }
1958
1959         tmp_ctx = talloc_new(h);
1960         if (tmp_ctx == NULL) {
1961                 return ENOMEM;
1962         }
1963
1964         ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
1965         if (ret != 0) {
1966                 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
1967                 if (ret != 0) {
1968                         return ret;
1969                 }
1970         }
1971
1972         if (old_data.dsize == data.dsize &&
1973             memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
1974                 talloc_free(tmp_ctx);
1975                 return 0;
1976         }
1977
1978         header.dmaster = ctdb_client_pnn(h->client);
1979         header.rsn += 1;
1980
1981         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
1982         talloc_free(tmp_ctx);
1983         if (ret != 0) {
1984                 return ret;
1985         }
1986         h->updated = true;
1987
1988         return 0;
1989 }
1990
1991 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
1992                                    TDB_DATA key)
1993 {
1994         return ctdb_transaction_store_record(h, key, tdb_null);
1995 }
1996
1997 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
1998                                             uint64_t *seqnum)
1999 {
2000         const char *keyname = CTDB_DB_SEQNUM_KEY;
2001         TDB_DATA key, data;
2002         struct ctdb_ltdb_header header;
2003         int ret;
2004
2005         key.dptr = discard_const(keyname);
2006         key.dsize = strlen(keyname) + 1;
2007
2008         ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2009         if (ret != 0) {
2010                 DEBUG(DEBUG_ERR,
2011                       ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2012                        h->db->db_name, ret));
2013                 return ret;
2014         }
2015
2016         if (data.dsize == 0) {
2017                 /* initial data */
2018                 *seqnum = 0;
2019                 return 0;
2020         }
2021
2022         if (data.dsize != sizeof(uint64_t)) {
2023                 talloc_free(data.dptr);
2024                 return EINVAL;
2025         }
2026
2027         *seqnum = *(uint64_t *)data.dptr;
2028
2029         talloc_free(data.dptr);
2030         return 0;
2031 }
2032
2033 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2034                                             uint64_t seqnum)
2035 {
2036         const char *keyname = CTDB_DB_SEQNUM_KEY;
2037         TDB_DATA key, data;
2038
2039         key.dptr = discard_const(keyname);
2040         key.dsize = strlen(keyname) + 1;
2041
2042         data.dptr = (uint8_t *)&seqnum;
2043         data.dsize = sizeof(seqnum);
2044
2045         return ctdb_transaction_store_record(h, key, data);
2046 }
2047
/* Request state shared by the steps of an asynchronous transaction commit */
struct ctdb_transaction_commit_state {
        /* Event context and timeout reused when the commit control has
         * to be sent again */
        struct tevent_context *ev;
        struct timeval timeout;
        /* Transaction being committed */
        struct ctdb_transaction_handle *h;
        /* Database sequence number read before the commit was sent;
         * the commit stores seqnum + 1 */
        uint64_t seqnum;
};

/* Callbacks chained by ctdb_transaction_commit_send(): commit, then unlock */
static void ctdb_transaction_commit_done(struct tevent_req *subreq);
static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
2057
2058 struct tevent_req *ctdb_transaction_commit_send(
2059                                         TALLOC_CTX *mem_ctx,
2060                                         struct tevent_context *ev,
2061                                         struct timeval timeout,
2062                                         struct ctdb_transaction_handle *h)
2063 {
2064         struct tevent_req *req, *subreq;
2065         struct ctdb_transaction_commit_state *state;
2066         struct ctdb_req_control request;
2067         int ret;
2068
2069         req = tevent_req_create(mem_ctx, &state,
2070                                 struct ctdb_transaction_commit_state);
2071         if (req == NULL) {
2072                 return NULL;
2073         }
2074
2075         state->ev = ev;
2076         state->timeout = timeout;
2077         state->h = h;
2078
2079         ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
2080         if (ret != 0) {
2081                 tevent_req_error(req, ret);
2082                 return tevent_req_post(req, ev);
2083         }
2084
2085         ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
2086         if (ret != 0) {
2087                 tevent_req_error(req, ret);
2088                 return tevent_req_post(req, ev);
2089         }
2090
2091         ctdb_req_control_trans3_commit(&request, h->recbuf);
2092         subreq = ctdb_client_control_send(state, ev, h->client,
2093                                           ctdb_client_pnn(h->client),
2094                                           timeout, &request);
2095         if (tevent_req_nomem(subreq, req)) {
2096                 return tevent_req_post(req, ev);
2097         }
2098         tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2099
2100         return req;
2101 }
2102
2103 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2104 {
2105         struct tevent_req *req = tevent_req_callback_data(
2106                 subreq, struct tevent_req);
2107         struct ctdb_transaction_commit_state *state = tevent_req_data(
2108                 req, struct ctdb_transaction_commit_state);
2109         struct ctdb_transaction_handle *h = state->h;
2110         struct ctdb_reply_control *reply;
2111         uint64_t seqnum;
2112         int ret;
2113         bool status;
2114
2115         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2116         TALLOC_FREE(subreq);
2117         if (! status) {
2118                 DEBUG(DEBUG_ERR,
2119                       ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2120                        h->db->db_name, ret));
2121                 tevent_req_error(req, ret);
2122                 return;
2123         }
2124
2125         ret = ctdb_reply_control_trans3_commit(reply);
2126         talloc_free(reply);
2127
2128         if (ret != 0) {
2129                 /* Control failed due to recovery */
2130
2131                 ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2132                 if (ret != 0) {
2133                         tevent_req_error(req, ret);
2134                         return;
2135                 }
2136
2137                 if (seqnum == state->seqnum) {
2138                         struct ctdb_req_control request;
2139
2140                         /* try again */
2141                         ctdb_req_control_trans3_commit(&request,
2142                                                        state->h->recbuf);
2143                         subreq = ctdb_client_control_send(
2144                                         state, state->ev, state->h->client,
2145                                         ctdb_client_pnn(state->h->client),
2146                                         state->timeout, &request);
2147                         if (tevent_req_nomem(subreq, req)) {
2148                                 return;
2149                         }
2150                         tevent_req_set_callback(subreq,
2151                                                 ctdb_transaction_commit_done,
2152                                                 req);
2153                         return;
2154                 }
2155
2156                 if (seqnum != state->seqnum + 1) {
2157                         DEBUG(DEBUG_ERR,
2158                               ("transaction_commit: %s seqnum mismatch "
2159                                "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2160                                state->h->db->db_name, seqnum, state->seqnum));
2161                         tevent_req_error(req, EIO);
2162                         return;
2163                 }
2164         }
2165
2166         /* trans3_commit successful */
2167         subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2168                                          h->db_g_lock, h->lock_name, h->sid);
2169         if (tevent_req_nomem(subreq, req)) {
2170                 return;
2171         }
2172         tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
2173                                 req);
2174 }
2175
2176 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2177 {
2178         struct tevent_req *req = tevent_req_callback_data(
2179                 subreq, struct tevent_req);
2180         struct ctdb_transaction_commit_state *state = tevent_req_data(
2181                 req, struct ctdb_transaction_commit_state);
2182         int ret;
2183         bool status;
2184
2185         status = ctdb_g_lock_unlock_recv(subreq, &ret);
2186         TALLOC_FREE(subreq);
2187         if (! status) {
2188                 DEBUG(DEBUG_ERR,
2189                       ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2190                        state->h->db->db_name, ret));
2191                 tevent_req_error(req, ret);
2192                 return;
2193         }
2194
2195         talloc_free(state->h);
2196         tevent_req_done(req);
2197 }
2198
/*
 * Collect the result of ctdb_transaction_commit_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
2212
2213 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2214 {
2215         struct tevent_context *ev = h->ev;
2216         TALLOC_CTX *mem_ctx;
2217         struct tevent_req *req;
2218         int ret;
2219         bool status;
2220
2221         if (h->readonly || ! h->updated) {
2222                 return ctdb_transaction_cancel(h);
2223         }
2224
2225         mem_ctx = talloc_new(NULL);
2226         if (mem_ctx == NULL) {
2227                 return ENOMEM;
2228         }
2229
2230         req = ctdb_transaction_commit_send(mem_ctx, ev,
2231                                            tevent_timeval_zero(), h);
2232         if (req == NULL) {
2233                 talloc_free(mem_ctx);
2234                 return ENOMEM;
2235         }
2236
2237         tevent_req_poll(req, ev);
2238
2239         status = ctdb_transaction_commit_recv(req, &ret);
2240         if (! status) {
2241                 talloc_free(mem_ctx);
2242                 return ret;
2243         }
2244
2245         talloc_free(mem_ctx);
2246         return 0;
2247 }
2248
/* Request state for an asynchronous transaction cancel */
struct ctdb_transaction_cancel_state {
        /* Event context used to issue the unlock sub-request */
        struct tevent_context *ev;
        /* Transaction being cancelled */
        struct ctdb_transaction_handle *h;
        /* Stored by ctdb_transaction_cancel_send(); no reader is
         * visible in this section */
        struct timeval timeout;
};

/* Callback invoked when the g_lock unlock of a cancel has finished */
static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
2256
2257 struct tevent_req *ctdb_transaction_cancel_send(
2258                                         TALLOC_CTX *mem_ctx,
2259                                         struct tevent_context *ev,
2260                                         struct timeval timeout,
2261                                         struct ctdb_transaction_handle *h)
2262 {
2263         struct tevent_req *req, *subreq;
2264         struct ctdb_transaction_cancel_state *state;
2265
2266         req = tevent_req_create(mem_ctx, &state,
2267                                 struct ctdb_transaction_cancel_state);
2268         if (req == NULL) {
2269                 return NULL;
2270         }
2271
2272         state->ev = ev;
2273         state->h = h;
2274         state->timeout = timeout;
2275
2276         subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
2277                                          state->h->db_g_lock,
2278                                          state->h->lock_name, state->h->sid);
2279         if (tevent_req_nomem(subreq, req)) {
2280                 return tevent_req_post(req, ev);
2281         }
2282         tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
2283                                 req);
2284
2285         return req;
2286 }
2287
2288 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2289 {
2290         struct tevent_req *req = tevent_req_callback_data(
2291                 subreq, struct tevent_req);
2292         struct ctdb_transaction_cancel_state *state = tevent_req_data(
2293                 req, struct ctdb_transaction_cancel_state);
2294         int ret;
2295         bool status;
2296
2297         status = ctdb_g_lock_unlock_recv(subreq, &ret);
2298         TALLOC_FREE(subreq);
2299         if (! status) {
2300                 DEBUG(DEBUG_ERR,
2301                       ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2302                        state->h->db->db_name, ret));
2303                 talloc_free(state->h);
2304                 tevent_req_error(req, ret);
2305                 return;
2306         }
2307
2308         talloc_free(state->h);
2309         tevent_req_done(req);
2310 }
2311
/*
 * Collect the result of ctdb_transaction_cancel_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
2325
2326 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2327 {
2328         struct tevent_context *ev = h->ev;
2329         struct tevent_req *req;
2330         TALLOC_CTX *mem_ctx;
2331         int ret;
2332         bool status;
2333
2334         mem_ctx = talloc_new(NULL);
2335         if (mem_ctx == NULL) {
2336                 talloc_free(h);
2337                 return ENOMEM;
2338         }
2339
2340         req = ctdb_transaction_cancel_send(mem_ctx, ev,
2341                                            tevent_timeval_zero(), h);
2342         if (req == NULL) {
2343                 talloc_free(mem_ctx);
2344                 talloc_free(h);
2345                 return ENOMEM;
2346         }
2347
2348         tevent_req_poll(req, ev);
2349
2350         status = ctdb_transaction_cancel_recv(req, &ret);
2351         if (! status) {
2352                 talloc_free(mem_ctx);
2353                 return ret;
2354         }
2355
2356         talloc_free(mem_ctx);
2357         return 0;
2358 }
2359
/*
 * TODO:
 *
 * In the future, Samba should register SERVER_ID.
 * Make that structure the same as struct srvid {}.
 */