ctdb-client: Remove calculation of tdb flags
[sfrench/samba-autobuild/.git] / ctdb / client / client_db.c
1 /*
2    CTDB client code
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27
28 #include "common/logging.h"
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
39
40 static struct ctdb_db_context *client_db_handle(
41                                         struct ctdb_client_context *client,
42                                         const char *db_name)
43 {
44         struct ctdb_db_context *db;
45
46         for (db = client->db; db != NULL; db = db->next) {
47                 if (strcmp(db_name, db->db_name) == 0) {
48                         return db;
49                 }
50         }
51
52         return NULL;
53 }
54
/*
 * State carried across the asynchronous steps of setting the
 * READONLY/STICKY flags of a database.
 */
struct ctdb_set_db_flags_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        /* Database the flags are applied to */
        uint32_t db_id;
        /* Requested CTDB_DB_FLAGS_* bits */
        uint8_t db_flags;
        /* Each leg is marked done when its control finished or was not needed */
        bool readonly_done;
        bool sticky_done;
        /* Result of list_of_connected_nodes(): node list and its length */
        uint32_t *pnn_list;
        int count;
};

/* Completion callbacks, in the order the steps are issued */
static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
69
70 static struct tevent_req *ctdb_set_db_flags_send(
71                                 TALLOC_CTX *mem_ctx,
72                                 struct tevent_context *ev,
73                                 struct ctdb_client_context *client,
74                                 uint32_t destnode, struct timeval timeout,
75                                 uint32_t db_id, uint8_t db_flags)
76 {
77         struct tevent_req *req, *subreq;
78         struct ctdb_set_db_flags_state *state;
79         struct ctdb_req_control request;
80
81         req = tevent_req_create(mem_ctx, &state,
82                                 struct ctdb_set_db_flags_state);
83         if (req == NULL) {
84                 return NULL;
85         }
86
87         if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
88                 tevent_req_done(req);
89                 return tevent_req_post(req, ev);
90         }
91
92         state->ev = ev;
93         state->client = client;
94         state->timeout = timeout;
95         state->db_id = db_id;
96         state->db_flags = db_flags;
97
98         ctdb_req_control_get_nodemap(&request);
99         subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
100                                           &request);
101         if (tevent_req_nomem(subreq, req)) {
102                 return tevent_req_post(req, ev);
103         }
104         tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
105
106         return req;
107 }
108
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
110 {
111         struct tevent_req *req = tevent_req_callback_data(
112                 subreq, struct tevent_req);
113         struct ctdb_set_db_flags_state *state = tevent_req_data(
114                 req, struct ctdb_set_db_flags_state);
115         struct ctdb_req_control request;
116         struct ctdb_reply_control *reply;
117         struct ctdb_node_map *nodemap;
118         int ret;
119         bool status;
120
121         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
122         TALLOC_FREE(subreq);
123         if (! status) {
124                 DEBUG(DEBUG_ERR,
125                       ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
126                        state->db_id, ret));
127                 tevent_req_error(req, ret);
128                 return;
129         }
130
131         ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
132         talloc_free(reply);
133         if (ret != 0) {
134                 DEBUG(DEBUG_ERR,
135                       ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
136                       state->db_id, ret));
137                 tevent_req_error(req, ret);
138                 return;
139         }
140
141         state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
142                                                state, &state->pnn_list);
143         talloc_free(nodemap);
144         if (state->count <= 0) {
145                 DEBUG(DEBUG_ERR,
146                       ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
147                        state->db_id, state->count));
148                 tevent_req_error(req, ENOMEM);
149                 return;
150         }
151
152         if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
153                 ctdb_req_control_set_db_readonly(&request, state->db_id);
154                 subreq = ctdb_client_control_multi_send(
155                                         state, state->ev, state->client,
156                                         state->pnn_list, state->count,
157                                         state->timeout, &request);
158                 if (tevent_req_nomem(subreq, req)) {
159                         return;
160                 }
161                 tevent_req_set_callback(subreq,
162                                         ctdb_set_db_flags_readonly_done, req);
163         } else {
164                 state->readonly_done = true;
165         }
166
167         if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
168                 ctdb_req_control_set_db_sticky(&request, state->db_id);
169                 subreq = ctdb_client_control_multi_send(
170                                         state, state->ev, state->client,
171                                         state->pnn_list, state->count,
172                                         state->timeout, &request);
173                 if (tevent_req_nomem(subreq, req)) {
174                         return;
175                 }
176                 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
177                                         req);
178         } else {
179                 state->sticky_done = true;
180         }
181 }
182
183 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
184 {
185         struct tevent_req *req = tevent_req_callback_data(
186                 subreq, struct tevent_req);
187         struct ctdb_set_db_flags_state *state = tevent_req_data(
188                 req, struct ctdb_set_db_flags_state);
189         int ret;
190         bool status;
191
192         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
193                                                 NULL);
194         TALLOC_FREE(subreq);
195         if (! status) {
196                 DEBUG(DEBUG_ERR,
197                       ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
198                        state->db_id, ret));
199                 tevent_req_error(req, ret);
200                 return;
201         }
202
203         state->readonly_done = true;
204
205         if (state->readonly_done && state->sticky_done) {
206                 tevent_req_done(req);
207         }
208 }
209
210 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
211 {
212         struct tevent_req *req = tevent_req_callback_data(
213                 subreq, struct tevent_req);
214         struct ctdb_set_db_flags_state *state = tevent_req_data(
215                 req, struct ctdb_set_db_flags_state);
216         int ret;
217         bool status;
218
219         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
220                                                 NULL);
221         TALLOC_FREE(subreq);
222         if (! status) {
223                 DEBUG(DEBUG_ERR,
224                       ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
225                        state->db_id, ret));
226                 tevent_req_error(req, ret);
227                 return;
228         }
229
230         state->sticky_done = true;
231
232         if (state->readonly_done && state->sticky_done) {
233                 tevent_req_done(req);
234         }
235 }
236
/*
 * Collect the result of ctdb_set_db_flags_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
249
/* State carried across the asynchronous steps of attaching to a database */
struct ctdb_attach_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct timeval timeout;
        /* Node all attach-related controls are sent to */
        uint32_t destnode;
        /* CTDB_DB_FLAGS_* bits requested by the caller */
        uint8_t db_flags;
        /* Handle being built up; handed to the caller by ctdb_attach_recv() */
        struct ctdb_db_context *db;
};

/* Completion callbacks, in the order the steps are issued */
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
static void ctdb_attach_open_flags_done(struct tevent_req *subreq);
264
265 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
266                                     struct tevent_context *ev,
267                                     struct ctdb_client_context *client,
268                                     struct timeval timeout,
269                                     const char *db_name, uint8_t db_flags)
270 {
271         struct tevent_req *req, *subreq;
272         struct ctdb_attach_state *state;
273         struct ctdb_req_control request;
274
275         req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
276         if (req == NULL) {
277                 return NULL;
278         }
279
280         state->db = client_db_handle(client, db_name);
281         if (state->db != NULL) {
282                 tevent_req_done(req);
283                 return tevent_req_post(req, ev);
284         }
285
286         state->ev = ev;
287         state->client = client;
288         state->timeout = timeout;
289         state->destnode = ctdb_client_pnn(client);
290         state->db_flags = db_flags;
291
292         state->db = talloc_zero(client, struct ctdb_db_context);
293         if (tevent_req_nomem(state->db, req)) {
294                 return tevent_req_post(req, ev);
295         }
296
297         state->db->db_name = talloc_strdup(state->db, db_name);
298         if (tevent_req_nomem(state->db, req)) {
299                 return tevent_req_post(req, ev);
300         }
301
302         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
303                 state->db->persistent = true;
304         }
305
306         if (state->db->persistent) {
307                 ctdb_req_control_db_attach_persistent(&request,
308                                                       state->db->db_name, 0);
309         } else {
310                 ctdb_req_control_db_attach(&request, state->db->db_name, 0);
311         }
312
313         subreq = ctdb_client_control_send(state, state->ev, state->client,
314                                           state->destnode, state->timeout,
315                                           &request);
316         if (tevent_req_nomem(subreq, req)) {
317                 return tevent_req_post(req, ev);
318         }
319         tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
320
321         return req;
322 }
323
324 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
325 {
326         struct tevent_req *req = tevent_req_callback_data(
327                 subreq, struct tevent_req);
328         struct ctdb_attach_state *state = tevent_req_data(
329                 req, struct ctdb_attach_state);
330         struct ctdb_req_control request;
331         struct ctdb_reply_control *reply;
332         bool status;
333         int ret;
334
335         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
336         TALLOC_FREE(subreq);
337         if (! status) {
338                 DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
339                                   state->db->db_name,
340                                   (state->db->persistent
341                                         ? "DB_ATTACH_PERSISTENT"
342                                         : "DB_ATTACH"),
343                                   ret));
344                 tevent_req_error(req, ret);
345                 return;
346         }
347
348         if (state->db->persistent) {
349                 ret = ctdb_reply_control_db_attach_persistent(
350                                 reply, &state->db->db_id);
351         } else {
352                 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
353         }
354         talloc_free(reply);
355         if (ret != 0) {
356                 DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
357                                   state->db->db_name, ret));
358                 tevent_req_error(req, ret);
359                 return;
360         }
361
362         ctdb_req_control_getdbpath(&request, state->db->db_id);
363         subreq = ctdb_client_control_send(state, state->ev, state->client,
364                                           state->destnode, state->timeout,
365                                           &request);
366         if (tevent_req_nomem(subreq, req)) {
367                 return;
368         }
369         tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
370 }
371
372 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
373 {
374         struct tevent_req *req = tevent_req_callback_data(
375                 subreq, struct tevent_req);
376         struct ctdb_attach_state *state = tevent_req_data(
377                 req, struct ctdb_attach_state);
378         struct ctdb_reply_control *reply;
379         struct ctdb_req_control request;
380         bool status;
381         int ret;
382
383         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
384         TALLOC_FREE(subreq);
385         if (! status) {
386                 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
387                                   state->db->db_name, ret));
388                 tevent_req_error(req, ret);
389                 return;
390         }
391
392         ret = ctdb_reply_control_getdbpath(reply, state->db,
393                                            &state->db->db_path);
394         talloc_free(reply);
395         if (ret != 0) {
396                 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
397                                   state->db->db_name, ret));
398                 tevent_req_error(req, ret);
399                 return;
400         }
401
402         ctdb_req_control_db_get_health(&request, state->db->db_id);
403         subreq = ctdb_client_control_send(state, state->ev, state->client,
404                                           state->destnode, state->timeout,
405                                           &request);
406         if (tevent_req_nomem(subreq, req)) {
407                 return;
408         }
409         tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
410 }
411
412 static void ctdb_attach_health_done(struct tevent_req *subreq)
413 {
414         struct tevent_req *req = tevent_req_callback_data(
415                 subreq, struct tevent_req);
416         struct ctdb_attach_state *state = tevent_req_data(
417                 req, struct ctdb_attach_state);
418         struct ctdb_reply_control *reply;
419         const char *reason;
420         bool status;
421         int ret;
422
423         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
424         TALLOC_FREE(subreq);
425         if (! status) {
426                 DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
427                                   state->db->db_name, ret));
428                 tevent_req_error(req, ret);
429                 return;
430         }
431
432         ret = ctdb_reply_control_db_get_health(reply, state, &reason);
433         if (ret != 0) {
434                 DEBUG(DEBUG_ERR,
435                       ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
436                        state->db->db_name, ret));
437                 tevent_req_error(req, ret);
438                 return;
439         }
440
441         if (reason != NULL) {
442                 /* Database unhealthy, avoid attach */
443                 DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
444                                   state->db->db_name, reason));
445                 tevent_req_error(req, EIO);
446                 return;
447         }
448
449         subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
450                                         state->destnode, state->timeout,
451                                         state->db->db_id, state->db_flags);
452         if (tevent_req_nomem(subreq, req)) {
453                 return;
454         }
455         tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
456 }
457
458 static void ctdb_attach_flags_done(struct tevent_req *subreq)
459 {
460         struct tevent_req *req = tevent_req_callback_data(
461                 subreq, struct tevent_req);
462         struct ctdb_attach_state *state = tevent_req_data(
463                 req, struct ctdb_attach_state);
464         struct ctdb_req_control request;
465         bool status;
466         int ret;
467
468         status = ctdb_set_db_flags_recv(subreq, &ret);
469         TALLOC_FREE(subreq);
470         if (! status) {
471                 DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
472                                   state->db->db_name, state->db_flags));
473                 tevent_req_error(req, ret);
474                 return;
475         }
476
477         ctdb_req_control_db_open_flags(&request, state->db->db_id);
478         subreq = ctdb_client_control_send(state, state->ev, state->client,
479                                           state->destnode, state->timeout,
480                                           &request);
481         if (tevent_req_nomem(subreq, req)) {
482                 return;
483         }
484         tevent_req_set_callback(subreq, ctdb_attach_open_flags_done, req);
485 }
486
487 static void ctdb_attach_open_flags_done(struct tevent_req *subreq)
488 {
489         struct tevent_req *req = tevent_req_callback_data(
490                 subreq, struct tevent_req);
491         struct ctdb_attach_state *state = tevent_req_data(
492                 req, struct ctdb_attach_state);
493         struct ctdb_reply_control *reply;
494         bool status;
495         int ret, tdb_flags;
496
497         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
498         TALLOC_FREE(subreq);
499         if (! status) {
500                 DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n",
501                                   state->db->db_name, ret));
502                 tevent_req_error(req, ret);
503                 return;
504         }
505
506         ret = ctdb_reply_control_db_open_flags(reply, &tdb_flags);
507         talloc_free(reply);
508         if (ret != 0) {
509                 DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS parse failed,"
510                                   " ret=%d\n", state->db->db_name, ret));
511                 tevent_req_error(req, ret);
512                 return;
513         }
514
515         state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
516                                         tdb_flags, O_RDWR, 0);
517         if (tevent_req_nomem(state->db->ltdb, req)) {
518                 DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
519                                   state->db->db_name));
520                 return;
521         }
522         DLIST_ADD(state->client->db, state->db);
523
524         tevent_req_done(req);
525 }
526
527 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
528                       struct ctdb_db_context **out)
529 {
530         struct ctdb_attach_state *state = tevent_req_data(
531                 req, struct ctdb_attach_state);
532         int err;
533
534         if (tevent_req_is_unix_error(req, &err)) {
535                 if (perr != NULL) {
536                         *perr = err;
537                 }
538                 return false;
539         }
540
541         if (out != NULL) {
542                 *out = state->db;
543         }
544         return true;
545 }
546
547 int ctdb_attach(struct tevent_context *ev,
548                 struct ctdb_client_context *client,
549                 struct timeval timeout,
550                 const char *db_name, uint8_t db_flags,
551                 struct ctdb_db_context **out)
552 {
553         TALLOC_CTX *mem_ctx;
554         struct tevent_req *req;
555         bool status;
556         int ret;
557
558         mem_ctx = talloc_new(client);
559         if (mem_ctx == NULL) {
560                 return ENOMEM;
561         }
562
563         req = ctdb_attach_send(mem_ctx, ev, client, timeout,
564                                db_name, db_flags);
565         if (req == NULL) {
566                 talloc_free(mem_ctx);
567                 return ENOMEM;
568         }
569
570         tevent_req_poll(req, ev);
571
572         status = ctdb_attach_recv(req, &ret, out);
573         if (! status) {
574                 talloc_free(mem_ctx);
575                 return ret;
576         }
577
578         /*
579         ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
580         ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
581         ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
582         */
583
584         talloc_free(mem_ctx);
585         return 0;
586 }
587
/* State carried across the asynchronous steps of detaching a database */
struct ctdb_detach_state {
        struct ctdb_client_context *client;
        struct tevent_context *ev;
        struct timeval timeout;
        /* Database to detach */
        uint32_t db_id;
        /* Name obtained via GET_DBNAME; used to find the client's handle */
        const char *db_name;
};

/* Completion callbacks, in the order the steps are issued */
static void ctdb_detach_dbname_done(struct tevent_req *subreq);
static void ctdb_detach_done(struct tevent_req *subreq);
598
599 struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx,
600                                     struct tevent_context *ev,
601                                     struct ctdb_client_context *client,
602                                     struct timeval timeout, uint32_t db_id)
603 {
604         struct tevent_req *req, *subreq;
605         struct ctdb_detach_state *state;
606         struct ctdb_req_control request;
607
608         req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state);
609         if (req == NULL) {
610                 return NULL;
611         }
612
613         state->client = client;
614         state->ev = ev;
615         state->timeout = timeout;
616         state->db_id = db_id;
617
618         ctdb_req_control_get_dbname(&request, db_id);
619         subreq = ctdb_client_control_send(state, ev, client,
620                                           ctdb_client_pnn(client), timeout,
621                                           &request);
622         if (tevent_req_nomem(subreq, req)) {
623                 return tevent_req_post(req, ev);
624         }
625         tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req);
626
627         return req;
628 }
629
630 static void ctdb_detach_dbname_done(struct tevent_req *subreq)
631 {
632         struct tevent_req *req = tevent_req_callback_data(
633                 subreq, struct tevent_req);
634         struct ctdb_detach_state *state = tevent_req_data(
635                 req, struct ctdb_detach_state);
636         struct ctdb_reply_control *reply;
637         struct ctdb_req_control request;
638         int ret;
639         bool status;
640
641         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
642         TALLOC_FREE(subreq);
643         if (! status) {
644                 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
645                                   state->db_id, ret));
646                 tevent_req_error(req, ret);
647                 return;
648         }
649
650         ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
651         if (ret != 0) {
652                 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
653                                   state->db_id, ret));
654                 tevent_req_error(req, ret);
655                 return;
656         }
657
658         ctdb_req_control_db_detach(&request, state->db_id);
659         subreq = ctdb_client_control_send(state, state->ev, state->client,
660                                           ctdb_client_pnn(state->client),
661                                           state->timeout, &request);
662         if (tevent_req_nomem(subreq, req)) {
663                 return;
664         }
665         tevent_req_set_callback(subreq, ctdb_detach_done, req);
666
667 }
668
669 static void ctdb_detach_done(struct tevent_req *subreq)
670 {
671         struct tevent_req *req = tevent_req_callback_data(
672                 subreq, struct tevent_req);
673         struct ctdb_detach_state *state = tevent_req_data(
674                 req, struct ctdb_detach_state);
675         struct ctdb_reply_control *reply;
676         struct ctdb_db_context *db;
677         int ret;
678         bool status;
679
680         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
681         TALLOC_FREE(subreq);
682         if (! status) {
683                 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
684                                   state->db_name, ret));
685                 tevent_req_error(req, ret);
686                 return;
687         }
688
689         ret = ctdb_reply_control_db_detach(reply);
690         if (ret != 0) {
691                 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
692                                   state->db_name, ret));
693                 tevent_req_error(req, ret);
694                 return;
695         }
696
697         db = client_db_handle(state->client, state->db_name);
698         if (db != NULL) {
699                 DLIST_REMOVE(state->client->db, db);
700                 TALLOC_FREE(db);
701         }
702
703         tevent_req_done(req);
704 }
705
/*
 * Collect the result of ctdb_detach_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
bool ctdb_detach_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
719
720 int ctdb_detach(struct tevent_context *ev,
721                 struct ctdb_client_context *client,
722                 struct timeval timeout, uint32_t db_id)
723 {
724         TALLOC_CTX *mem_ctx;
725         struct tevent_req *req;
726         int ret;
727         bool status;
728
729         mem_ctx = talloc_new(client);
730         if (mem_ctx == NULL) {
731                 return ENOMEM;
732         }
733
734         req = ctdb_detach_send(mem_ctx, ev, client, timeout, db_id);
735         if (req == NULL) {
736                 talloc_free(mem_ctx);
737                 return ENOMEM;
738         }
739
740         tevent_req_poll(req, ev);
741
742         status = ctdb_detach_recv(req, &ret);
743         if (! status) {
744                 talloc_free(mem_ctx);
745                 return ret;
746         }
747
748         talloc_free(mem_ctx);
749         return 0;
750 }
751
752 uint32_t ctdb_db_id(struct ctdb_db_context *db)
753 {
754         return db->db_id;
755 }
756
757 struct ctdb_db_traverse_local_state {
758         ctdb_rec_parser_func_t parser;
759         void *private_data;
760         bool extract_header;
761         int error;
762 };
763
764 static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
765                                           TDB_DATA key, TDB_DATA data,
766                                           void *private_data)
767 {
768         struct ctdb_db_traverse_local_state *state =
769                 (struct ctdb_db_traverse_local_state *)private_data;
770         int ret;
771
772         if (state->extract_header) {
773                 struct ctdb_ltdb_header header;
774
775                 ret = ctdb_ltdb_header_extract(&data, &header);
776                 if (ret != 0) {
777                         state->error = ret;
778                         return 1;
779                 }
780
781                 ret = state->parser(0, &header, key, data, state->private_data);
782         } else {
783                 ret = state->parser(0, NULL, key, data, state->private_data);
784         }
785
786         if (ret != 0) {
787                 state->error = ret;
788                 return 1;
789         }
790
791         return 0;
792 }
793
794 int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
795                            bool extract_header,
796                            ctdb_rec_parser_func_t parser, void *private_data)
797 {
798         struct ctdb_db_traverse_local_state state;
799         int ret;
800
801         state.parser = parser;
802         state.private_data = private_data;
803         state.extract_header = extract_header;
804         state.error = 0;
805
806         if (readonly) {
807                 ret = tdb_traverse_read(db->ltdb->tdb,
808                                         ctdb_db_traverse_local_handler,
809                                         &state);
810         } else {
811                 ret = tdb_traverse(db->ltdb->tdb,
812                                    ctdb_db_traverse_local_handler, &state);
813         }
814
815         if (ret == -1) {
816                 return EIO;
817         }
818
819         return state.error;
820 }
821
822 struct ctdb_db_traverse_state {
823         struct tevent_context *ev;
824         struct ctdb_client_context *client;
825         struct ctdb_db_context *db;
826         uint32_t destnode;
827         uint64_t srvid;
828         struct timeval timeout;
829         ctdb_rec_parser_func_t parser;
830         void *private_data;
831         int result;
832 };
833
834 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
835 static void ctdb_db_traverse_started(struct tevent_req *subreq);
836 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
837                                      void *private_data);
838 static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
839 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
840
841 struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
842                                          struct tevent_context *ev,
843                                          struct ctdb_client_context *client,
844                                          struct ctdb_db_context *db,
845                                          uint32_t destnode,
846                                          struct timeval timeout,
847                                          ctdb_rec_parser_func_t parser,
848                                          void *private_data)
849 {
850         struct tevent_req *req, *subreq;
851         struct ctdb_db_traverse_state *state;
852
853         req = tevent_req_create(mem_ctx, &state,
854                                 struct ctdb_db_traverse_state);
855         if (req == NULL) {
856                 return NULL;
857         }
858
859         state->ev = ev;
860         state->client = client;
861         state->db = db;
862         state->destnode = destnode;
863         state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
864         state->timeout = timeout;
865         state->parser = parser;
866         state->private_data = private_data;
867
868         subreq = ctdb_client_set_message_handler_send(state, ev, client,
869                                                       state->srvid,
870                                                       ctdb_db_traverse_handler,
871                                                       req);
872         if (tevent_req_nomem(subreq, req)) {
873                 return tevent_req_post(req, ev);
874         }
875         tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
876
877         return req;
878 }
879
880 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
881 {
882         struct tevent_req *req = tevent_req_callback_data(
883                 subreq, struct tevent_req);
884         struct ctdb_db_traverse_state *state = tevent_req_data(
885                 req, struct ctdb_db_traverse_state);
886         struct ctdb_traverse_start_ext traverse;
887         struct ctdb_req_control request;
888         int ret = 0;
889         bool status;
890
891         status = ctdb_client_set_message_handler_recv(subreq, &ret);
892         TALLOC_FREE(subreq);
893         if (! status) {
894                 tevent_req_error(req, ret);
895                 return;
896         }
897
898         traverse = (struct ctdb_traverse_start_ext) {
899                 .db_id = ctdb_db_id(state->db),
900                 .reqid = 0,
901                 .srvid = state->srvid,
902                 .withemptyrecords = false,
903         };
904
905         ctdb_req_control_traverse_start_ext(&request, &traverse);
906         subreq = ctdb_client_control_send(state, state->ev, state->client,
907                                           state->destnode, state->timeout,
908                                           &request);
909         if (subreq == NULL) {
910                 state->result = ENOMEM;
911                 ctdb_db_traverse_remove_handler(req);
912                 return;
913         }
914         tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
915 }
916
917 static void ctdb_db_traverse_started(struct tevent_req *subreq)
918 {
919         struct tevent_req *req = tevent_req_callback_data(
920                 subreq, struct tevent_req);
921         struct ctdb_db_traverse_state *state = tevent_req_data(
922                 req, struct ctdb_db_traverse_state);
923         struct ctdb_reply_control *reply;
924         int ret = 0;
925         bool status;
926
927         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
928         TALLOC_FREE(subreq);
929         if (! status) {
930                 DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
931                 state->result = ret;
932                 ctdb_db_traverse_remove_handler(req);
933                 return;
934         }
935
936         ret = ctdb_reply_control_traverse_start_ext(reply);
937         talloc_free(reply);
938         if (ret != 0) {
939                 DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
940                                   ret));
941                 state->result = ret;
942                 ctdb_db_traverse_remove_handler(req);
943                 return;
944         }
945 }
946
947 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
948                                      void *private_data)
949 {
950         struct tevent_req *req = talloc_get_type_abort(
951                 private_data, struct tevent_req);
952         struct ctdb_db_traverse_state *state = tevent_req_data(
953                 req, struct ctdb_db_traverse_state);
954         struct ctdb_rec_data *rec;
955         struct ctdb_ltdb_header header;
956         int ret;
957
958         ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec);
959         if (ret != 0) {
960                 return;
961         }
962
963         if (rec->key.dsize == 0 && rec->data.dsize == 0) {
964                 talloc_free(rec);
965                 ctdb_db_traverse_remove_handler(req);
966                 return;
967         }
968
969         ret = ctdb_ltdb_header_extract(&rec->data, &header);
970         if (ret != 0) {
971                 talloc_free(rec);
972                 return;
973         }
974
975         if (rec->data.dsize == 0) {
976                 talloc_free(rec);
977                 return;
978         }
979
980         ret = state->parser(rec->reqid, &header, rec->key, rec->data,
981                             state->private_data);
982         talloc_free(rec);
983         if (ret != 0) {
984                 state->result = ret;
985                 ctdb_db_traverse_remove_handler(req);
986         }
987 }
988
989 static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
990 {
991         struct ctdb_db_traverse_state *state = tevent_req_data(
992                 req, struct ctdb_db_traverse_state);
993         struct tevent_req *subreq;
994
995         subreq = ctdb_client_remove_message_handler_send(state, state->ev,
996                                                          state->client,
997                                                          state->srvid, req);
998         if (tevent_req_nomem(subreq, req)) {
999                 return;
1000         }
1001         tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
1002 }
1003
1004 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
1005 {
1006         struct tevent_req *req = tevent_req_callback_data(
1007                 subreq, struct tevent_req);
1008         struct ctdb_db_traverse_state *state = tevent_req_data(
1009                 req, struct ctdb_db_traverse_state);
1010         int ret;
1011         bool status;
1012
1013         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
1014         TALLOC_FREE(subreq);
1015         if (! status) {
1016                 tevent_req_error(req, ret);
1017                 return;
1018         }
1019
1020         if (state->result != 0) {
1021                 tevent_req_error(req, state->result);
1022                 return;
1023         }
1024
1025         tevent_req_done(req);
1026 }
1027
/*
 * Collect the result of ctdb_db_traverse_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
1041
1042 int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1043                      struct ctdb_client_context *client,
1044                      struct ctdb_db_context *db,
1045                      uint32_t destnode, struct timeval timeout,
1046                      ctdb_rec_parser_func_t parser, void *private_data)
1047 {
1048         struct tevent_req *req;
1049         int ret = 0;
1050         bool status;
1051
1052         req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
1053                                     timeout, parser, private_data);
1054         if (req == NULL) {
1055                 return ENOMEM;
1056         }
1057
1058         tevent_req_poll(req, ev);
1059
1060         status = ctdb_db_traverse_recv(req, &ret);
1061         if (! status) {
1062                 return ret;
1063         }
1064
1065         return 0;
1066 }
1067
1068 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
1069                     struct ctdb_ltdb_header *header,
1070                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
1071 {
1072         TDB_DATA rec;
1073         int ret;
1074
1075         rec = tdb_fetch(db->ltdb->tdb, key);
1076         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
1077                 /* No record present */
1078                 if (rec.dptr != NULL) {
1079                         free(rec.dptr);
1080                 }
1081
1082                 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
1083                         return EIO;
1084                 }
1085
1086                 header->rsn = 0;
1087                 header->dmaster = CTDB_UNKNOWN_PNN;
1088                 header->flags = 0;
1089
1090                 if (data != NULL) {
1091                         *data = tdb_null;
1092                 }
1093                 return 0;
1094         }
1095
1096         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
1097         if (ret != 0) {
1098                 return ret;
1099         }
1100
1101         ret = 0;
1102         if (data != NULL) {
1103                 size_t offset = ctdb_ltdb_header_len(header);
1104
1105                 data->dsize = rec.dsize - offset;
1106                 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
1107                                            data->dsize);
1108                 if (data->dptr == NULL) {
1109                         ret = ENOMEM;
1110                 }
1111         }
1112
1113         free(rec.dptr);
1114         return ret;
1115 }
1116
1117 /*
1118  * Fetch a record from volatile database
1119  *
1120  * Steps:
1121  *  1. Get a lock on the hash chain
1122  *  2. If the record does not exist, migrate the record
1123  *  3. If readonly=true and delegations do not exist, migrate the record.
1124  *  4. If readonly=false and delegations exist, migrate the record.
1125  *  5. If the local node is not dmaster, migrate the record.
1126  *  6. Return record
1127  */
1128
/*
 * Per-request state of ctdb_fetch_lock_send().
 */
1129 struct ctdb_fetch_lock_state {
             /* Event context and client as passed to ctdb_fetch_lock_send() */
1130         struct tevent_context *ev;
1131         struct ctdb_client_context *client;
             /* Record handle being built; allocated on the database context */
1132         struct ctdb_record_handle *h;
             /* Access mode requested by the caller */
1133         bool readonly;
             /* Value of ctdb_client_pnn(client), compared against the
              * record's dmaster */
1134         uint32_t pnn;
1135 };
1136
1137 static int ctdb_fetch_lock_check(struct tevent_req *req);
1138 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
1139 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
1140
1141 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
1142                                         struct tevent_context *ev,
1143                                         struct ctdb_client_context *client,
1144                                         struct ctdb_db_context *db,
1145                                         TDB_DATA key, bool readonly)
1146 {
1147         struct ctdb_fetch_lock_state *state;
1148         struct tevent_req *req;
1149         int ret;
1150
1151         req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
1152         if (req == NULL) {
1153                 return NULL;
1154         }
1155
1156         state->ev = ev;
1157         state->client = client;
1158
1159         state->h = talloc_zero(db, struct ctdb_record_handle);
1160         if (tevent_req_nomem(state->h, req)) {
1161                 return tevent_req_post(req, ev);
1162         }
1163         state->h->client = client;
1164         state->h->db = db;
1165         state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
1166         if (tevent_req_nomem(state->h->key.dptr, req)) {
1167                 return tevent_req_post(req, ev);
1168         }
1169         state->h->key.dsize = key.dsize;
1170         state->h->readonly = false;
1171
1172         state->readonly = readonly;
1173         state->pnn = ctdb_client_pnn(client);
1174
1175         /* Check that database is not persistent */
1176         if (db->persistent) {
1177                 DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
1178                                   db->db_name));
1179                 tevent_req_error(req, EINVAL);
1180                 return tevent_req_post(req, ev);
1181         }
1182
1183         ret = ctdb_fetch_lock_check(req);
1184         if (ret == 0) {
1185                 tevent_req_done(req);
1186                 return tevent_req_post(req, ev);
1187         }
1188         if (ret != EAGAIN) {
1189                 tevent_req_error(req, ret);
1190                 return tevent_req_post(req, ev);
1191         }
1192         return req;
1193 }
1194
1195 static int ctdb_fetch_lock_check(struct tevent_req *req)
1196 {
1197         struct ctdb_fetch_lock_state *state = tevent_req_data(
1198                 req, struct ctdb_fetch_lock_state);
1199         struct ctdb_record_handle *h = state->h;
1200         struct ctdb_ltdb_header header;
1201         TDB_DATA data = tdb_null;
1202         int ret, err = 0;
1203         bool do_migrate = false;
1204
1205         ret = tdb_chainlock(h->db->ltdb->tdb, h->key);
1206         if (ret != 0) {
1207                 DEBUG(DEBUG_ERR,
1208                       ("fetch_lock: %s tdb_chainlock failed, %s\n",
1209                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1210                 err = EIO;
1211                 goto failed;
1212         }
1213
1214         data = tdb_fetch(h->db->ltdb->tdb, h->key);
1215         if (data.dptr == NULL) {
1216                 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
1217                         goto migrate;
1218                 } else {
1219                         err = EIO;
1220                         goto failed;
1221                 }
1222         }
1223
1224         /* Got the record */
1225         ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
1226         if (ret != 0) {
1227                 err = ret;
1228                 goto failed;
1229         }
1230
1231         if (! state->readonly) {
1232                 /* Read/write access */
1233                 if (header.dmaster == state->pnn &&
1234                     header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1235                         goto migrate;
1236                 }
1237
1238                 if (header.dmaster != state->pnn) {
1239                         goto migrate;
1240                 }
1241         } else {
1242                 /* Readonly access */
1243                 if (header.dmaster != state->pnn &&
1244                     ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1245                                        CTDB_REC_RO_HAVE_DELEGATIONS))) {
1246                         goto migrate;
1247                 }
1248         }
1249
1250         /* We are the dmaster or readonly delegation */
1251         h->header = header;
1252         h->data = data;
1253         if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1254                             CTDB_REC_RO_HAVE_DELEGATIONS)) {
1255                 h->readonly = true;
1256         }
1257         return 0;
1258
1259 migrate:
1260         do_migrate = true;
1261         err = EAGAIN;
1262
1263 failed:
1264         if (data.dptr != NULL) {
1265                 free(data.dptr);
1266         }
1267         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1268         if (ret != 0) {
1269                 DEBUG(DEBUG_ERR,
1270                       ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1271                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1272                 return EIO;
1273         }
1274
1275         if (do_migrate) {
1276                 ctdb_fetch_lock_migrate(req);
1277         }
1278         return err;
1279 }
1280
1281 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
1282 {
1283         struct ctdb_fetch_lock_state *state = tevent_req_data(
1284                 req, struct ctdb_fetch_lock_state);
1285         struct ctdb_req_call request;
1286         struct tevent_req *subreq;
1287
1288         ZERO_STRUCT(request);
1289         request.flags = CTDB_IMMEDIATE_MIGRATION;
1290         if (state->readonly) {
1291                 request.flags |= CTDB_WANT_READONLY;
1292         }
1293         request.db_id = state->h->db->db_id;
1294         request.callid = CTDB_NULL_FUNC;
1295         request.key = state->h->key;
1296         request.calldata = tdb_null;
1297
1298         subreq = ctdb_client_call_send(state, state->ev, state->client,
1299                                        &request);
1300         if (tevent_req_nomem(subreq, req)) {
1301                 return;
1302         }
1303
1304         tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
1305 }
1306
1307 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
1308 {
1309         struct tevent_req *req = tevent_req_callback_data(
1310                 subreq, struct tevent_req);
1311         struct ctdb_fetch_lock_state *state = tevent_req_data(
1312                 req, struct ctdb_fetch_lock_state);
1313         struct ctdb_reply_call *reply;
1314         int ret;
1315         bool status;
1316
1317         status = ctdb_client_call_recv(subreq, state, &reply, &ret);
1318         TALLOC_FREE(subreq);
1319         if (! status) {
1320                 DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
1321                                   state->h->db->db_name, ret));
1322                 tevent_req_error(req, ret);
1323                 return;
1324         }
1325
1326         if (reply->status != 0) {
1327                 tevent_req_error(req, EIO);
1328                 return;
1329         }
1330         talloc_free(reply);
1331
1332         ret = ctdb_fetch_lock_check(req);
1333         if (ret != 0) {
1334                 if (ret != EAGAIN) {
1335                         tevent_req_error(req, ret);
1336                 }
1337                 return;
1338         }
1339
1340         tevent_req_done(req);
1341 }
1342
1343 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
1344 {
1345         int ret;
1346
1347         ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1348         if (ret != 0) {
1349                 DEBUG(DEBUG_ERR,
1350                       ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1351                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1352         }
1353         free(h->data.dptr);
1354         return 0;
1355 }
1356
1357 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
1358                                                 struct ctdb_ltdb_header *header,
1359                                                 TALLOC_CTX *mem_ctx,
1360                                                 TDB_DATA *data, int *perr)
1361 {
1362         struct ctdb_fetch_lock_state *state = tevent_req_data(
1363                 req, struct ctdb_fetch_lock_state);
1364         struct ctdb_record_handle *h = state->h;
1365         int err;
1366
1367         if (tevent_req_is_unix_error(req, &err)) {
1368                 if (perr != NULL) {
1369                         TALLOC_FREE(state->h);
1370                         *perr = err;
1371                 }
1372                 return NULL;
1373         }
1374
1375         if (header != NULL) {
1376                 *header = h->header;
1377         }
1378         if (data != NULL) {
1379                 size_t offset;
1380
1381                 offset = ctdb_ltdb_header_len(&h->header);
1382
1383                 data->dsize = h->data.dsize - offset;
1384                 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
1385                                            data->dsize);
1386                 if (data->dptr == NULL) {
1387                         TALLOC_FREE(state->h);
1388                         if (perr != NULL) {
1389                                 *perr = ENOMEM;
1390                         }
1391                         return NULL;
1392                 }
1393         }
1394
1395         talloc_set_destructor(h, ctdb_record_handle_destructor);
1396         return h;
1397 }
1398
1399 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1400                     struct ctdb_client_context *client,
1401                     struct ctdb_db_context *db, TDB_DATA key, bool readonly,
1402                     struct ctdb_record_handle **out,
1403                     struct ctdb_ltdb_header *header, TDB_DATA *data)
1404 {
1405         struct tevent_req *req;
1406         struct ctdb_record_handle *h;
1407         int ret;
1408
1409         req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
1410         if (req == NULL) {
1411                 return ENOMEM;
1412         }
1413
1414         tevent_req_poll(req, ev);
1415
1416         h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
1417         if (h == NULL) {
1418                 return ret;
1419         }
1420
1421         *out = h;
1422         return 0;
1423 }
1424
1425 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
1426 {
1427         uint8_t header[sizeof(struct ctdb_ltdb_header)];
1428         TDB_DATA rec[2];
1429         int ret;
1430
1431         /* Cannot modify the record if it was obtained as a readonly copy */
1432         if (h->readonly) {
1433                 return EINVAL;
1434         }
1435
1436         /* Check if the new data is same */
1437         if (h->data.dsize == data.dsize &&
1438             memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1439                 /* No need to do anything */
1440                 return 0;
1441         }
1442
1443         ctdb_ltdb_header_push(&h->header, header);
1444
1445         rec[0].dsize = ctdb_ltdb_header_len(&h->header);
1446         rec[0].dptr = header;
1447
1448         rec[1].dsize = data.dsize;
1449         rec[1].dptr = data.dptr;
1450
1451         ret = tdb_storev(h->db->ltdb->tdb, h->key, rec, 2, TDB_REPLACE);
1452         if (ret != 0) {
1453                 DEBUG(DEBUG_ERR,
1454                       ("store_record: %s tdb_storev failed, %s\n",
1455                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1456                 return EIO;
1457         }
1458
1459         return 0;
1460 }
1461
/*
 * Per-request state of ctdb_delete_record_send().
 */
1462 struct ctdb_delete_record_state {
             /* Handle of the record being deleted; used for logging in
              * the completion callback */
1463         struct ctdb_record_handle *h;
1464 };
1465
1466 static void ctdb_delete_record_done(struct tevent_req *subreq);
1467
1468 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1469                                            struct tevent_context *ev,
1470                                            struct ctdb_record_handle *h)
1471 {
1472         struct tevent_req *req, *subreq;
1473         struct ctdb_delete_record_state *state;
1474         struct ctdb_key_data key;
1475         struct ctdb_req_control request;
1476         uint8_t header[sizeof(struct ctdb_ltdb_header)];
1477         TDB_DATA rec;
1478         int ret;
1479
1480         req = tevent_req_create(mem_ctx, &state,
1481                                 struct ctdb_delete_record_state);
1482         if (req == NULL) {
1483                 return NULL;
1484         }
1485
1486         state->h = h;
1487
1488         /* Cannot delete the record if it was obtained as a readonly copy */
1489         if (h->readonly) {
1490                 DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
1491                                   h->db->db_name));
1492                 tevent_req_error(req, EINVAL);
1493                 return tevent_req_post(req, ev);
1494         }
1495
1496         ctdb_ltdb_header_push(&h->header, header);
1497
1498         rec.dsize = ctdb_ltdb_header_len(&h->header);
1499         rec.dptr = header;
1500
1501         ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1502         if (ret != 0) {
1503                 DEBUG(DEBUG_ERR,
1504                       ("fetch_lock delete: %s tdb_sore failed, %s\n",
1505                        h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1506                 tevent_req_error(req, EIO);
1507                 return tevent_req_post(req, ev);
1508         }
1509
1510         key.db_id = h->db->db_id;
1511         key.header = h->header;
1512         key.key = h->key;
1513
1514         ctdb_req_control_schedule_for_deletion(&request, &key);
1515         subreq = ctdb_client_control_send(state, ev, h->client,
1516                                           ctdb_client_pnn(h->client),
1517                                           tevent_timeval_zero(),
1518                                           &request);
1519         if (tevent_req_nomem(subreq, req)) {
1520                 return tevent_req_post(req, ev);
1521         }
1522         tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
1523
1524         return req;
1525 }
1526
1527 static void ctdb_delete_record_done(struct tevent_req *subreq)
1528 {
1529         struct tevent_req *req = tevent_req_callback_data(
1530                 subreq, struct tevent_req);
1531         struct ctdb_delete_record_state *state = tevent_req_data(
1532                 req, struct ctdb_delete_record_state);
1533         int ret;
1534         bool status;
1535
1536         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1537         TALLOC_FREE(subreq);
1538         if (! status) {
1539                 DEBUG(DEBUG_ERR,
1540                       ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1541                        "ret=%d\n", state->h->db->db_name, ret));
1542                 tevent_req_error(req, ret);
1543                 return;
1544         }
1545
1546         tevent_req_done(req);
1547 }
1548
/*
 * Collect the result of ctdb_delete_record_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
{
	int code;

	if (! tevent_req_is_unix_error(req, &code)) {
		return true;
	}

	if (perr != NULL) {
		*perr = code;
	}
	return false;
}
1562
1563
1564 int ctdb_delete_record(struct ctdb_record_handle *h)
1565 {
1566         struct tevent_context *ev = h->ev;
1567         TALLOC_CTX *mem_ctx;
1568         struct tevent_req *req;
1569         int ret;
1570         bool status;
1571
1572         mem_ctx = talloc_new(NULL);
1573         if (mem_ctx == NULL) {
1574                 return ENOMEM;
1575         }
1576
1577         req = ctdb_delete_record_send(mem_ctx, ev, h);
1578         if (req == NULL) {
1579                 talloc_free(mem_ctx);
1580                 return ENOMEM;
1581         }
1582
1583         tevent_req_poll(req, ev);
1584
1585         status = ctdb_delete_record_recv(req, &ret);
1586         talloc_free(mem_ctx);
1587         if (! status) {
1588                 return ret;
1589         }
1590
1591         return 0;
1592 }
1593
1594 /*
1595  * Global lock functions
1596  */
1597
/*
 * Per-request state of ctdb_g_lock_lock_send().
 */
1598 struct ctdb_g_lock_lock_state {
             /* Event context, client and database as passed by the caller */
1599         struct tevent_context *ev;
1600         struct ctdb_client_context *client;
1601         struct ctdb_db_context *db;
             /* Record key: the caller's keyname (not copied), with the
              * terminating NUL counted in dsize */
1602         TDB_DATA key;
             /* Copy of the caller's server id */
1603         struct ctdb_server_id my_sid;
             /* CTDB_G_LOCK_READ or CTDB_G_LOCK_WRITE, from the readonly flag */
1604         enum ctdb_g_lock_type lock_type;
             /* Handle of the fetch-locked lock record */
1605         struct ctdb_record_handle *h;
1606         /* state for verification of active locks */
             /* Lock list parsed from the record data */
1607         struct ctdb_g_lock_list *lock_list;
             /* Index of the lock_list entry currently being examined */
1608         unsigned int current;
1609 };
1610
1611 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1612 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1613 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1614 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1615 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1616
1617 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1618                                   enum ctdb_g_lock_type l2)
1619 {
1620         if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1621                 return false;
1622         }
1623         return true;
1624 }
1625
1626 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1627                                          struct tevent_context *ev,
1628                                          struct ctdb_client_context *client,
1629                                          struct ctdb_db_context *db,
1630                                          const char *keyname,
1631                                          struct ctdb_server_id *sid,
1632                                          bool readonly)
1633 {
1634         struct tevent_req *req, *subreq;
1635         struct ctdb_g_lock_lock_state *state;
1636
1637         req = tevent_req_create(mem_ctx, &state,
1638                                 struct ctdb_g_lock_lock_state);
1639         if (req == NULL) {
1640                 return NULL;
1641         }
1642
1643         state->ev = ev;
1644         state->client = client;
1645         state->db = db;
1646         state->key.dptr = discard_const(keyname);
1647         state->key.dsize = strlen(keyname) + 1;
1648         state->my_sid = *sid;
1649         state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1650
1651         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1652                                       false);
1653         if (tevent_req_nomem(subreq, req)) {
1654                 return tevent_req_post(req, ev);
1655         }
1656         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1657
1658         return req;
1659 }
1660
1661 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1662 {
1663         struct tevent_req *req = tevent_req_callback_data(
1664                 subreq, struct tevent_req);
1665         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1666                 req, struct ctdb_g_lock_lock_state);
1667         TDB_DATA data;
1668         int ret = 0;
1669
1670         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1671         TALLOC_FREE(subreq);
1672         if (state->h == NULL) {
1673                 DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1674                                   (char *)state->key.dptr));
1675                 tevent_req_error(req, ret);
1676                 return;
1677         }
1678
1679         if (state->lock_list != NULL) {
1680                 TALLOC_FREE(state->lock_list);
1681                 state->current = 0;
1682         }
1683
1684         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1685                                     &state->lock_list);
1686         talloc_free(data.dptr);
1687         if (ret != 0) {
1688                 DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1689                                   (char *)state->key.dptr));
1690                 tevent_req_error(req, ret);
1691                 return;
1692         }
1693
1694         ctdb_g_lock_lock_process_locks(req);
1695 }
1696
1697 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1698 {
1699         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1700                 req, struct ctdb_g_lock_lock_state);
1701         struct tevent_req *subreq;
1702         struct ctdb_g_lock *lock;
1703         bool check_server = false;
1704         int ret;
1705
1706         while (state->current < state->lock_list->num) {
1707                 lock = &state->lock_list->lock[state->current];
1708
1709                 /* We should not ask for the same lock more than once */
1710                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1711                         DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
1712                                           (char *)state->key.dptr));
1713                         tevent_req_error(req, EDEADLK);
1714                         return;
1715                 }
1716
1717                 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1718                         check_server = true;
1719                         break;
1720                 }
1721
1722                 state->current += 1;
1723         }
1724
1725         if (check_server) {
1726                 struct ctdb_req_control request;
1727
1728                 ctdb_req_control_process_exists(&request, lock->sid.pid);
1729                 subreq = ctdb_client_control_send(state, state->ev,
1730                                                   state->client,
1731                                                   lock->sid.vnn,
1732                                                   tevent_timeval_zero(),
1733                                                   &request);
1734                 if (tevent_req_nomem(subreq, req)) {
1735                         return;
1736                 }
1737                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1738                 return;
1739         }
1740
1741         /* There is no conflict, add ourself to the lock_list */
1742         state->lock_list->lock = talloc_realloc(state->lock_list,
1743                                                 state->lock_list->lock,
1744                                                 struct ctdb_g_lock,
1745                                                 state->lock_list->num + 1);
1746         if (state->lock_list->lock == NULL) {
1747                 tevent_req_error(req, ENOMEM);
1748                 return;
1749         }
1750
1751         lock = &state->lock_list->lock[state->lock_list->num];
1752         lock->type = state->lock_type;
1753         lock->sid = state->my_sid;
1754         state->lock_list->num += 1;
1755
1756         ret = ctdb_g_lock_lock_update(req);
1757         if (ret != 0) {
1758                 tevent_req_error(req, ret);
1759                 return;
1760         }
1761
1762         TALLOC_FREE(state->h);
1763         tevent_req_done(req);
1764 }
1765
1766 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1767 {
1768         struct tevent_req *req = tevent_req_callback_data(
1769                 subreq, struct tevent_req);
1770         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1771                 req, struct ctdb_g_lock_lock_state);
1772         struct ctdb_reply_control *reply;
1773         int ret, value;
1774         bool status;
1775
1776         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1777         TALLOC_FREE(subreq);
1778         if (! status) {
1779                 DEBUG(DEBUG_ERR,
1780                       ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1781                        (char *)state->key.dptr, ret));
1782                 tevent_req_error(req, ret);
1783                 return;
1784         }
1785
1786         ret = ctdb_reply_control_process_exists(reply, &value);
1787         if (ret != 0) {
1788                 tevent_req_error(req, ret);
1789                 return;
1790         }
1791         talloc_free(reply);
1792
1793         if (value == 0) {
1794                 /* server process exists, need to retry */
1795                 TALLOC_FREE(state->h);
1796                 subreq = tevent_wakeup_send(state, state->ev,
1797                                             tevent_timeval_current_ofs(0,1000));
1798                 if (tevent_req_nomem(subreq, req)) {
1799                         return;
1800                 }
1801                 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1802                 return;
1803         }
1804
1805         /* server process does not exist, remove conflicting entry */
1806         state->lock_list->lock[state->current] =
1807                 state->lock_list->lock[state->lock_list->num-1];
1808         state->lock_list->num -= 1;
1809
1810         ret = ctdb_g_lock_lock_update(req);
1811         if (ret != 0) {
1812                 tevent_req_error(req, ret);
1813                 return;
1814         }
1815
1816         ctdb_g_lock_lock_process_locks(req);
1817 }
1818
1819 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1820 {
1821         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1822                 req, struct ctdb_g_lock_lock_state);
1823         TDB_DATA data;
1824         int ret;
1825
1826         data.dsize = ctdb_g_lock_list_len(state->lock_list);
1827         data.dptr = talloc_size(state, data.dsize);
1828         if (data.dptr == NULL) {
1829                 return ENOMEM;
1830         }
1831
1832         ctdb_g_lock_list_push(state->lock_list, data.dptr);
1833         ret = ctdb_store_record(state->h, data);
1834         talloc_free(data.dptr);
1835         return ret;
1836 }
1837
1838 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1839 {
1840         struct tevent_req *req = tevent_req_callback_data(
1841                 subreq, struct tevent_req);
1842         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1843                 req, struct ctdb_g_lock_lock_state);
1844         bool success;
1845
1846         success = tevent_wakeup_recv(subreq);
1847         TALLOC_FREE(subreq);
1848         if (! success) {
1849                 tevent_req_error(req, ENOMEM);
1850                 return;
1851         }
1852
1853         subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1854                                       state->db, state->key, false);
1855         if (tevent_req_nomem(subreq, req)) {
1856                 return;
1857         }
1858         tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1859 }
1860
1861 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1862 {
1863         struct ctdb_g_lock_lock_state *state = tevent_req_data(
1864                 req, struct ctdb_g_lock_lock_state);
1865         int err;
1866
1867         TALLOC_FREE(state->h);
1868
1869         if (tevent_req_is_unix_error(req, &err)) {
1870                 if (perr != NULL) {
1871                         *perr = err;
1872                 }
1873                 return false;
1874         }
1875
1876         return true;
1877 }
1878
1879 struct ctdb_g_lock_unlock_state {
1880         struct tevent_context *ev;
1881         struct ctdb_client_context *client;
1882         struct ctdb_db_context *db;
1883         TDB_DATA key;
1884         struct ctdb_server_id my_sid;
1885         struct ctdb_record_handle *h;
1886         struct ctdb_g_lock_list *lock_list;
1887 };
1888
1889 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1890 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1891 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
1892
1893 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1894                                            struct tevent_context *ev,
1895                                            struct ctdb_client_context *client,
1896                                            struct ctdb_db_context *db,
1897                                            const char *keyname,
1898                                            struct ctdb_server_id sid)
1899 {
1900         struct tevent_req *req, *subreq;
1901         struct ctdb_g_lock_unlock_state *state;
1902
1903         req = tevent_req_create(mem_ctx, &state,
1904                                 struct ctdb_g_lock_unlock_state);
1905         if (req == NULL) {
1906                 return NULL;
1907         }
1908
1909         state->ev = ev;
1910         state->client = client;
1911         state->db = db;
1912         state->key.dptr = discard_const(keyname);
1913         state->key.dsize = strlen(keyname) + 1;
1914         state->my_sid = sid;
1915
1916         subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1917                                       false);
1918         if (tevent_req_nomem(subreq, req)) {
1919                 return tevent_req_post(req, ev);
1920         }
1921         tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1922
1923         return req;
1924 }
1925
1926 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1927 {
1928         struct tevent_req *req = tevent_req_callback_data(
1929                 subreq, struct tevent_req);
1930         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1931                 req, struct ctdb_g_lock_unlock_state);
1932         TDB_DATA data;
1933         int ret = 0;
1934
1935         state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1936         TALLOC_FREE(subreq);
1937         if (state->h == NULL) {
1938                 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1939                                   (char *)state->key.dptr));
1940                 tevent_req_error(req, ret);
1941                 return;
1942         }
1943
1944         ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1945                                     &state->lock_list);
1946         if (ret != 0) {
1947                 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1948                                   (char *)state->key.dptr));
1949                 tevent_req_error(req, ret);
1950                 return;
1951         }
1952
1953         ret = ctdb_g_lock_unlock_update(req);
1954         if (ret != 0) {
1955                 tevent_req_error(req, ret);
1956                 return;
1957         }
1958
1959         if (state->lock_list->num == 0) {
1960                 subreq = ctdb_delete_record_send(state, state->ev, state->h);
1961                 if (tevent_req_nomem(subreq, req)) {
1962                         return;
1963                 }
1964                 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
1965                                         req);
1966                 return;
1967         }
1968
1969         TALLOC_FREE(state->h);
1970         tevent_req_done(req);
1971 }
1972
1973 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1974 {
1975         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1976                 req, struct ctdb_g_lock_unlock_state);
1977         struct ctdb_g_lock *lock;
1978         int ret, i;
1979
1980         for (i=0; i<state->lock_list->num; i++) {
1981                 lock = &state->lock_list->lock[i];
1982
1983                 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1984                         break;
1985                 }
1986         }
1987
1988         if (i < state->lock_list->num) {
1989                 state->lock_list->lock[i] =
1990                         state->lock_list->lock[state->lock_list->num-1];
1991                 state->lock_list->num -= 1;
1992         }
1993
1994         if (state->lock_list->num != 0) {
1995                 TDB_DATA data;
1996
1997                 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1998                 data.dptr = talloc_size(state, data.dsize);
1999                 if (data.dptr == NULL) {
2000                         return ENOMEM;
2001                 }
2002
2003                 ctdb_g_lock_list_push(state->lock_list, data.dptr);
2004                 ret = ctdb_store_record(state->h, data);
2005                 talloc_free(data.dptr);
2006                 if (ret != 0) {
2007                         return ret;
2008                 }
2009         }
2010
2011         return 0;
2012 }
2013
2014 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
2015 {
2016         struct tevent_req *req = tevent_req_callback_data(
2017                 subreq, struct tevent_req);
2018         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2019                 req, struct ctdb_g_lock_unlock_state);
2020         int ret;
2021         bool status;
2022
2023         status = ctdb_delete_record_recv(subreq, &ret);
2024         if (! status) {
2025                 DEBUG(DEBUG_ERR,
2026                       ("g_lock_unlock %s delete record failed, ret=%d\n",
2027                        (char *)state->key.dptr, ret));
2028                 tevent_req_error(req, ret);
2029                 return;
2030         }
2031
2032         TALLOC_FREE(state->h);
2033         tevent_req_done(req);
2034 }
2035
2036 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
2037 {
2038         struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2039                 req, struct ctdb_g_lock_unlock_state);
2040         int err;
2041
2042         TALLOC_FREE(state->h);
2043
2044         if (tevent_req_is_unix_error(req, &err)) {
2045                 if (perr != NULL) {
2046                         *perr = err;
2047                 }
2048                 return false;
2049         }
2050
2051         return true;
2052 }
2053
2054 /*
2055  * Persistent database functions
2056  */
/* Per-request state for starting a transaction */
struct ctdb_transaction_start_state {
        /* Event context and client the request runs on */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* NOTE(review): not assigned by the start code visible here */
        struct timeval timeout;
        /* Transaction handle being set up; returned by the recv function */
        struct ctdb_transaction_handle *h;
        /* Set to the PNN reported by ctdb_client_pnn() for the client */
        uint32_t destnode;
};

/* Steps of transaction start: attach g_lock.tdb, then take the g_lock */
static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
2067
2068 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
2069                                                struct tevent_context *ev,
2070                                                struct ctdb_client_context *client,
2071                                                struct timeval timeout,
2072                                                struct ctdb_db_context *db,
2073                                                bool readonly)
2074 {
2075         struct ctdb_transaction_start_state *state;
2076         struct tevent_req *req, *subreq;
2077         struct ctdb_transaction_handle *h;
2078
2079         req = tevent_req_create(mem_ctx, &state,
2080                                 struct ctdb_transaction_start_state);
2081         if (req == NULL) {
2082                 return NULL;
2083         }
2084
2085         if (! db->persistent) {
2086                 tevent_req_error(req, EINVAL);
2087                 return tevent_req_post(req, ev);
2088         }
2089
2090         state->ev = ev;
2091         state->client = client;
2092         state->destnode = ctdb_client_pnn(client);
2093
2094         h = talloc_zero(db, struct ctdb_transaction_handle);
2095         if (tevent_req_nomem(h, req)) {
2096                 return tevent_req_post(req, ev);
2097         }
2098
2099         h->ev = ev;
2100         h->client = client;
2101         h->db = db;
2102         h->readonly = readonly;
2103         h->updated = false;
2104
2105         /* SRVID is unique for databases, so client can have transactions
2106          * active for multiple databases */
2107         h->sid = ctdb_client_get_server_id(client, db->db_id);
2108
2109         h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
2110         if (tevent_req_nomem(h->recbuf, req)) {
2111                 return tevent_req_post(req, ev);
2112         }
2113
2114         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
2115         if (tevent_req_nomem(h->lock_name, req)) {
2116                 return tevent_req_post(req, ev);
2117         }
2118
2119         state->h = h;
2120
2121         subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
2122         if (tevent_req_nomem(subreq, req)) {
2123                 return tevent_req_post(req, ev);
2124         }
2125         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
2126
2127         return req;
2128 }
2129
2130 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
2131 {
2132         struct tevent_req *req = tevent_req_callback_data(
2133                 subreq, struct tevent_req);
2134         struct ctdb_transaction_start_state *state = tevent_req_data(
2135                 req, struct ctdb_transaction_start_state);
2136         bool status;
2137         int ret;
2138
2139         status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
2140         TALLOC_FREE(subreq);
2141         if (! status) {
2142                 DEBUG(DEBUG_ERR,
2143                       ("transaction_start: %s attach g_lock.tdb failed\n",
2144                        state->h->db->db_name));
2145                 tevent_req_error(req, ret);
2146                 return;
2147         }
2148
2149         subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
2150                                        state->h->db_g_lock,
2151                                        state->h->lock_name,
2152                                        &state->h->sid, state->h->readonly);
2153         if (tevent_req_nomem(subreq, req)) {
2154                 return;
2155         }
2156         tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
2157 }
2158
2159 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
2160 {
2161         struct tevent_req *req = tevent_req_callback_data(
2162                 subreq, struct tevent_req);
2163         struct ctdb_transaction_start_state *state = tevent_req_data(
2164                 req, struct ctdb_transaction_start_state);
2165         int ret;
2166         bool status;
2167
2168         status = ctdb_g_lock_lock_recv(subreq, &ret);
2169         TALLOC_FREE(subreq);
2170         if (! status) {
2171                 DEBUG(DEBUG_ERR,
2172                       ("transaction_start: %s g_lock lock failed, ret=%d\n",
2173                        state->h->db->db_name, ret));
2174                 tevent_req_error(req, ret);
2175                 return;
2176         }
2177
2178         tevent_req_done(req);
2179 }
2180
2181 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
2182                                         struct tevent_req *req,
2183                                         int *perr)
2184 {
2185         struct ctdb_transaction_start_state *state = tevent_req_data(
2186                 req, struct ctdb_transaction_start_state);
2187         int err;
2188
2189         if (tevent_req_is_unix_error(req, &err)) {
2190                 if (perr != NULL) {
2191                         *perr = err;
2192                 }
2193                 return NULL;
2194         }
2195
2196         return state->h;
2197 }
2198
2199 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2200                            struct ctdb_client_context *client,
2201                            struct timeval timeout,
2202                            struct ctdb_db_context *db, bool readonly,
2203                            struct ctdb_transaction_handle **out)
2204 {
2205         struct tevent_req *req;
2206         struct ctdb_transaction_handle *h;
2207         int ret;
2208
2209         req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
2210                                           readonly);
2211         if (req == NULL) {
2212                 return ENOMEM;
2213         }
2214
2215         tevent_req_poll(req, ev);
2216
2217         h = ctdb_transaction_start_recv(req, &ret);
2218         if (h == NULL) {
2219                 return ret;
2220         }
2221
2222         *out = h;
2223         return 0;
2224 }
2225
2226 struct ctdb_transaction_record_fetch_state {
2227         TDB_DATA key, data;
2228         struct ctdb_ltdb_header header;
2229         bool found;
2230 };
2231
2232 static int ctdb_transaction_record_fetch_traverse(
2233                                 uint32_t reqid,
2234                                 struct ctdb_ltdb_header *nullheader,
2235                                 TDB_DATA key, TDB_DATA data,
2236                                 void *private_data)
2237 {
2238         struct ctdb_transaction_record_fetch_state *state =
2239                 (struct ctdb_transaction_record_fetch_state *)private_data;
2240
2241         if (state->key.dsize == key.dsize &&
2242             memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
2243                 int ret;
2244
2245                 ret = ctdb_ltdb_header_extract(&data, &state->header);
2246                 if (ret != 0) {
2247                         DEBUG(DEBUG_ERR,
2248                               ("record_fetch: Failed to extract header, "
2249                                "ret=%d\n", ret));
2250                         return 1;
2251                 }
2252
2253                 state->data = data;
2254                 state->found = true;
2255         }
2256
2257         return 0;
2258 }
2259
2260 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
2261                                          TDB_DATA key,
2262                                          struct ctdb_ltdb_header *header,
2263                                          TDB_DATA *data)
2264 {
2265         struct ctdb_transaction_record_fetch_state state;
2266         int ret;
2267
2268         state.key = key;
2269         state.found = false;
2270
2271         ret = ctdb_rec_buffer_traverse(h->recbuf,
2272                                        ctdb_transaction_record_fetch_traverse,
2273                                        &state);
2274         if (ret != 0) {
2275                 return ret;
2276         }
2277
2278         if (state.found) {
2279                 if (header != NULL) {
2280                         *header = state.header;
2281                 }
2282                 if (data != NULL) {
2283                         *data = state.data;
2284                 }
2285                 return 0;
2286         }
2287
2288         return ENOENT;
2289 }
2290
2291 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
2292                                   TDB_DATA key,
2293                                   TALLOC_CTX *mem_ctx, TDB_DATA *data)
2294 {
2295         TDB_DATA tmp_data;
2296         struct ctdb_ltdb_header header;
2297         int ret;
2298
2299         ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
2300         if (ret == 0) {
2301                 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
2302                                            tmp_data.dsize);
2303                 if (data->dptr == NULL) {
2304                         return ENOMEM;
2305                 }
2306                 data->dsize = tmp_data.dsize;
2307                 return 0;
2308         }
2309
2310         ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
2311         if (ret != 0) {
2312                 return ret;
2313         }
2314
2315         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
2316         if (ret != 0) {
2317                 return ret;
2318         }
2319
2320         return 0;
2321 }
2322
2323 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
2324                                   TDB_DATA key, TDB_DATA data)
2325 {
2326         TALLOC_CTX *tmp_ctx;
2327         struct ctdb_ltdb_header header;
2328         TDB_DATA old_data;
2329         int ret;
2330
2331         if (h->readonly) {
2332                 return EINVAL;
2333         }
2334
2335         tmp_ctx = talloc_new(h);
2336         if (tmp_ctx == NULL) {
2337                 return ENOMEM;
2338         }
2339
2340         ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
2341         if (ret != 0) {
2342                 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
2343                 if (ret != 0) {
2344                         return ret;
2345                 }
2346         }
2347
2348         if (old_data.dsize == data.dsize &&
2349             memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
2350                 talloc_free(tmp_ctx);
2351                 return 0;
2352         }
2353
2354         header.dmaster = ctdb_client_pnn(h->client);
2355         header.rsn += 1;
2356
2357         ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
2358         talloc_free(tmp_ctx);
2359         if (ret != 0) {
2360                 return ret;
2361         }
2362         h->updated = true;
2363
2364         return 0;
2365 }
2366
2367 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
2368                                    TDB_DATA key)
2369 {
2370         return ctdb_transaction_store_record(h, key, tdb_null);
2371 }
2372
2373 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
2374                                             uint64_t *seqnum)
2375 {
2376         const char *keyname = CTDB_DB_SEQNUM_KEY;
2377         TDB_DATA key, data;
2378         struct ctdb_ltdb_header header;
2379         int ret;
2380
2381         key.dptr = discard_const(keyname);
2382         key.dsize = strlen(keyname) + 1;
2383
2384         ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2385         if (ret != 0) {
2386                 DEBUG(DEBUG_ERR,
2387                       ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2388                        h->db->db_name, ret));
2389                 return ret;
2390         }
2391
2392         if (data.dsize == 0) {
2393                 /* initial data */
2394                 *seqnum = 0;
2395                 return 0;
2396         }
2397
2398         if (data.dsize != sizeof(uint64_t)) {
2399                 talloc_free(data.dptr);
2400                 return EINVAL;
2401         }
2402
2403         *seqnum = *(uint64_t *)data.dptr;
2404
2405         talloc_free(data.dptr);
2406         return 0;
2407 }
2408
2409 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2410                                             uint64_t seqnum)
2411 {
2412         const char *keyname = CTDB_DB_SEQNUM_KEY;
2413         TDB_DATA key, data;
2414
2415         key.dptr = discard_const(keyname);
2416         key.dsize = strlen(keyname) + 1;
2417
2418         data.dptr = (uint8_t *)&seqnum;
2419         data.dsize = sizeof(seqnum);
2420
2421         return ctdb_transaction_store_record(h, key, data);
2422 }
2423
/* Per-request state for committing a transaction */
struct ctdb_transaction_commit_state {
        /* Event context the request runs on */
        struct tevent_context *ev;
        /* Timeout passed to each TRANS3_COMMIT control */
        struct timeval timeout;
        /* Transaction being committed; freed after a successful unlock */
        struct ctdb_transaction_handle *h;
        /* Database sequence number read before the commit was attempted */
        uint64_t seqnum;
};

/* Steps of the commit: TRANS3_COMMIT control, then g_lock unlock */
static void ctdb_transaction_commit_done(struct tevent_req *subreq);
static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
2433
2434 struct tevent_req *ctdb_transaction_commit_send(
2435                                         TALLOC_CTX *mem_ctx,
2436                                         struct tevent_context *ev,
2437                                         struct timeval timeout,
2438                                         struct ctdb_transaction_handle *h)
2439 {
2440         struct tevent_req *req, *subreq;
2441         struct ctdb_transaction_commit_state *state;
2442         struct ctdb_req_control request;
2443         int ret;
2444
2445         req = tevent_req_create(mem_ctx, &state,
2446                                 struct ctdb_transaction_commit_state);
2447         if (req == NULL) {
2448                 return NULL;
2449         }
2450
2451         state->ev = ev;
2452         state->timeout = timeout;
2453         state->h = h;
2454
2455         ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
2456         if (ret != 0) {
2457                 tevent_req_error(req, ret);
2458                 return tevent_req_post(req, ev);
2459         }
2460
2461         ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
2462         if (ret != 0) {
2463                 tevent_req_error(req, ret);
2464                 return tevent_req_post(req, ev);
2465         }
2466
2467         ctdb_req_control_trans3_commit(&request, h->recbuf);
2468         subreq = ctdb_client_control_send(state, ev, h->client,
2469                                           ctdb_client_pnn(h->client),
2470                                           timeout, &request);
2471         if (tevent_req_nomem(subreq, req)) {
2472                 return tevent_req_post(req, ev);
2473         }
2474         tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2475
2476         return req;
2477 }
2478
2479 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2480 {
2481         struct tevent_req *req = tevent_req_callback_data(
2482                 subreq, struct tevent_req);
2483         struct ctdb_transaction_commit_state *state = tevent_req_data(
2484                 req, struct ctdb_transaction_commit_state);
2485         struct ctdb_transaction_handle *h = state->h;
2486         struct ctdb_reply_control *reply;
2487         uint64_t seqnum;
2488         int ret;
2489         bool status;
2490
2491         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2492         TALLOC_FREE(subreq);
2493         if (! status) {
2494                 DEBUG(DEBUG_ERR,
2495                       ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2496                        h->db->db_name, ret));
2497                 tevent_req_error(req, ret);
2498                 return;
2499         }
2500
2501         ret = ctdb_reply_control_trans3_commit(reply);
2502         talloc_free(reply);
2503
2504         if (ret != 0) {
2505                 /* Control failed due to recovery */
2506
2507                 ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2508                 if (ret != 0) {
2509                         tevent_req_error(req, ret);
2510                         return;
2511                 }
2512
2513                 if (seqnum == state->seqnum) {
2514                         struct ctdb_req_control request;
2515
2516                         /* try again */
2517                         ctdb_req_control_trans3_commit(&request,
2518                                                        state->h->recbuf);
2519                         subreq = ctdb_client_control_send(
2520                                         state, state->ev, state->h->client,
2521                                         ctdb_client_pnn(state->h->client),
2522                                         state->timeout, &request);
2523                         if (tevent_req_nomem(subreq, req)) {
2524                                 return;
2525                         }
2526                         tevent_req_set_callback(subreq,
2527                                                 ctdb_transaction_commit_done,
2528                                                 req);
2529                         return;
2530                 }
2531
2532                 if (seqnum != state->seqnum + 1) {
2533                         DEBUG(DEBUG_ERR,
2534                               ("transaction_commit: %s seqnum mismatch "
2535                                "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2536                                state->h->db->db_name, seqnum, state->seqnum));
2537                         tevent_req_error(req, EIO);
2538                         return;
2539                 }
2540         }
2541
2542         /* trans3_commit successful */
2543         subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2544                                          h->db_g_lock, h->lock_name, h->sid);
2545         if (tevent_req_nomem(subreq, req)) {
2546                 return;
2547         }
2548         tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
2549                                 req);
2550 }
2551
2552 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2553 {
2554         struct tevent_req *req = tevent_req_callback_data(
2555                 subreq, struct tevent_req);
2556         struct ctdb_transaction_commit_state *state = tevent_req_data(
2557                 req, struct ctdb_transaction_commit_state);
2558         int ret;
2559         bool status;
2560
2561         status = ctdb_g_lock_unlock_recv(subreq, &ret);
2562         TALLOC_FREE(subreq);
2563         if (! status) {
2564                 DEBUG(DEBUG_ERR,
2565                       ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2566                        state->h->db->db_name, ret));
2567                 tevent_req_error(req, ret);
2568                 return;
2569         }
2570
2571         talloc_free(state->h);
2572         tevent_req_done(req);
2573 }
2574
/*
 * Fetch the result of a transaction commit request.
 *
 * Returns true if the request did not finish with an error.  Otherwise
 * returns false and, when perr is not NULL, stores the errno-style error
 * code in *perr.
 */
bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
{
        int err;

        if (! tevent_req_is_unix_error(req, &err)) {
                return true;
        }

        /* Error path: report the code only if the caller asked for it. */
        if (perr != NULL) {
                *perr = err;
        }
        return false;
}
2588
2589 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2590 {
2591         struct tevent_context *ev = h->ev;
2592         TALLOC_CTX *mem_ctx;
2593         struct tevent_req *req;
2594         int ret;
2595         bool status;
2596
2597         if (h->readonly || ! h->updated) {
2598                 return ctdb_transaction_cancel(h);
2599         }
2600
2601         mem_ctx = talloc_new(NULL);
2602         if (mem_ctx == NULL) {
2603                 return ENOMEM;
2604         }
2605
2606         req = ctdb_transaction_commit_send(mem_ctx, ev,
2607                                            tevent_timeval_zero(), h);
2608         if (req == NULL) {
2609                 talloc_free(mem_ctx);
2610                 return ENOMEM;
2611         }
2612
2613         tevent_req_poll(req, ev);
2614
2615         status = ctdb_transaction_commit_recv(req, &ret);
2616         if (! status) {
2617                 talloc_free(mem_ctx);
2618                 return ret;
2619         }
2620
2621         talloc_free(mem_ctx);
2622         return 0;
2623 }
2624
/*
 * Per-request state for ctdb_transaction_cancel_send().
 */
struct ctdb_transaction_cancel_state {
        /* Event context supplied by the caller of _send() */
        struct tevent_context *ev;
        /* Transaction handle being cancelled; freed by the done callback */
        struct ctdb_transaction_handle *h;
        /* Timeout supplied by the caller of _send() */
        struct timeval timeout;
};

/* Callback run when the g_lock unlock request of a cancel completes */
static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
2632
2633 struct tevent_req *ctdb_transaction_cancel_send(
2634                                         TALLOC_CTX *mem_ctx,
2635                                         struct tevent_context *ev,
2636                                         struct timeval timeout,
2637                                         struct ctdb_transaction_handle *h)
2638 {
2639         struct tevent_req *req, *subreq;
2640         struct ctdb_transaction_cancel_state *state;
2641
2642         req = tevent_req_create(mem_ctx, &state,
2643                                 struct ctdb_transaction_cancel_state);
2644         if (req == NULL) {
2645                 return NULL;
2646         }
2647
2648         state->ev = ev;
2649         state->h = h;
2650         state->timeout = timeout;
2651
2652         subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
2653                                          state->h->db_g_lock,
2654                                          state->h->lock_name, state->h->sid);
2655         if (tevent_req_nomem(subreq, req)) {
2656                 return tevent_req_post(req, ev);
2657         }
2658         tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
2659                                 req);
2660
2661         return req;
2662 }
2663
2664 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2665 {
2666         struct tevent_req *req = tevent_req_callback_data(
2667                 subreq, struct tevent_req);
2668         struct ctdb_transaction_cancel_state *state = tevent_req_data(
2669                 req, struct ctdb_transaction_cancel_state);
2670         int ret;
2671         bool status;
2672
2673         status = ctdb_g_lock_unlock_recv(subreq, &ret);
2674         TALLOC_FREE(subreq);
2675         if (! status) {
2676                 DEBUG(DEBUG_ERR,
2677                       ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2678                        state->h->db->db_name, ret));
2679                 talloc_free(state->h);
2680                 tevent_req_error(req, ret);
2681                 return;
2682         }
2683
2684         talloc_free(state->h);
2685         tevent_req_done(req);
2686 }
2687
/*
 * Fetch the result of a transaction cancel request.
 *
 * Returns true if the request did not finish with an error.  Otherwise
 * returns false and, when perr is not NULL, stores the errno-style error
 * code in *perr.
 */
bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
{
        int err;

        if (! tevent_req_is_unix_error(req, &err)) {
                return true;
        }

        /* Error path: report the code only if the caller asked for it. */
        if (perr != NULL) {
                *perr = err;
        }
        return false;
}
2701
2702 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2703 {
2704         struct tevent_context *ev = h->ev;
2705         struct tevent_req *req;
2706         TALLOC_CTX *mem_ctx;
2707         int ret;
2708         bool status;
2709
2710         mem_ctx = talloc_new(NULL);
2711         if (mem_ctx == NULL) {
2712                 talloc_free(h);
2713                 return ENOMEM;
2714         }
2715
2716         req = ctdb_transaction_cancel_send(mem_ctx, ev,
2717                                            tevent_timeval_zero(), h);
2718         if (req == NULL) {
2719                 talloc_free(mem_ctx);
2720                 talloc_free(h);
2721                 return ENOMEM;
2722         }
2723
2724         tevent_req_poll(req, ev);
2725
2726         status = ctdb_transaction_cancel_recv(req, &ret);
2727         if (! status) {
2728                 talloc_free(mem_ctx);
2729                 return ret;
2730         }
2731
2732         talloc_free(mem_ctx);
2733         return 0;
2734 }
2735
2736 /*
2737  * TODO:
2738  *
 * In the future, Samba should register SERVER_ID.
 * Make that structure the same as struct srvid {}.
2741  */