991ba4894c2c1df8358f4c77e779b94eca33e944
[nivanova/samba-autobuild/.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb, uint32_t caller)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (header->laccessor != caller) {
109                 header->lacount = 0;
110         }
111         header->laccessor = caller;
112         header->lacount++;
113
114         /* we need to force the record to be written out if this was a remote access,
115            so that the lacount is updated */
116         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
117                 c->new_data = &c->record_data;
118         }
119
120         if (c->new_data && updatetdb) {
121                 /* XXX check that we always have the lock here? */
122                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
123                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124                         talloc_free(c);
125                         return -1;
126                 }
127         }
128
129         if (c->reply_data) {
130                 call->reply_data = *c->reply_data;
131
132                 talloc_steal(call, call->reply_data.dptr);
133                 talloc_set_name_const(call->reply_data.dptr, __location__);
134         } else {
135                 call->reply_data.dptr = NULL;
136                 call->reply_data.dsize = 0;
137         }
138         call->status = c->status;
139
140         talloc_free(c);
141
142         return 0;
143 }
144
145
146 /*
147   queue a packet for sending from client to daemon
148 */
149 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
150 {
151         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
152 }
153
154
155 /*
156   called when a CTDB_REPLY_CALL packet comes in in the client
157
158   This packet comes in response to a CTDB_REQ_CALL request packet. It
159   contains any reply data from the call
160 */
161 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
162 {
163         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
164         struct ctdb_client_call_state *state;
165
166         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
167         if (state == NULL) {
168                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
169                 return;
170         }
171
172         if (hdr->reqid != state->reqid) {
173                 /* we found a record  but it was the wrong one */
174                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
175                 return;
176         }
177
178         state->call->reply_data.dptr = c->data;
179         state->call->reply_data.dsize = c->datalen;
180         state->call->status = c->status;
181
182         talloc_steal(state, c);
183
184         state->state = CTDB_CALL_DONE;
185
186         if (state->async.fn) {
187                 state->async.fn(state);
188         }
189 }
190
191 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
192
193 /*
194   this is called in the client, when data comes in from the daemon
195  */
196 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
197 {
198         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
199         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
200         TALLOC_CTX *tmp_ctx;
201
202         /* place the packet as a child of a tmp_ctx. We then use
203            talloc_free() below to free it. If any of the calls want
204            to keep it, then they will steal it somewhere else, and the
205            talloc_free() will be a no-op */
206         tmp_ctx = talloc_new(ctdb);
207         talloc_steal(tmp_ctx, hdr);
208
209         if (cnt == 0) {
210                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
211                 exit(1);
212         }
213
214         if (cnt < sizeof(*hdr)) {
215                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
216                 goto done;
217         }
218         if (cnt != hdr->length) {
219                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
220                                (unsigned)hdr->length, (unsigned)cnt);
221                 goto done;
222         }
223
224         if (hdr->ctdb_magic != CTDB_MAGIC) {
225                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
226                 goto done;
227         }
228
229         if (hdr->ctdb_version != CTDB_VERSION) {
230                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
231                 goto done;
232         }
233
234         switch (hdr->operation) {
235         case CTDB_REPLY_CALL:
236                 ctdb_client_reply_call(ctdb, hdr);
237                 break;
238
239         case CTDB_REQ_MESSAGE:
240                 ctdb_request_message(ctdb, hdr);
241                 break;
242
243         case CTDB_REPLY_CONTROL:
244                 ctdb_client_reply_control(ctdb, hdr);
245                 break;
246
247         default:
248                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
249         }
250
251 done:
252         talloc_free(tmp_ctx);
253 }
254
255 /*
256   connect to a unix domain socket
257 */
258 int ctdb_socket_connect(struct ctdb_context *ctdb)
259 {
260         struct sockaddr_un addr;
261
262         memset(&addr, 0, sizeof(addr));
263         addr.sun_family = AF_UNIX;
264         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
265
266         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
267         if (ctdb->daemon.sd == -1) {
268                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
269                 return -1;
270         }
271
272         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
273                 close(ctdb->daemon.sd);
274                 ctdb->daemon.sd = -1;
275                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
276                 return -1;
277         }
278
279         set_nonblocking(ctdb->daemon.sd);
280         set_close_on_exec(ctdb->daemon.sd);
281         
282         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
283                                               CTDB_DS_ALIGNMENT, 
284                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
285         return 0;
286 }
287
288
289 struct ctdb_record_handle {
290         struct ctdb_db_context *ctdb_db;
291         TDB_DATA key;
292         TDB_DATA *data;
293         struct ctdb_ltdb_header header;
294 };
295
296
297 /*
298   make a recv call to the local ctdb daemon - called from client context
299
300   This is called when the program wants to wait for a ctdb_call to complete and get the 
301   results. This call will block unless the call has already completed.
302 */
303 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
304 {
305         if (state == NULL) {
306                 return -1;
307         }
308
309         while (state->state < CTDB_CALL_DONE) {
310                 event_loop_once(state->ctdb_db->ctdb->ev);
311         }
312         if (state->state != CTDB_CALL_DONE) {
313                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
314                 talloc_free(state);
315                 return -1;
316         }
317
318         if (state->call->reply_data.dsize) {
319                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
320                                                       state->call->reply_data.dptr,
321                                                       state->call->reply_data.dsize);
322                 call->reply_data.dsize = state->call->reply_data.dsize;
323         } else {
324                 call->reply_data.dptr = NULL;
325                 call->reply_data.dsize = 0;
326         }
327         call->status = state->call->status;
328         talloc_free(state);
329
330         return call->status;
331 }
332
333
334
335
336 /*
337   destroy a ctdb_call in client
338 */
339 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
340 {
341         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
342         return 0;
343 }
344
345 /*
346   construct an event driven local ctdb_call
347
348   this is used so that locally processed ctdb_call requests are processed
349   in an event driven manner
350 */
351 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
352                                                                   struct ctdb_call *call,
353                                                                   struct ctdb_ltdb_header *header,
354                                                                   TDB_DATA *data)
355 {
356         struct ctdb_client_call_state *state;
357         struct ctdb_context *ctdb = ctdb_db->ctdb;
358         int ret;
359
360         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
361         CTDB_NO_MEMORY_NULL(ctdb, state);
362         state->call = talloc_zero(state, struct ctdb_call);
363         CTDB_NO_MEMORY_NULL(ctdb, state->call);
364
365         talloc_steal(state, data->dptr);
366
367         state->state   = CTDB_CALL_DONE;
368         *(state->call) = *call;
369         state->ctdb_db = ctdb_db;
370
371         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
372         if (ret != 0) {
373                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
374         }
375
376         return state;
377 }
378
379 /*
380   make a ctdb call to the local daemon - async send. Called from client context.
381
382   This constructs a ctdb_call request and queues it for processing. 
383   This call never blocks.
384 */
385 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
386                                               struct ctdb_call *call)
387 {
388         struct ctdb_client_call_state *state;
389         struct ctdb_context *ctdb = ctdb_db->ctdb;
390         struct ctdb_ltdb_header header;
391         TDB_DATA data;
392         int ret;
393         size_t len;
394         struct ctdb_req_call *c;
395
396         /* if the domain socket is not yet open, open it */
397         if (ctdb->daemon.sd==-1) {
398                 ctdb_socket_connect(ctdb);
399         }
400
401         ret = ctdb_ltdb_lock(ctdb_db, call->key);
402         if (ret != 0) {
403                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
404                 return NULL;
405         }
406
407         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
408
409         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
410                 ret = -1;
411         }
412
413         if (ret == 0 && header.dmaster == ctdb->pnn) {
414                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
415                 talloc_free(data.dptr);
416                 ctdb_ltdb_unlock(ctdb_db, call->key);
417                 return state;
418         }
419
420         ctdb_ltdb_unlock(ctdb_db, call->key);
421         talloc_free(data.dptr);
422
423         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
424         if (state == NULL) {
425                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
426                 return NULL;
427         }
428         state->call = talloc_zero(state, struct ctdb_call);
429         if (state->call == NULL) {
430                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
431                 return NULL;
432         }
433
434         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
435         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
436         if (c == NULL) {
437                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
438                 return NULL;
439         }
440
441         state->reqid     = ctdb_reqid_new(ctdb, state);
442         state->ctdb_db = ctdb_db;
443         talloc_set_destructor(state, ctdb_client_call_destructor);
444
445         c->hdr.reqid     = state->reqid;
446         c->flags         = call->flags;
447         c->db_id         = ctdb_db->db_id;
448         c->callid        = call->call_id;
449         c->hopcount      = 0;
450         c->keylen        = call->key.dsize;
451         c->calldatalen   = call->call_data.dsize;
452         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
453         memcpy(&c->data[call->key.dsize], 
454                call->call_data.dptr, call->call_data.dsize);
455         *(state->call)              = *call;
456         state->call->call_data.dptr = &c->data[call->key.dsize];
457         state->call->key.dptr       = &c->data[0];
458
459         state->state  = CTDB_CALL_WAIT;
460
461
462         ctdb_client_queue_pkt(ctdb, &c->hdr);
463
464         return state;
465 }
466
467
468 /*
469   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
470 */
471 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
472 {
473         struct ctdb_client_call_state *state;
474
475         state = ctdb_call_send(ctdb_db, call);
476         return ctdb_call_recv(state, call);
477 }
478
479
480 /*
481   tell the daemon what messaging srvid we will use, and register the message
482   handler function in the client
483 */
484 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
485                              ctdb_msg_fn_t handler,
486                              void *private_data)
487                                     
488 {
489         int res;
490         int32_t status;
491         
492         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
493                            tdb_null, NULL, NULL, &status, NULL, NULL);
494         if (res != 0 || status != 0) {
495                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
496                 return -1;
497         }
498
499         /* also need to register the handler with our own ctdb structure */
500         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
501 }
502
503 /*
504   tell the daemon we no longer want a srvid
505 */
506 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
507 {
508         int res;
509         int32_t status;
510         
511         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
512                            tdb_null, NULL, NULL, &status, NULL, NULL);
513         if (res != 0 || status != 0) {
514                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
515                 return -1;
516         }
517
518         /* also need to register the handler with our own ctdb structure */
519         ctdb_deregister_message_handler(ctdb, srvid, private_data);
520         return 0;
521 }
522
523
524 /*
525   send a message - from client context
526  */
527 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
528                       uint64_t srvid, TDB_DATA data)
529 {
530         struct ctdb_req_message *r;
531         int len, res;
532
533         len = offsetof(struct ctdb_req_message, data) + data.dsize;
534         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
535                                len, struct ctdb_req_message);
536         CTDB_NO_MEMORY(ctdb, r);
537
538         r->hdr.destnode  = pnn;
539         r->srvid         = srvid;
540         r->datalen       = data.dsize;
541         memcpy(&r->data[0], data.dptr, data.dsize);
542         
543         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
544         if (res != 0) {
545                 return res;
546         }
547
548         talloc_free(r);
549         return 0;
550 }
551
552
553 /*
554   cancel a ctdb_fetch_lock operation, releasing the lock
555  */
556 static int fetch_lock_destructor(struct ctdb_record_handle *h)
557 {
558         ctdb_ltdb_unlock(h->ctdb_db, h->key);
559         return 0;
560 }
561
562 /*
563   force the migration of a record to this node
564  */
565 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
566 {
567         struct ctdb_call call;
568         ZERO_STRUCT(call);
569         call.call_id = CTDB_NULL_FUNC;
570         call.key = key;
571         call.flags = CTDB_IMMEDIATE_MIGRATION;
572         return ctdb_call(ctdb_db, &call);
573 }
574
575 /*
576   try to fetch a readonly copy of a record
577  */
578 static int
579 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
580 {
581         int ret;
582
583         struct ctdb_call call;
584         ZERO_STRUCT(call);
585
586         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
587         call.call_data.dptr = NULL;
588         call.call_data.dsize = 0;
589         call.key = key;
590         call.flags = CTDB_WANT_READONLY;
591         ret = ctdb_call(ctdb_db, &call);
592
593         if (ret != 0) {
594                 return -1;
595         }
596         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
597                 return -1;
598         }
599
600         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
601         if (*hdr == NULL) {
602                 talloc_free(call.reply_data.dptr);
603                 return -1;
604         }
605
606         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
607         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
608         if (data->dptr == NULL) {
609                 talloc_free(call.reply_data.dptr);
610                 talloc_free(hdr);
611                 return -1;
612         }
613
614         return 0;
615 }
616
617 /*
618   get a lock on a record, and return the records data. Blocks until it gets the lock
619  */
620 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
621                                            TDB_DATA key, TDB_DATA *data)
622 {
623         int ret;
624         struct ctdb_record_handle *h;
625
626         /*
627           procedure is as follows:
628
629           1) get the chain lock. 
630           2) check if we are dmaster
631           3) if we are the dmaster then return handle 
632           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
633              reply from ctdbd
634           5) when we get the reply, goto (1)
635          */
636
637         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
638         if (h == NULL) {
639                 return NULL;
640         }
641
642         h->ctdb_db = ctdb_db;
643         h->key     = key;
644         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
645         if (h->key.dptr == NULL) {
646                 talloc_free(h);
647                 return NULL;
648         }
649         h->data    = data;
650
651         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
652                  (const char *)key.dptr));
653
654 again:
655         /* step 1 - get the chain lock */
656         ret = ctdb_ltdb_lock(ctdb_db, key);
657         if (ret != 0) {
658                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
659                 talloc_free(h);
660                 return NULL;
661         }
662
663         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
664
665         talloc_set_destructor(h, fetch_lock_destructor);
666
667         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
668
669         /* when torturing, ensure we test the remote path */
670         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
671             random() % 5 == 0) {
672                 h->header.dmaster = (uint32_t)-1;
673         }
674
675
676         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
677
678         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
679                 ctdb_ltdb_unlock(ctdb_db, key);
680                 ret = ctdb_client_force_migration(ctdb_db, key);
681                 if (ret != 0) {
682                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
683                         talloc_free(h);
684                         return NULL;
685                 }
686                 goto again;
687         }
688
689         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
690         return h;
691 }
692
693 /*
694   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
695  */
696 struct ctdb_record_handle *
697 ctdb_fetch_readonly_lock(
698         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
699         TDB_DATA key, TDB_DATA *data,
700         int read_only)
701 {
702         int ret;
703         struct ctdb_record_handle *h;
704         struct ctdb_ltdb_header *roheader = NULL;
705
706         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
707         if (h == NULL) {
708                 return NULL;
709         }
710
711         h->ctdb_db = ctdb_db;
712         h->key     = key;
713         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
714         if (h->key.dptr == NULL) {
715                 talloc_free(h);
716                 return NULL;
717         }
718         h->data    = data;
719
720         data->dptr = NULL;
721         data->dsize = 0;
722
723
724 again:
725         talloc_free(roheader);
726         roheader = NULL;
727
728         talloc_free(data->dptr);
729         data->dptr = NULL;
730         data->dsize = 0;
731
732         /* Lock the record/chain */
733         ret = ctdb_ltdb_lock(ctdb_db, key);
734         if (ret != 0) {
735                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
736                 talloc_free(h);
737                 return NULL;
738         }
739
740         talloc_set_destructor(h, fetch_lock_destructor);
741
742         /* Check if record exists yet in the TDB */
743         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
744         if (ret != 0) {
745                 ctdb_ltdb_unlock(ctdb_db, key);
746                 ret = ctdb_client_force_migration(ctdb_db, key);
747                 if (ret != 0) {
748                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
749                         talloc_free(h);
750                         return NULL;
751                 }
752                 goto again;
753         }
754
755         /* if this is a request for read/write and we have delegations
756            we have to revoke all delegations first
757         */
758         if ((read_only == 0) 
759         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
760         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
761                 ctdb_ltdb_unlock(ctdb_db, key);
762                 ret = ctdb_client_force_migration(ctdb_db, key);
763                 if (ret != 0) {
764                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
765                         talloc_free(h);
766                         return NULL;
767                 }
768                 goto again;
769         }
770
771         /* if we are dmaster, just return the handle */
772         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
773                 return h;
774         }
775
776         if (read_only != 0) {
777                 TDB_DATA rodata = {NULL, 0};
778
779                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
780                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
781                         return h;
782                 }
783
784                 ctdb_ltdb_unlock(ctdb_db, key);
785                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
786                 if (ret != 0) {
787                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
788                         ret = ctdb_client_force_migration(ctdb_db, key);
789                         if (ret != 0) {
790                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
791                                 talloc_free(h);
792                                 return NULL;
793                         }
794
795                         goto again;
796                 }
797
798                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
799                         ret = ctdb_client_force_migration(ctdb_db, key);
800                         if (ret != 0) {
801                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
802                                 talloc_free(h);
803                                 return NULL;
804                         }
805
806                         goto again;
807                 }
808
809                 ret = ctdb_ltdb_lock(ctdb_db, key);
810                 if (ret != 0) {
811                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
812                         talloc_free(h);
813                         return NULL;
814                 }
815
816                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
817                 if (ret != 0) {
818                         ctdb_ltdb_unlock(ctdb_db, key);
819
820                         ret = ctdb_client_force_migration(ctdb_db, key);
821                         if (ret != 0) {
822                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
823                                 talloc_free(h);
824                                 return NULL;
825                         }
826
827                         goto again;
828                 }
829
830                 return h;
831         }
832
833         /* we are not dmaster and this was not a request for a readonly lock
834          * so unlock the record, migrate it and try again
835          */
836         ctdb_ltdb_unlock(ctdb_db, key);
837         ret = ctdb_client_force_migration(ctdb_db, key);
838         if (ret != 0) {
839                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
840                 talloc_free(h);
841                 return NULL;
842         }
843         goto again;
844 }
845
846 /*
847   store some data to the record that was locked with ctdb_fetch_lock()
848 */
849 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
850 {
851         if (h->ctdb_db->persistent) {
852                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
853                 return -1;
854         }
855
856         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
857 }
858
859 /*
860   non-locking fetch of a record
861  */
862 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
863                TDB_DATA key, TDB_DATA *data)
864 {
865         struct ctdb_call call;
866         int ret;
867
868         call.call_id = CTDB_FETCH_FUNC;
869         call.call_data.dptr = NULL;
870         call.call_data.dsize = 0;
871         call.key = key;
872
873         ret = ctdb_call(ctdb_db, &call);
874
875         if (ret == 0) {
876                 *data = call.reply_data;
877                 talloc_steal(mem_ctx, data->dptr);
878         }
879
880         return ret;
881 }
882
883
884
885 /*
886    called when a control completes or timesout to invoke the callback
887    function the user provided
888 */
889 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
890         struct timeval t, void *private_data)
891 {
892         struct ctdb_client_control_state *state;
893         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
894         int ret;
895
896         state = talloc_get_type(private_data, struct ctdb_client_control_state);
897         talloc_steal(tmp_ctx, state);
898
899         ret = ctdb_control_recv(state->ctdb, state, state,
900                         NULL, 
901                         NULL, 
902                         NULL);
903         if (ret != 0) {
904                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
905         }
906
907         talloc_free(tmp_ctx);
908 }
909
910 /*
911   called when a CTDB_REPLY_CONTROL packet comes in in the client
912
913   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
914   contains any reply data from the control
915 */
916 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
917                                       struct ctdb_req_header *hdr)
918 {
919         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
920         struct ctdb_client_control_state *state;
921
922         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
923         if (state == NULL) {
924                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
925                 return;
926         }
927
928         if (hdr->reqid != state->reqid) {
929                 /* we found a record  but it was the wrong one */
930                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
931                 return;
932         }
933
934         state->outdata.dptr = c->data;
935         state->outdata.dsize = c->datalen;
936         state->status = c->status;
937         if (c->errorlen) {
938                 state->errormsg = talloc_strndup(state, 
939                                                  (char *)&c->data[c->datalen], 
940                                                  c->errorlen);
941         }
942
943         /* state->outdata now uses resources from c so we dont want c
944            to just dissappear from under us while state is still alive
945         */
946         talloc_steal(state, c);
947
948         state->state = CTDB_CONTROL_DONE;
949
950         /* if we had a callback registered for this control, pull the response
951            and call the callback.
952         */
953         if (state->async.fn) {
954                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
955         }
956 }
957
958
959 /*
960   destroy a ctdb_control in client
961 */
962 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
963 {
964         ctdb_reqid_remove(state->ctdb, state->reqid);
965         return 0;
966 }
967
968
969 /* time out handler for ctdb_control */
970 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
971         struct timeval t, void *private_data)
972 {
973         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
974
975         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
976                          "dstnode:%u\n", state->reqid, state->c->opcode,
977                          state->c->hdr.destnode));
978
979         state->state = CTDB_CONTROL_TIMEOUT;
980
981         /* if we had a callback registered for this control, pull the response
982            and call the callback.
983         */
984         if (state->async.fn) {
985                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
986         }
987 }
988
989 /* async version of send control request */
990 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
991                 uint32_t destnode, uint64_t srvid, 
992                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
993                 TALLOC_CTX *mem_ctx,
994                 struct timeval *timeout,
995                 char **errormsg)
996 {
997         struct ctdb_client_control_state *state;
998         size_t len;
999         struct ctdb_req_control *c;
1000         int ret;
1001
1002         if (errormsg) {
1003                 *errormsg = NULL;
1004         }
1005
1006         /* if the domain socket is not yet open, open it */
1007         if (ctdb->daemon.sd==-1) {
1008                 ctdb_socket_connect(ctdb);
1009         }
1010
1011         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1012         CTDB_NO_MEMORY_NULL(ctdb, state);
1013
1014         state->ctdb       = ctdb;
1015         state->reqid      = ctdb_reqid_new(ctdb, state);
1016         state->state      = CTDB_CONTROL_WAIT;
1017         state->errormsg   = NULL;
1018
1019         talloc_set_destructor(state, ctdb_client_control_destructor);
1020
1021         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1022         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1023                                len, struct ctdb_req_control);
1024         state->c            = c;        
1025         CTDB_NO_MEMORY_NULL(ctdb, c);
1026         c->hdr.reqid        = state->reqid;
1027         c->hdr.destnode     = destnode;
1028         c->opcode           = opcode;
1029         c->client_id        = 0;
1030         c->flags            = flags;
1031         c->srvid            = srvid;
1032         c->datalen          = data.dsize;
1033         if (data.dsize) {
1034                 memcpy(&c->data[0], data.dptr, data.dsize);
1035         }
1036
1037         /* timeout */
1038         if (timeout && !timeval_is_zero(timeout)) {
1039                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1040         }
1041
1042         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1043         if (ret != 0) {
1044                 talloc_free(state);
1045                 return NULL;
1046         }
1047
1048         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1049                 talloc_free(state);
1050                 return NULL;
1051         }
1052
1053         return state;
1054 }
1055
1056
1057 /* async version of receive control reply */
1058 int ctdb_control_recv(struct ctdb_context *ctdb, 
1059                 struct ctdb_client_control_state *state, 
1060                 TALLOC_CTX *mem_ctx,
1061                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1062 {
1063         TALLOC_CTX *tmp_ctx;
1064
1065         if (status != NULL) {
1066                 *status = -1;
1067         }
1068         if (errormsg != NULL) {
1069                 *errormsg = NULL;
1070         }
1071
1072         if (state == NULL) {
1073                 return -1;
1074         }
1075
1076         /* prevent double free of state */
1077         tmp_ctx = talloc_new(ctdb);
1078         talloc_steal(tmp_ctx, state);
1079
1080         /* loop one event at a time until we either timeout or the control
1081            completes.
1082         */
1083         while (state->state == CTDB_CONTROL_WAIT) {
1084                 event_loop_once(ctdb->ev);
1085         }
1086
1087         if (state->state != CTDB_CONTROL_DONE) {
1088                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1089                 if (state->async.fn) {
1090                         state->async.fn(state);
1091                 }
1092                 talloc_free(tmp_ctx);
1093                 return -1;
1094         }
1095
1096         if (state->errormsg) {
1097                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1098                 if (errormsg) {
1099                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1100                 }
1101                 if (state->async.fn) {
1102                         state->async.fn(state);
1103                 }
1104                 talloc_free(tmp_ctx);
1105                 return -1;
1106         }
1107
1108         if (outdata) {
1109                 *outdata = state->outdata;
1110                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1111         }
1112
1113         if (status) {
1114                 *status = state->status;
1115         }
1116
1117         if (state->async.fn) {
1118                 state->async.fn(state);
1119         }
1120
1121         talloc_free(tmp_ctx);
1122         return 0;
1123 }
1124
1125
1126
1127 /*
1128   send a ctdb control message
1129   timeout specifies how long we should wait for a reply.
1130   if timeout is NULL we wait indefinitely
1131  */
1132 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1133                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1134                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1135                  struct timeval *timeout,
1136                  char **errormsg)
1137 {
1138         struct ctdb_client_control_state *state;
1139
1140         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1141                         flags, data, mem_ctx,
1142                         timeout, errormsg);
1143
1144         /* FIXME: Error conditions in ctdb_control_send return NULL without
1145          * setting errormsg.  So, there is no way to distinguish between sucess
1146          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1147         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1148                 if (status != NULL) {
1149                         *status = 0;
1150                 }
1151                 return 0;
1152         }
1153
1154         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1155                         errormsg);
1156 }
1157
1158
1159
1160
1161 /*
1162   a process exists call. Returns 0 if process exists, -1 otherwise
1163  */
1164 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1165 {
1166         int ret;
1167         TDB_DATA data;
1168         int32_t status;
1169
1170         data.dptr = (uint8_t*)&pid;
1171         data.dsize = sizeof(pid);
1172
1173         ret = ctdb_control(ctdb, destnode, 0, 
1174                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1175                            NULL, NULL, &status, NULL, NULL);
1176         if (ret != 0) {
1177                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1178                 return -1;
1179         }
1180
1181         return status;
1182 }
1183
1184 /*
1185   get remote statistics
1186  */
1187 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1188 {
1189         int ret;
1190         TDB_DATA data;
1191         int32_t res;
1192
1193         ret = ctdb_control(ctdb, destnode, 0, 
1194                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1195                            ctdb, &data, &res, NULL, NULL);
1196         if (ret != 0 || res != 0) {
1197                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1198                 return -1;
1199         }
1200
1201         if (data.dsize != sizeof(struct ctdb_statistics)) {
1202                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1203                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1204                       return -1;
1205         }
1206
1207         *status = *(struct ctdb_statistics *)data.dptr;
1208         talloc_free(data.dptr);
1209                         
1210         return 0;
1211 }
1212
1213 /*
1214   shutdown a remote ctdb node
1215  */
1216 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1217 {
1218         struct ctdb_client_control_state *state;
1219
1220         state = ctdb_control_send(ctdb, destnode, 0, 
1221                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1222                            NULL, &timeout, NULL);
1223         if (state == NULL) {
1224                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1225                 return -1;
1226         }
1227
1228         return 0;
1229 }
1230
1231 /*
1232   get vnn map from a remote node
1233  */
1234 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1235 {
1236         int ret;
1237         TDB_DATA outdata;
1238         int32_t res;
1239         struct ctdb_vnn_map_wire *map;
1240
1241         ret = ctdb_control(ctdb, destnode, 0, 
1242                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1243                            mem_ctx, &outdata, &res, &timeout, NULL);
1244         if (ret != 0 || res != 0) {
1245                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1246                 return -1;
1247         }
1248         
1249         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1250         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1251             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1252                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1253                 return -1;
1254         }
1255
1256         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1257         CTDB_NO_MEMORY(ctdb, *vnnmap);
1258         (*vnnmap)->generation = map->generation;
1259         (*vnnmap)->size       = map->size;
1260         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1261
1262         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1263         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1264         talloc_free(outdata.dptr);
1265                     
1266         return 0;
1267 }
1268
1269
1270 /*
1271   get the recovery mode of a remote node
1272  */
1273 struct ctdb_client_control_state *
1274 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1275 {
1276         return ctdb_control_send(ctdb, destnode, 0, 
1277                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1278                            mem_ctx, &timeout, NULL);
1279 }
1280
1281 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1282 {
1283         int ret;
1284         int32_t res;
1285
1286         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1287         if (ret != 0) {
1288                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1289                 return -1;
1290         }
1291
1292         if (recmode) {
1293                 *recmode = (uint32_t)res;
1294         }
1295
1296         return 0;
1297 }
1298
1299 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1300 {
1301         struct ctdb_client_control_state *state;
1302
1303         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1304         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1305 }
1306
1307
1308
1309
1310 /*
1311   set the recovery mode of a remote node
1312  */
1313 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1314 {
1315         int ret;
1316         TDB_DATA data;
1317         int32_t res;
1318
1319         data.dsize = sizeof(uint32_t);
1320         data.dptr = (unsigned char *)&recmode;
1321
1322         ret = ctdb_control(ctdb, destnode, 0, 
1323                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1324                            NULL, NULL, &res, &timeout, NULL);
1325         if (ret != 0 || res != 0) {
1326                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1327                 return -1;
1328         }
1329
1330         return 0;
1331 }
1332
1333
1334
1335 /*
1336   get the recovery master of a remote node
1337  */
1338 struct ctdb_client_control_state *
1339 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1340                         struct timeval timeout, uint32_t destnode)
1341 {
1342         return ctdb_control_send(ctdb, destnode, 0, 
1343                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1344                            mem_ctx, &timeout, NULL);
1345 }
1346
1347 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1348 {
1349         int ret;
1350         int32_t res;
1351
1352         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1353         if (ret != 0) {
1354                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1355                 return -1;
1356         }
1357
1358         if (recmaster) {
1359                 *recmaster = (uint32_t)res;
1360         }
1361
1362         return 0;
1363 }
1364
1365 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1366 {
1367         struct ctdb_client_control_state *state;
1368
1369         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1370         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1371 }
1372
1373
1374 /*
1375   set the recovery master of a remote node
1376  */
1377 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1378 {
1379         int ret;
1380         TDB_DATA data;
1381         int32_t res;
1382
1383         ZERO_STRUCT(data);
1384         data.dsize = sizeof(uint32_t);
1385         data.dptr = (unsigned char *)&recmaster;
1386
1387         ret = ctdb_control(ctdb, destnode, 0, 
1388                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1389                            NULL, NULL, &res, &timeout, NULL);
1390         if (ret != 0 || res != 0) {
1391                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1392                 return -1;
1393         }
1394
1395         return 0;
1396 }
1397
1398
1399 /*
1400   get a list of databases off a remote node
1401  */
1402 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1403                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1404 {
1405         int ret;
1406         TDB_DATA outdata;
1407         int32_t res;
1408
1409         ret = ctdb_control(ctdb, destnode, 0, 
1410                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1411                            mem_ctx, &outdata, &res, &timeout, NULL);
1412         if (ret != 0 || res != 0) {
1413                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1414                 return -1;
1415         }
1416
1417         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1418         talloc_free(outdata.dptr);
1419                     
1420         return 0;
1421 }
1422
1423 /*
1424   get a list of nodes (vnn and flags ) from a remote node
1425  */
1426 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1427                 struct timeval timeout, uint32_t destnode, 
1428                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1429 {
1430         int ret;
1431         TDB_DATA outdata;
1432         int32_t res;
1433
1434         ret = ctdb_control(ctdb, destnode, 0, 
1435                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1436                            mem_ctx, &outdata, &res, &timeout, NULL);
1437         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1438                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1439                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1440         }
1441         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1442                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1443                 return -1;
1444         }
1445
1446         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1447         talloc_free(outdata.dptr);
1448                     
1449         return 0;
1450 }
1451
1452 /*
1453   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1454  */
1455 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1456                 struct timeval timeout, uint32_t destnode, 
1457                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1458 {
1459         int ret, i, len;
1460         TDB_DATA outdata;
1461         struct ctdb_node_mapv4 *nodemapv4;
1462         int32_t res;
1463
1464         ret = ctdb_control(ctdb, destnode, 0, 
1465                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1466                            mem_ctx, &outdata, &res, &timeout, NULL);
1467         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1468                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1469                 return -1;
1470         }
1471
1472         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1473
1474         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1475         (*nodemap) = talloc_zero_size(mem_ctx, len);
1476         CTDB_NO_MEMORY(ctdb, (*nodemap));
1477
1478         (*nodemap)->num = nodemapv4->num;
1479         for (i=0; i<nodemapv4->num; i++) {
1480                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1481                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1482                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1483                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1484         }
1485                 
1486         talloc_free(outdata.dptr);
1487                     
1488         return 0;
1489 }
1490
1491 /*
1492   drop the transport, reload the nodes file and restart the transport
1493  */
1494 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1495                     struct timeval timeout, uint32_t destnode)
1496 {
1497         int ret;
1498         int32_t res;
1499
1500         ret = ctdb_control(ctdb, destnode, 0, 
1501                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1502                            NULL, NULL, &res, &timeout, NULL);
1503         if (ret != 0 || res != 0) {
1504                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1505                 return -1;
1506         }
1507
1508         return 0;
1509 }
1510
1511
1512 /*
1513   set vnn map on a node
1514  */
1515 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1516                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1517 {
1518         int ret;
1519         TDB_DATA data;
1520         int32_t res;
1521         struct ctdb_vnn_map_wire *map;
1522         size_t len;
1523
1524         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1525         map = talloc_size(mem_ctx, len);
1526         CTDB_NO_MEMORY(ctdb, map);
1527
1528         map->generation = vnnmap->generation;
1529         map->size = vnnmap->size;
1530         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1531         
1532         data.dsize = len;
1533         data.dptr  = (uint8_t *)map;
1534
1535         ret = ctdb_control(ctdb, destnode, 0, 
1536                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1537                            NULL, NULL, &res, &timeout, NULL);
1538         if (ret != 0 || res != 0) {
1539                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1540                 return -1;
1541         }
1542
1543         talloc_free(map);
1544
1545         return 0;
1546 }
1547
1548
1549 /*
1550   async send for pull database
1551  */
1552 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1553         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1554         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1555 {
1556         TDB_DATA indata;
1557         struct ctdb_control_pulldb *pull;
1558         struct ctdb_client_control_state *state;
1559
1560         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1561         CTDB_NO_MEMORY_NULL(ctdb, pull);
1562
1563         pull->db_id   = dbid;
1564         pull->lmaster = lmaster;
1565
1566         indata.dsize = sizeof(struct ctdb_control_pulldb);
1567         indata.dptr  = (unsigned char *)pull;
1568
1569         state = ctdb_control_send(ctdb, destnode, 0, 
1570                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1571                                   mem_ctx, &timeout, NULL);
1572         talloc_free(pull);
1573
1574         return state;
1575 }
1576
1577 /*
1578   async recv for pull database
1579  */
1580 int ctdb_ctrl_pulldb_recv(
1581         struct ctdb_context *ctdb, 
1582         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1583         TDB_DATA *outdata)
1584 {
1585         int ret;
1586         int32_t res;
1587
1588         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1589         if ( (ret != 0) || (res != 0) ){
1590                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1591                 return -1;
1592         }
1593
1594         return 0;
1595 }
1596
1597 /*
1598   pull all keys and records for a specific database on a node
1599  */
1600 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1601                 uint32_t dbid, uint32_t lmaster, 
1602                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1603                 TDB_DATA *outdata)
1604 {
1605         struct ctdb_client_control_state *state;
1606
1607         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1608                                       timeout);
1609         
1610         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1611 }
1612
1613
1614 /*
1615   change dmaster for all keys in the database to the new value
1616  */
1617 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1618                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1619 {
1620         int ret;
1621         TDB_DATA indata;
1622         int32_t res;
1623
1624         indata.dsize = 2*sizeof(uint32_t);
1625         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1626
1627         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1628         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1629
1630         ret = ctdb_control(ctdb, destnode, 0, 
1631                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1632                            NULL, NULL, &res, &timeout, NULL);
1633         if (ret != 0 || res != 0) {
1634                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1635                 return -1;
1636         }
1637
1638         return 0;
1639 }
1640
1641 /*
1642   ping a node, return number of clients connected
1643  */
1644 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1645 {
1646         int ret;
1647         int32_t res;
1648
1649         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1650                            tdb_null, NULL, NULL, &res, NULL, NULL);
1651         if (ret != 0) {
1652                 return -1;
1653         }
1654         return res;
1655 }
1656
1657 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1658                            struct timeval timeout, 
1659                            uint32_t destnode,
1660                            uint32_t *runstate)
1661 {
1662         TDB_DATA outdata;
1663         int32_t res;
1664         int ret;
1665
1666         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1667                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1668         if (ret != 0 || res != 0) {
1669                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1670                 return ret != 0 ? ret : res;
1671         }
1672
1673         if (outdata.dsize != sizeof(uint32_t)) {
1674                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1675                 talloc_free(outdata.dptr);
1676                 return -1;
1677         }
1678
1679         if (runstate != NULL) {
1680                 *runstate = *(uint32_t *)outdata.dptr;
1681         }
1682         talloc_free(outdata.dptr);
1683
1684         return 0;
1685 }
1686
1687 /*
1688   find the real path to a ltdb 
1689  */
1690 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1691                    const char **path)
1692 {
1693         int ret;
1694         int32_t res;
1695         TDB_DATA data;
1696
1697         data.dptr = (uint8_t *)&dbid;
1698         data.dsize = sizeof(dbid);
1699
1700         ret = ctdb_control(ctdb, destnode, 0, 
1701                            CTDB_CONTROL_GETDBPATH, 0, data, 
1702                            mem_ctx, &data, &res, &timeout, NULL);
1703         if (ret != 0 || res != 0) {
1704                 return -1;
1705         }
1706
1707         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1708         if ((*path) == NULL) {
1709                 return -1;
1710         }
1711
1712         talloc_free(data.dptr);
1713
1714         return 0;
1715 }
1716
1717 /*
1718   find the name of a db 
1719  */
1720 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1721                    const char **name)
1722 {
1723         int ret;
1724         int32_t res;
1725         TDB_DATA data;
1726
1727         data.dptr = (uint8_t *)&dbid;
1728         data.dsize = sizeof(dbid);
1729
1730         ret = ctdb_control(ctdb, destnode, 0, 
1731                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1732                            mem_ctx, &data, &res, &timeout, NULL);
1733         if (ret != 0 || res != 0) {
1734                 return -1;
1735         }
1736
1737         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1738         if ((*name) == NULL) {
1739                 return -1;
1740         }
1741
1742         talloc_free(data.dptr);
1743
1744         return 0;
1745 }
1746
1747 /*
1748   get the health status of a db
1749  */
1750 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1751                           struct timeval timeout,
1752                           uint32_t destnode,
1753                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1754                           const char **reason)
1755 {
1756         int ret;
1757         int32_t res;
1758         TDB_DATA data;
1759
1760         data.dptr = (uint8_t *)&dbid;
1761         data.dsize = sizeof(dbid);
1762
1763         ret = ctdb_control(ctdb, destnode, 0,
1764                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1765                            mem_ctx, &data, &res, &timeout, NULL);
1766         if (ret != 0 || res != 0) {
1767                 return -1;
1768         }
1769
1770         if (data.dsize == 0) {
1771                 (*reason) = NULL;
1772                 return 0;
1773         }
1774
1775         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1776         if ((*reason) == NULL) {
1777                 return -1;
1778         }
1779
1780         talloc_free(data.dptr);
1781
1782         return 0;
1783 }
1784
1785 /*
1786   create a database
1787  */
1788 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1789                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1790 {
1791         int ret;
1792         int32_t res;
1793         TDB_DATA data;
1794
1795         data.dptr = discard_const(name);
1796         data.dsize = strlen(name)+1;
1797
1798         ret = ctdb_control(ctdb, destnode, 0, 
1799                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1800                            0, data, 
1801                            mem_ctx, &data, &res, &timeout, NULL);
1802
1803         if (ret != 0 || res != 0) {
1804                 return -1;
1805         }
1806
1807         return 0;
1808 }
1809
1810 /*
1811   get debug level on a node
1812  */
1813 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1814 {
1815         int ret;
1816         int32_t res;
1817         TDB_DATA data;
1818
1819         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1820                            ctdb, &data, &res, NULL, NULL);
1821         if (ret != 0 || res != 0) {
1822                 return -1;
1823         }
1824         if (data.dsize != sizeof(int32_t)) {
1825                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1826                          (unsigned)data.dsize));
1827                 return -1;
1828         }
1829         *level = *(int32_t *)data.dptr;
1830         talloc_free(data.dptr);
1831         return 0;
1832 }
1833
1834 /*
1835   set debug level on a node
1836  */
1837 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1838 {
1839         int ret;
1840         int32_t res;
1841         TDB_DATA data;
1842
1843         data.dptr = (uint8_t *)&level;
1844         data.dsize = sizeof(level);
1845
1846         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1847                            NULL, NULL, &res, NULL, NULL);
1848         if (ret != 0 || res != 0) {
1849                 return -1;
1850         }
1851         return 0;
1852 }
1853
1854
1855 /*
1856   get a list of connected nodes
1857  */
1858 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1859                                 struct timeval timeout,
1860                                 TALLOC_CTX *mem_ctx,
1861                                 uint32_t *num_nodes)
1862 {
1863         struct ctdb_node_map *map=NULL;
1864         int ret, i;
1865         uint32_t *nodes;
1866
1867         *num_nodes = 0;
1868
1869         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1870         if (ret != 0) {
1871                 return NULL;
1872         }
1873
1874         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1875         if (nodes == NULL) {
1876                 return NULL;
1877         }
1878
1879         for (i=0;i<map->num;i++) {
1880                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1881                         nodes[*num_nodes] = map->nodes[i].pnn;
1882                         (*num_nodes)++;
1883                 }
1884         }
1885
1886         return nodes;
1887 }
1888
1889
1890 /*
1891   reset remote status
1892  */
1893 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1894 {
1895         int ret;
1896         int32_t res;
1897
1898         ret = ctdb_control(ctdb, destnode, 0, 
1899                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1900                            NULL, NULL, &res, NULL, NULL);
1901         if (ret != 0 || res != 0) {
1902                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1903                 return -1;
1904         }
1905         return 0;
1906 }
1907
1908 /*
1909   attach to a specific database - client call
1910 */
1911 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1912                                     struct timeval timeout,
1913                                     const char *name,
1914                                     bool persistent,
1915                                     uint32_t tdb_flags)
1916 {
1917         struct ctdb_db_context *ctdb_db;
1918         TDB_DATA data;
1919         int ret;
1920         int32_t res;
1921
1922         ctdb_db = ctdb_db_handle(ctdb, name);
1923         if (ctdb_db) {
1924                 return ctdb_db;
1925         }
1926
1927         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1928         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1929
1930         ctdb_db->ctdb = ctdb;
1931         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1932         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1933
1934         data.dptr = discard_const(name);
1935         data.dsize = strlen(name)+1;
1936
1937         /* tell ctdb daemon to attach */
1938         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1939                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1940                            0, data, ctdb_db, &data, &res, NULL, NULL);
1941         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1942                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1943                 talloc_free(ctdb_db);
1944                 return NULL;
1945         }
1946         
1947         ctdb_db->db_id = *(uint32_t *)data.dptr;
1948         talloc_free(data.dptr);
1949
1950         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1951         if (ret != 0) {
1952                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1953                 talloc_free(ctdb_db);
1954                 return NULL;
1955         }
1956
1957         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1958         if (ctdb->valgrinding) {
1959                 tdb_flags |= TDB_NOMMAP;
1960         }
1961         tdb_flags |= TDB_DISALLOW_NESTING;
1962
1963         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1964         if (ctdb_db->ltdb == NULL) {
1965                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1966                 talloc_free(ctdb_db);
1967                 return NULL;
1968         }
1969
1970         ctdb_db->persistent = persistent;
1971
1972         DLIST_ADD(ctdb->db_list, ctdb_db);
1973
1974         /* add well known functions */
1975         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1976         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1977         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1978
1979         return ctdb_db;
1980 }
1981
1982
1983 /*
1984   setup a call for a database
1985  */
1986 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1987 {
1988         struct ctdb_registered_call *call;
1989
1990 #if 0
1991         TDB_DATA data;
1992         int32_t status;
1993         struct ctdb_control_set_call c;
1994         int ret;
1995
1996         /* this is no longer valid with the separate daemon architecture */
1997         c.db_id = ctdb_db->db_id;
1998         c.fn    = fn;
1999         c.id    = id;
2000
2001         data.dptr = (uint8_t *)&c;
2002         data.dsize = sizeof(c);
2003
2004         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2005                            data, NULL, NULL, &status, NULL, NULL);
2006         if (ret != 0 || status != 0) {
2007                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2008                 return -1;
2009         }
2010 #endif
2011
2012         /* also register locally */
2013         call = talloc(ctdb_db, struct ctdb_registered_call);
2014         call->fn = fn;
2015         call->id = id;
2016
2017         DLIST_ADD(ctdb_db->calls, call);        
2018         return 0;
2019 }
2020
2021
2022 struct traverse_state {
2023         bool done;
2024         uint32_t count;
2025         ctdb_traverse_func fn;
2026         void *private_data;
2027         bool listemptyrecords;
2028 };
2029
2030 /*
2031   called on each key during a ctdb_traverse
2032  */
2033 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2034 {
2035         struct traverse_state *state = (struct traverse_state *)p;
2036         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2037         TDB_DATA key;
2038
2039         if (data.dsize < sizeof(uint32_t) ||
2040             d->length != data.dsize) {
2041                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2042                 state->done = true;
2043                 return;
2044         }
2045
2046         key.dsize = d->keylen;
2047         key.dptr  = &d->data[0];
2048         data.dsize = d->datalen;
2049         data.dptr = &d->data[d->keylen];
2050
2051         if (key.dsize == 0 && data.dsize == 0) {
2052                 /* end of traverse */
2053                 state->done = true;
2054                 return;
2055         }
2056
2057         if (!state->listemptyrecords &&
2058             data.dsize == sizeof(struct ctdb_ltdb_header))
2059         {
2060                 /* empty records are deleted records in ctdb */
2061                 return;
2062         }
2063
2064         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2065                 state->done = true;
2066         }
2067
2068         state->count++;
2069 }
2070
2071 /**
2072  * start a cluster wide traverse, calling the supplied fn on each record
2073  * return the number of records traversed, or -1 on error
2074  *
2075  * Extendet variant with a flag to signal whether empty records should
2076  * be listed.
2077  */
2078 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2079                              ctdb_traverse_func fn,
2080                              bool withemptyrecords,
2081                              void *private_data)
2082 {
2083         TDB_DATA data;
2084         struct ctdb_traverse_start_ext t;
2085         int32_t status;
2086         int ret;
2087         uint64_t srvid = (getpid() | 0xFLL<<60);
2088         struct traverse_state state;
2089
2090         state.done = false;
2091         state.count = 0;
2092         state.private_data = private_data;
2093         state.fn = fn;
2094         state.listemptyrecords = withemptyrecords;
2095
2096         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2097         if (ret != 0) {
2098                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2099                 return -1;
2100         }
2101
2102         t.db_id = ctdb_db->db_id;
2103         t.srvid = srvid;
2104         t.reqid = 0;
2105         t.withemptyrecords = withemptyrecords;
2106
2107         data.dptr = (uint8_t *)&t;
2108         data.dsize = sizeof(t);
2109
2110         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2111                            data, NULL, NULL, &status, NULL, NULL);
2112         if (ret != 0 || status != 0) {
2113                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2114                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2115                 return -1;
2116         }
2117
2118         while (!state.done) {
2119                 event_loop_once(ctdb_db->ctdb->ev);
2120         }
2121
2122         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2123         if (ret != 0) {
2124                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2125                 return -1;
2126         }
2127
2128         return state.count;
2129 }
2130
2131 /**
2132  * start a cluster wide traverse, calling the supplied fn on each record
2133  * return the number of records traversed, or -1 on error
2134  *
2135  * Standard version which does not list the empty records:
2136  * These are considered deleted.
2137  */
2138 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2139 {
2140         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2141 }
2142
2143 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2144 /*
2145   called on each key during a catdb
2146  */
2147 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2148 {
2149         int i;
2150         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2151         FILE *f = c->f;
2152         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2153
2154         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2155         for (i=0;i<key.dsize;i++) {
2156                 if (ISASCII(key.dptr[i])) {
2157                         fprintf(f, "%c", key.dptr[i]);
2158                 } else {
2159                         fprintf(f, "\\%02X", key.dptr[i]);
2160                 }
2161         }
2162         fprintf(f, "\"\n");
2163
2164         fprintf(f, "dmaster: %u\n", h->dmaster);
2165         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2166
2167         if (c->printlmaster && ctdb->vnn_map != NULL) {
2168                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2169         }
2170
2171         if (c->printhash) {
2172                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2173         }
2174
2175         if (c->printrecordflags) {
2176                 fprintf(f, "flags: 0x%08x", h->flags);
2177                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2178                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2179                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2180                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2181                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2182                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2183                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2184                 fprintf(f, "\n");
2185         }
2186
2187         if (c->printdatasize) {
2188                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2189         } else {
2190                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2191                 for (i=sizeof(*h);i<data.dsize;i++) {
2192                         if (ISASCII(data.dptr[i])) {
2193                                 fprintf(f, "%c", data.dptr[i]);
2194                         } else {
2195                                 fprintf(f, "\\%02X", data.dptr[i]);
2196                         }
2197                 }
2198                 fprintf(f, "\"\n");
2199         }
2200
2201         fprintf(f, "\n");
2202
2203         return 0;
2204 }
2205
2206 /*
2207   convenience function to list all keys to stdout
2208  */
2209 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2210                  struct ctdb_dump_db_context *ctx)
2211 {
2212         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2213                                  ctx->printemptyrecords, ctx);
2214 }
2215
2216 /*
2217   get the pid of a ctdb daemon
2218  */
2219 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2220 {
2221         int ret;
2222         int32_t res;
2223
2224         ret = ctdb_control(ctdb, destnode, 0, 
2225                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2226                            NULL, NULL, &res, &timeout, NULL);
2227         if (ret != 0) {
2228                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2229                 return -1;
2230         }
2231
2232         *pid = res;
2233
2234         return 0;
2235 }
2236
2237
2238 /*
2239   async freeze send control
2240  */
2241 struct ctdb_client_control_state *
2242 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2243 {
2244         return ctdb_control_send(ctdb, destnode, priority, 
2245                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2246                            mem_ctx, &timeout, NULL);
2247 }
2248
2249 /* 
2250    async freeze recv control
2251 */
2252 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2253 {
2254         int ret;
2255         int32_t res;
2256
2257         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2258         if ( (ret != 0) || (res != 0) ){
2259                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2260                 return -1;
2261         }
2262
2263         return 0;
2264 }
2265
2266 /*
2267   freeze databases of a certain priority
2268  */
2269 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2270 {
2271         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2272         struct ctdb_client_control_state *state;
2273         int ret;
2274
2275         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2276         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2277         talloc_free(tmp_ctx);
2278
2279         return ret;
2280 }
2281
2282 /* Freeze all databases */
2283 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2284 {
2285         int i;
2286
2287         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2288                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2289                         return -1;
2290                 }
2291         }
2292         return 0;
2293 }
2294
2295 /*
2296   thaw databases of a certain priority
2297  */
2298 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2299 {
2300         int ret;
2301         int32_t res;
2302
2303         ret = ctdb_control(ctdb, destnode, priority, 
2304                            CTDB_CONTROL_THAW, 0, tdb_null, 
2305                            NULL, NULL, &res, &timeout, NULL);
2306         if (ret != 0 || res != 0) {
2307                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2308                 return -1;
2309         }
2310
2311         return 0;
2312 }
2313
2314 /* thaw all databases */
2315 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2316 {
2317         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2318 }
2319
2320 /*
2321   get pnn of a node, or -1
2322  */
2323 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2324 {
2325         int ret;
2326         int32_t res;
2327
2328         ret = ctdb_control(ctdb, destnode, 0, 
2329                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2330                            NULL, NULL, &res, &timeout, NULL);
2331         if (ret != 0) {
2332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2333                 return -1;
2334         }
2335
2336         return res;
2337 }
2338
2339 /*
2340   get the monitoring mode of a remote node
2341  */
2342 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2343 {
2344         int ret;
2345         int32_t res;
2346
2347         ret = ctdb_control(ctdb, destnode, 0, 
2348                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2349                            NULL, NULL, &res, &timeout, NULL);
2350         if (ret != 0) {
2351                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2352                 return -1;
2353         }
2354
2355         *monmode = res;
2356
2357         return 0;
2358 }
2359
2360
2361 /*
2362  set the monitoring mode of a remote node to active
2363  */
2364 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2365 {
2366         int ret;
2367         
2368
2369         ret = ctdb_control(ctdb, destnode, 0, 
2370                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2371                            NULL, NULL,NULL, &timeout, NULL);
2372         if (ret != 0) {
2373                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2374                 return -1;
2375         }
2376
2377         
2378
2379         return 0;
2380 }
2381
2382 /*
2383   set the monitoring mode of a remote node to disable
2384  */
2385 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2386 {
2387         int ret;
2388         
2389
2390         ret = ctdb_control(ctdb, destnode, 0, 
2391                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2392                            NULL, NULL, NULL, &timeout, NULL);
2393         if (ret != 0) {
2394                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2395                 return -1;
2396         }
2397
2398         
2399
2400         return 0;
2401 }
2402
2403
2404
2405 /* 
2406   sent to a node to make it take over an ip address
2407 */
2408 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2409                           uint32_t destnode, struct ctdb_public_ip *ip)
2410 {
2411         TDB_DATA data;
2412         struct ctdb_public_ipv4 ipv4;
2413         int ret;
2414         int32_t res;
2415
2416         if (ip->addr.sa.sa_family == AF_INET) {
2417                 ipv4.pnn = ip->pnn;
2418                 ipv4.sin = ip->addr.ip;
2419
2420                 data.dsize = sizeof(ipv4);
2421                 data.dptr  = (uint8_t *)&ipv4;
2422
2423                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2424                            NULL, &res, &timeout, NULL);
2425         } else {
2426                 data.dsize = sizeof(*ip);
2427                 data.dptr  = (uint8_t *)ip;
2428
2429                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2430                            NULL, &res, &timeout, NULL);
2431         }
2432
2433         if (ret != 0 || res != 0) {
2434                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2435                 return -1;
2436         }
2437
2438         return 0;       
2439 }
2440
2441
2442 /* 
2443   sent to a node to make it release an ip address
2444 */
2445 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2446                          uint32_t destnode, struct ctdb_public_ip *ip)
2447 {
2448         TDB_DATA data;
2449         struct ctdb_public_ipv4 ipv4;
2450         int ret;
2451         int32_t res;
2452
2453         if (ip->addr.sa.sa_family == AF_INET) {
2454                 ipv4.pnn = ip->pnn;
2455                 ipv4.sin = ip->addr.ip;
2456
2457                 data.dsize = sizeof(ipv4);
2458                 data.dptr  = (uint8_t *)&ipv4;
2459
2460                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2461                                    NULL, &res, &timeout, NULL);
2462         } else {
2463                 data.dsize = sizeof(*ip);
2464                 data.dptr  = (uint8_t *)ip;
2465
2466                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2467                                    NULL, &res, &timeout, NULL);
2468         }
2469
2470         if (ret != 0 || res != 0) {
2471                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2472                 return -1;
2473         }
2474
2475         return 0;       
2476 }
2477
2478
2479 /*
2480   get a tunable
2481  */
2482 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2483                           struct timeval timeout, 
2484                           uint32_t destnode,
2485                           const char *name, uint32_t *value)
2486 {
2487         struct ctdb_control_get_tunable *t;
2488         TDB_DATA data, outdata;
2489         int32_t res;
2490         int ret;
2491
2492         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2493         data.dptr  = talloc_size(ctdb, data.dsize);
2494         CTDB_NO_MEMORY(ctdb, data.dptr);
2495
2496         t = (struct ctdb_control_get_tunable *)data.dptr;
2497         t->length = strlen(name)+1;
2498         memcpy(t->name, name, t->length);
2499
2500         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2501                            &outdata, &res, &timeout, NULL);
2502         talloc_free(data.dptr);
2503         if (ret != 0 || res != 0) {
2504                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2505                 return ret != 0 ? ret : res;
2506         }
2507
2508         if (outdata.dsize != sizeof(uint32_t)) {
2509                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2510                 talloc_free(outdata.dptr);
2511                 return -1;
2512         }
2513         
2514         *value = *(uint32_t *)outdata.dptr;
2515         talloc_free(outdata.dptr);
2516
2517         return 0;
2518 }
2519
2520 /*
2521   set a tunable
2522  */
2523 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2524                           struct timeval timeout, 
2525                           uint32_t destnode,
2526                           const char *name, uint32_t value)
2527 {
2528         struct ctdb_control_set_tunable *t;
2529         TDB_DATA data;
2530         int32_t res;
2531         int ret;
2532
2533         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2534         data.dptr  = talloc_size(ctdb, data.dsize);
2535         CTDB_NO_MEMORY(ctdb, data.dptr);
2536
2537         t = (struct ctdb_control_set_tunable *)data.dptr;
2538         t->length = strlen(name)+1;
2539         memcpy(t->name, name, t->length);
2540         t->value = value;
2541
2542         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2543                            NULL, &res, &timeout, NULL);
2544         talloc_free(data.dptr);
2545         if (ret != 0 || res != 0) {
2546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2547                 return -1;
2548         }
2549
2550         return 0;
2551 }
2552
2553 /*
2554   list tunables
2555  */
2556 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2557                             struct timeval timeout, 
2558                             uint32_t destnode,
2559                             TALLOC_CTX *mem_ctx,
2560                             const char ***list, uint32_t *count)
2561 {
2562         TDB_DATA outdata;
2563         int32_t res;
2564         int ret;
2565         struct ctdb_control_list_tunable *t;
2566         char *p, *s, *ptr;
2567
2568         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2569                            mem_ctx, &outdata, &res, &timeout, NULL);
2570         if (ret != 0 || res != 0) {
2571                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2572                 return -1;
2573         }
2574
2575         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2576         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2577             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2578                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2579                 talloc_free(outdata.dptr);
2580                 return -1;              
2581         }
2582         
2583         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2584         CTDB_NO_MEMORY(ctdb, p);
2585
2586         talloc_free(outdata.dptr);
2587         
2588         (*list) = NULL;
2589         (*count) = 0;
2590
2591         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2592                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2593                 CTDB_NO_MEMORY(ctdb, *list);
2594                 (*list)[*count] = talloc_strdup(*list, s);
2595                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2596                 (*count)++;
2597         }
2598
2599         talloc_free(p);
2600
2601         return 0;
2602 }
2603
2604
2605 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2606                                    struct timeval timeout, uint32_t destnode,
2607                                    TALLOC_CTX *mem_ctx,
2608                                    uint32_t flags,
2609                                    struct ctdb_all_public_ips **ips)
2610 {
2611         int ret;
2612         TDB_DATA outdata;
2613         int32_t res;
2614
2615         ret = ctdb_control(ctdb, destnode, 0, 
2616                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2617                            mem_ctx, &outdata, &res, &timeout, NULL);
2618         if (ret == 0 && res == -1) {
2619                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2620                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2621         }
2622         if (ret != 0 || res != 0) {
2623           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2624                 return -1;
2625         }
2626
2627         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2628         talloc_free(outdata.dptr);
2629                     
2630         return 0;
2631 }
2632
2633 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2634                              struct timeval timeout, uint32_t destnode,
2635                              TALLOC_CTX *mem_ctx,
2636                              struct ctdb_all_public_ips **ips)
2637 {
2638         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2639                                               destnode, mem_ctx,
2640                                               0, ips);
2641 }
2642
2643 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2644                         struct timeval timeout, uint32_t destnode, 
2645                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2646 {
2647         int ret, i, len;
2648         TDB_DATA outdata;
2649         int32_t res;
2650         struct ctdb_all_public_ipsv4 *ipsv4;
2651
2652         ret = ctdb_control(ctdb, destnode, 0, 
2653                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2654                            mem_ctx, &outdata, &res, &timeout, NULL);
2655         if (ret != 0 || res != 0) {
2656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2657                 return -1;
2658         }
2659
2660         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2661         len = offsetof(struct ctdb_all_public_ips, ips) +
2662                 ipsv4->num*sizeof(struct ctdb_public_ip);
2663         *ips = talloc_zero_size(mem_ctx, len);
2664         CTDB_NO_MEMORY(ctdb, *ips);
2665         (*ips)->num = ipsv4->num;
2666         for (i=0; i<ipsv4->num; i++) {
2667                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2668                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2669         }
2670
2671         talloc_free(outdata.dptr);
2672                     
2673         return 0;
2674 }
2675
2676 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2677                                  struct timeval timeout, uint32_t destnode,
2678                                  TALLOC_CTX *mem_ctx,
2679                                  const ctdb_sock_addr *addr,
2680                                  struct ctdb_control_public_ip_info **_info)
2681 {
2682         int ret;
2683         TDB_DATA indata;
2684         TDB_DATA outdata;
2685         int32_t res;
2686         struct ctdb_control_public_ip_info *info;
2687         uint32_t len;
2688         uint32_t i;
2689
2690         indata.dptr = discard_const_p(uint8_t, addr);
2691         indata.dsize = sizeof(*addr);
2692
2693         ret = ctdb_control(ctdb, destnode, 0,
2694                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2695                            mem_ctx, &outdata, &res, &timeout, NULL);
2696         if (ret != 0 || res != 0) {
2697                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2698                                 "failed ret:%d res:%d\n",
2699                                 ret, res));
2700                 return -1;
2701         }
2702
2703         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2704         if (len > outdata.dsize) {
2705                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2706                                 "returned invalid data with size %u > %u\n",
2707                                 (unsigned int)outdata.dsize,
2708                                 (unsigned int)len));
2709                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2710                 return -1;
2711         }
2712
2713         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2714         len += info->num*sizeof(struct ctdb_control_iface_info);
2715
2716         if (len > outdata.dsize) {
2717                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2718                                 "returned invalid data with size %u > %u\n",
2719                                 (unsigned int)outdata.dsize,
2720                                 (unsigned int)len));
2721                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2722                 return -1;
2723         }
2724
2725         /* make sure we null terminate the returned strings */
2726         for (i=0; i < info->num; i++) {
2727                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2728         }
2729
2730         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2731                                                                 outdata.dptr,
2732                                                                 outdata.dsize);
2733         talloc_free(outdata.dptr);
2734         if (*_info == NULL) {
2735                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2736                                 "talloc_memdup size %u failed\n",
2737                                 (unsigned int)outdata.dsize));
2738                 return -1;
2739         }
2740
2741         return 0;
2742 }
2743
2744 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2745                          struct timeval timeout, uint32_t destnode,
2746                          TALLOC_CTX *mem_ctx,
2747                          struct ctdb_control_get_ifaces **_ifaces)
2748 {
2749         int ret;
2750         TDB_DATA outdata;
2751         int32_t res;
2752         struct ctdb_control_get_ifaces *ifaces;
2753         uint32_t len;
2754         uint32_t i;
2755
2756         ret = ctdb_control(ctdb, destnode, 0,
2757                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2758                            mem_ctx, &outdata, &res, &timeout, NULL);
2759         if (ret != 0 || res != 0) {
2760                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2761                                 "failed ret:%d res:%d\n",
2762                                 ret, res));
2763                 return -1;
2764         }
2765
2766         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2767         if (len > outdata.dsize) {
2768                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2769                                 "returned invalid data with size %u > %u\n",
2770                                 (unsigned int)outdata.dsize,
2771                                 (unsigned int)len));
2772                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2773                 return -1;
2774         }
2775
2776         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2777         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2778
2779         if (len > outdata.dsize) {
2780                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2781                                 "returned invalid data with size %u > %u\n",
2782                                 (unsigned int)outdata.dsize,
2783                                 (unsigned int)len));
2784                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2785                 return -1;
2786         }
2787
2788         /* make sure we null terminate the returned strings */
2789         for (i=0; i < ifaces->num; i++) {
2790                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2791         }
2792
2793         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2794                                                                   outdata.dptr,
2795                                                                   outdata.dsize);
2796         talloc_free(outdata.dptr);
2797         if (*_ifaces == NULL) {
2798                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2799                                 "talloc_memdup size %u failed\n",
2800                                 (unsigned int)outdata.dsize));
2801                 return -1;
2802         }
2803
2804         return 0;
2805 }
2806
2807 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2808                              struct timeval timeout, uint32_t destnode,
2809                              TALLOC_CTX *mem_ctx,
2810                              const struct ctdb_control_iface_info *info)
2811 {
2812         int ret;
2813         TDB_DATA indata;
2814         int32_t res;
2815
2816         indata.dptr = discard_const_p(uint8_t, info);
2817         indata.dsize = sizeof(*info);
2818
2819         ret = ctdb_control(ctdb, destnode, 0,
2820                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2821                            mem_ctx, NULL, &res, &timeout, NULL);
2822         if (ret != 0 || res != 0) {
2823                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2824                                 "failed ret:%d res:%d\n",
2825                                 ret, res));
2826                 return -1;
2827         }
2828
2829         return 0;
2830 }
2831
2832 /*
2833   set/clear the permanent disabled bit on a remote node
2834  */
2835 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2836                        uint32_t set, uint32_t clear)
2837 {
2838         int ret;
2839         TDB_DATA data;
2840         struct ctdb_node_map *nodemap=NULL;
2841         struct ctdb_node_flag_change c;
2842         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2843         uint32_t recmaster;
2844         uint32_t *nodes;
2845
2846
2847         /* find the recovery master */
2848         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2849         if (ret != 0) {
2850                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2851                 talloc_free(tmp_ctx);
2852                 return ret;
2853         }
2854
2855
2856         /* read the node flags from the recmaster */
2857         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2858         if (ret != 0) {
2859                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2860                 talloc_free(tmp_ctx);
2861                 return -1;
2862         }
2863         if (destnode >= nodemap->num) {
2864                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2865                 talloc_free(tmp_ctx);
2866                 return -1;
2867         }
2868
2869         c.pnn       = destnode;
2870         c.old_flags = nodemap->nodes[destnode].flags;
2871         c.new_flags = c.old_flags;
2872         c.new_flags |= set;
2873         c.new_flags &= ~clear;
2874
2875         data.dsize = sizeof(c);
2876         data.dptr = (unsigned char *)&c;
2877
2878         /* send the flags update to all connected nodes */
2879         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2880
2881         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2882                                         nodes, 0,
2883                                         timeout, false, data,
2884                                         NULL, NULL,
2885                                         NULL) != 0) {
2886                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2887
2888                 talloc_free(tmp_ctx);
2889                 return -1;
2890         }
2891
2892         talloc_free(tmp_ctx);
2893         return 0;
2894 }
2895
2896
2897 /*
2898   get all tunables
2899  */
2900 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2901                                struct timeval timeout, 
2902                                uint32_t destnode,
2903                                struct ctdb_tunable *tunables)
2904 {
2905         TDB_DATA outdata;
2906         int ret;
2907         int32_t res;
2908
2909         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2910                            &outdata, &res, &timeout, NULL);
2911         if (ret != 0 || res != 0) {
2912                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2913                 return -1;
2914         }
2915
2916         if (outdata.dsize != sizeof(*tunables)) {
2917                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2918                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2919                 return -1;              
2920         }
2921
2922         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2923         talloc_free(outdata.dptr);
2924         return 0;
2925 }
2926
2927 /*
2928   add a public address to a node
2929  */
2930 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2931                       struct timeval timeout, 
2932                       uint32_t destnode,
2933                       struct ctdb_control_ip_iface *pub)
2934 {
2935         TDB_DATA data;
2936         int32_t res;
2937         int ret;
2938
2939         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2940         data.dptr  = (unsigned char *)pub;
2941
2942         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2943                            NULL, &res, &timeout, NULL);
2944         if (ret != 0 || res != 0) {
2945                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2946                 return -1;
2947         }
2948
2949         return 0;
2950 }
2951
2952 /*
2953   delete a public address from a node
2954  */
2955 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2956                       struct timeval timeout, 
2957                       uint32_t destnode,
2958                       struct ctdb_control_ip_iface *pub)
2959 {
2960         TDB_DATA data;
2961         int32_t res;
2962         int ret;
2963
2964         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2965         data.dptr  = (unsigned char *)pub;
2966
2967         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2968                            NULL, &res, &timeout, NULL);
2969         if (ret != 0 || res != 0) {
2970                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2971                 return -1;
2972         }
2973
2974         return 0;
2975 }
2976
2977 /*
2978   kill a tcp connection
2979  */
2980 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2981                       struct timeval timeout, 
2982                       uint32_t destnode,
2983                       struct ctdb_control_killtcp *killtcp)
2984 {
2985         TDB_DATA data;
2986         int32_t res;
2987         int ret;
2988
2989         data.dsize = sizeof(struct ctdb_control_killtcp);
2990         data.dptr  = (unsigned char *)killtcp;
2991
2992         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2993                            NULL, &res, &timeout, NULL);
2994         if (ret != 0 || res != 0) {
2995                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2996                 return -1;
2997         }
2998
2999         return 0;
3000 }
3001
3002 /*
3003   send a gratious arp
3004  */
3005 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3006                       struct timeval timeout, 
3007                       uint32_t destnode,
3008                       ctdb_sock_addr *addr,
3009                       const char *ifname)
3010 {
3011         TDB_DATA data;
3012         int32_t res;
3013         int ret, len;
3014         struct ctdb_control_gratious_arp *gratious_arp;
3015         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3016
3017
3018         len = strlen(ifname)+1;
3019         gratious_arp = talloc_size(tmp_ctx, 
3020                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3021         CTDB_NO_MEMORY(ctdb, gratious_arp);
3022
3023         gratious_arp->addr = *addr;
3024         gratious_arp->len = len;
3025         memcpy(&gratious_arp->iface[0], ifname, len);
3026
3027
3028         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3029         data.dptr  = (unsigned char *)gratious_arp;
3030
3031         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3032                            NULL, &res, &timeout, NULL);
3033         if (ret != 0 || res != 0) {
3034                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3035                 talloc_free(tmp_ctx);
3036                 return -1;
3037         }
3038
3039         talloc_free(tmp_ctx);
3040         return 0;
3041 }
3042
3043 /*
3044   get a list of all tcp tickles that a node knows about for a particular vnn
3045  */
3046 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3047                               struct timeval timeout, uint32_t destnode, 
3048                               TALLOC_CTX *mem_ctx, 
3049                               ctdb_sock_addr *addr,
3050                               struct ctdb_control_tcp_tickle_list **list)
3051 {
3052         int ret;
3053         TDB_DATA data, outdata;
3054         int32_t status;
3055
3056         data.dptr = (uint8_t*)addr;
3057         data.dsize = sizeof(ctdb_sock_addr);
3058
3059         ret = ctdb_control(ctdb, destnode, 0, 
3060                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3061                            mem_ctx, &outdata, &status, NULL, NULL);
3062         if (ret != 0 || status != 0) {
3063                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3064                 return -1;
3065         }
3066
3067         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3068
3069         return status;
3070 }
3071
3072 /*
3073   register a server id
3074  */
3075 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3076                       struct timeval timeout, 
3077                       struct ctdb_server_id *id)
3078 {
3079         TDB_DATA data;
3080         int32_t res;
3081         int ret;
3082
3083         data.dsize = sizeof(struct ctdb_server_id);
3084         data.dptr  = (unsigned char *)id;
3085
3086         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3087                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3088                         0, data, NULL,
3089                         NULL, &res, &timeout, NULL);
3090         if (ret != 0 || res != 0) {
3091                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3092                 return -1;
3093         }
3094
3095         return 0;
3096 }
3097
3098 /*
3099   unregister a server id
3100  */
3101 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3102                       struct timeval timeout, 
3103                       struct ctdb_server_id *id)
3104 {
3105         TDB_DATA data;
3106         int32_t res;
3107         int ret;
3108
3109         data.dsize = sizeof(struct ctdb_server_id);
3110         data.dptr  = (unsigned char *)id;
3111
3112         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3113                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3114                         0, data, NULL,
3115                         NULL, &res, &timeout, NULL);
3116         if (ret != 0 || res != 0) {
3117                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3118                 return -1;
3119         }
3120
3121         return 0;
3122 }
3123
3124
3125 /*
3126   check if a server id exists
3127
3128   if a server id does exist, return *status == 1, otherwise *status == 0
3129  */
3130 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3131                       struct timeval timeout, 
3132                       uint32_t destnode,
3133                       struct ctdb_server_id *id,
3134                       uint32_t *status)
3135 {
3136         TDB_DATA data;
3137         int32_t res;
3138         int ret;
3139
3140         data.dsize = sizeof(struct ctdb_server_id);
3141         data.dptr  = (unsigned char *)id;
3142
3143         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3144                         0, data, NULL,
3145                         NULL, &res, &timeout, NULL);
3146         if (ret != 0) {
3147                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3148                 return -1;
3149         }
3150
3151         if (res) {
3152                 *status = 1;
3153         } else {
3154                 *status = 0;
3155         }
3156
3157         return 0;
3158 }
3159
3160 /*
3161    get the list of server ids that are registered on a node
3162 */
3163 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3164                 TALLOC_CTX *mem_ctx,
3165                 struct timeval timeout, uint32_t destnode, 
3166                 struct ctdb_server_id_list **svid_list)
3167 {
3168         int ret;
3169         TDB_DATA outdata;
3170         int32_t res;
3171
3172         ret = ctdb_control(ctdb, destnode, 0, 
3173                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3174                            mem_ctx, &outdata, &res, &timeout, NULL);
3175         if (ret != 0 || res != 0) {
3176                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3177                 return -1;
3178         }
3179
3180         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3181                     
3182         return 0;
3183 }
3184
3185 /*
3186   initialise the ctdb daemon for client applications
3187
3188   NOTE: In current code the daemon does not fork. This is for testing purposes only
3189   and to simplify the code.
3190 */
3191 struct ctdb_context *ctdb_init(struct event_context *ev)
3192 {
3193         int ret;
3194         struct ctdb_context *ctdb;
3195
3196         ctdb = talloc_zero(ev, struct ctdb_context);
3197         if (ctdb == NULL) {
3198                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3199                 return NULL;
3200         }
3201         ctdb->ev  = ev;
3202         ctdb->idr = idr_init(ctdb);
3203         /* Wrap early to exercise code. */
3204         ctdb->lastid = INT_MAX-200;
3205         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3206
3207         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3208         if (ret != 0) {
3209                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3210                 talloc_free(ctdb);
3211                 return NULL;
3212         }
3213
3214         ctdb->statistics.statistics_start_time = timeval_current();
3215
3216         return ctdb;
3217 }
3218
3219
3220 /*
3221   set some ctdb flags
3222 */
3223 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3224 {
3225         ctdb->flags |= flags;
3226 }
3227
3228 /*
3229   setup the local socket name
3230 */
3231 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3232 {
3233         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3234         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3235
3236         return 0;
3237 }
3238
3239 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3240 {
3241         return ctdb->daemon.name;
3242 }
3243
3244 /*
3245   return the pnn of this node
3246 */
3247 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3248 {
3249         return ctdb->pnn;
3250 }
3251
3252
3253 /*
3254   get the uptime of a remote node
3255  */
3256 struct ctdb_client_control_state *
3257 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3258 {
3259         return ctdb_control_send(ctdb, destnode, 0, 
3260                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3261                            mem_ctx, &timeout, NULL);
3262 }
3263
3264 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3265 {
3266         int ret;
3267         int32_t res;
3268         TDB_DATA outdata;
3269
3270         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3271         if (ret != 0 || res != 0) {
3272                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3273                 return -1;
3274         }
3275
3276         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3277
3278         return 0;
3279 }
3280
3281 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3282 {
3283         struct ctdb_client_control_state *state;
3284
3285         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3286         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3287 }
3288
3289 /*
3290   send a control to execute the "recovered" event script on a node
3291  */
3292 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3293 {
3294         int ret;
3295         int32_t status;
3296
3297         ret = ctdb_control(ctdb, destnode, 0, 
3298                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3299                            NULL, NULL, &status, &timeout, NULL);
3300         if (ret != 0 || status != 0) {
3301                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3302                 return -1;
3303         }
3304
3305         return 0;
3306 }
3307
3308 /* 
3309   callback for the async helpers used when sending the same control
3310   to multiple nodes in parallell.
3311 */
3312 static void async_callback(struct ctdb_client_control_state *state)
3313 {
3314         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3315         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3316         int ret;
3317         TDB_DATA outdata;
3318         int32_t res = -1;
3319         uint32_t destnode = state->c->hdr.destnode;
3320
3321         /* one more node has responded with recmode data */
3322         data->count--;
3323
3324         /* if we failed to push the db, then return an error and let
3325            the main loop try again.
3326         */
3327         if (state->state != CTDB_CONTROL_DONE) {
3328                 if ( !data->dont_log_errors) {
3329                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3330                 }
3331                 data->fail_count++;
3332                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3333                         res = -ETIME;
3334                 } else {
3335                         res = -1;
3336                 }
3337                 if (data->fail_callback) {
3338                         data->fail_callback(ctdb, destnode, res, outdata,
3339                                         data->callback_data);
3340                 }
3341                 return;
3342         }
3343         
3344         state->async.fn = NULL;
3345
3346         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3347         if ((ret != 0) || (res != 0)) {
3348                 if ( !data->dont_log_errors) {
3349                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3350                 }
3351                 data->fail_count++;
3352                 if (data->fail_callback) {
3353                         data->fail_callback(ctdb, destnode, res, outdata,
3354                                         data->callback_data);
3355                 }
3356         }
3357         if ((ret == 0) && (data->callback != NULL)) {
3358                 data->callback(ctdb, destnode, res, outdata,
3359                                         data->callback_data);
3360         }
3361 }
3362
3363
3364 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3365 {
3366         /* set up the callback functions */
3367         state->async.fn = async_callback;
3368         state->async.private_data = data;
3369         
3370         /* one more control to wait for to complete */
3371         data->count++;
3372 }
3373
3374
3375 /* wait for up to the maximum number of seconds allowed
3376    or until all nodes we expect a response from has replied
3377 */
3378 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3379 {
3380         while (data->count > 0) {
3381                 event_loop_once(ctdb->ev);
3382         }
3383         if (data->fail_count != 0) {
3384                 if (!data->dont_log_errors) {
3385                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3386                                  data->fail_count));
3387                 }
3388                 return -1;
3389         }
3390         return 0;
3391 }
3392
3393
3394 /* 
3395    perform a simple control on the listed nodes
3396    The control cannot return data
3397  */
3398 int ctdb_client_async_control(struct ctdb_context *ctdb,
3399                                 enum ctdb_controls opcode,
3400                                 uint32_t *nodes,
3401                                 uint64_t srvid,
3402                                 struct timeval timeout,
3403                                 bool dont_log_errors,
3404                                 TDB_DATA data,
3405                                 client_async_callback client_callback,
3406                                 client_async_callback fail_callback,
3407                                 void *callback_data)
3408 {
3409         struct client_async_data *async_data;
3410         struct ctdb_client_control_state *state;
3411         int j, num_nodes;
3412
3413         async_data = talloc_zero(ctdb, struct client_async_data);
3414         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3415         async_data->dont_log_errors = dont_log_errors;
3416         async_data->callback = client_callback;
3417         async_data->fail_callback = fail_callback;
3418         async_data->callback_data = callback_data;
3419         async_data->opcode        = opcode;
3420
3421         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3422
3423         /* loop over all nodes and send an async control to each of them */
3424         for (j=0; j<num_nodes; j++) {
3425                 uint32_t pnn = nodes[j];
3426
3427                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3428                                           0, data, async_data, &timeout, NULL);
3429                 if (state == NULL) {
3430                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3431                         talloc_free(async_data);
3432                         return -1;
3433                 }
3434                 
3435                 ctdb_client_async_add(async_data, state);
3436         }
3437
3438         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3439                 talloc_free(async_data);
3440                 return -1;
3441         }
3442
3443         talloc_free(async_data);
3444         return 0;
3445 }
3446
3447 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3448                                 struct ctdb_vnn_map *vnn_map,
3449                                 TALLOC_CTX *mem_ctx,
3450                                 bool include_self)
3451 {
3452         int i, j, num_nodes;
3453         uint32_t *nodes;
3454
3455         for (i=num_nodes=0;i<vnn_map->size;i++) {
3456                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3457                         continue;
3458                 }
3459                 num_nodes++;
3460         } 
3461
3462         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3463         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3464
3465         for (i=j=0;i<vnn_map->size;i++) {
3466                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3467                         continue;
3468                 }
3469                 nodes[j++] = vnn_map->map[i];
3470         } 
3471
3472         return nodes;
3473 }
3474
3475 /* Get list of nodes not including those with flags specified by mask.
3476  * If exclude_pnn is not -1 then exclude that pnn from the list.
3477  */
3478 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3479                         struct ctdb_node_map *node_map,
3480                         TALLOC_CTX *mem_ctx,
3481                         uint32_t mask,
3482                         int exclude_pnn)
3483 {
3484         int i, j, num_nodes;
3485         uint32_t *nodes;
3486
3487         for (i=num_nodes=0;i<node_map->num;i++) {
3488                 if (node_map->nodes[i].flags & mask) {
3489                         continue;
3490                 }
3491                 if (node_map->nodes[i].pnn == exclude_pnn) {
3492                         continue;
3493                 }
3494                 num_nodes++;
3495         } 
3496
3497         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3498         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3499
3500         for (i=j=0;i<node_map->num;i++) {
3501                 if (node_map->nodes[i].flags & mask) {
3502                         continue;
3503                 }
3504                 if (node_map->nodes[i].pnn == exclude_pnn) {
3505                         continue;
3506                 }
3507                 nodes[j++] = node_map->nodes[i].pnn;
3508         } 
3509
3510         return nodes;
3511 }
3512
3513 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3514                                 struct ctdb_node_map *node_map,
3515                                 TALLOC_CTX *mem_ctx,
3516                                 bool include_self)
3517 {
3518         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3519                              include_self ? -1 : ctdb->pnn);
3520 }
3521
3522 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3523                                 struct ctdb_node_map *node_map,
3524                                 TALLOC_CTX *mem_ctx,
3525                                 uint32_t pnn)
3526 {
3527         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE, pnn);
3528 }
3529
3530 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3531                                 struct ctdb_node_map *node_map,
3532                                 TALLOC_CTX *mem_ctx,
3533                                 bool include_self)
3534 {
3535         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3536                              include_self ? -1 : ctdb->pnn);
3537 }
3538
3539 /* 
3540   this is used to test if a pnn lock exists and if it exists will return
3541   the number of connections that pnn has reported or -1 if that recovery
3542   daemon is not running.
3543 */
3544 int
3545 ctdb_read_pnn_lock(int fd, int32_t pnn)
3546 {
3547         struct flock lock;
3548         char c;
3549
3550         lock.l_type = F_WRLCK;
3551         lock.l_whence = SEEK_SET;
3552         lock.l_start = pnn;
3553         lock.l_len = 1;
3554         lock.l_pid = 0;
3555
3556         if (fcntl(fd, F_GETLK, &lock) != 0) {
3557                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3558                 return -1;
3559         }
3560
3561         if (lock.l_type == F_UNLCK) {
3562                 return -1;
3563         }
3564
3565         if (pread(fd, &c, 1, pnn) == -1) {
3566                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3567                 return -1;
3568         }
3569
3570         return c;
3571 }
3572
3573 /*
3574   get capabilities of a remote node
3575  */
3576 struct ctdb_client_control_state *
3577 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3578 {
3579         return ctdb_control_send(ctdb, destnode, 0, 
3580                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3581                            mem_ctx, &timeout, NULL);
3582 }
3583
3584 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3585 {
3586         int ret;
3587         int32_t res;
3588         TDB_DATA outdata;
3589
3590         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3591         if ( (ret != 0) || (res != 0) ) {
3592                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3593                 return -1;
3594         }
3595
3596         if (capabilities) {
3597                 *capabilities = *((uint32_t *)outdata.dptr);
3598         }
3599
3600         return 0;
3601 }
3602
3603 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3604 {
3605         struct ctdb_client_control_state *state;
3606         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3607         int ret;
3608
3609         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3610         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3611         talloc_free(tmp_ctx);
3612         return ret;
3613 }
3614
3615 /**
3616  * check whether a transaction is active on a given db on a given node
3617  */
3618 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3619                                      uint32_t destnode,
3620                                      uint32_t db_id)
3621 {
3622         int32_t status;
3623         int ret;
3624         TDB_DATA indata;
3625
3626         indata.dptr = (uint8_t *)&db_id;
3627         indata.dsize = sizeof(db_id);
3628
3629         ret = ctdb_control(ctdb, destnode, 0,
3630                            CTDB_CONTROL_TRANS2_ACTIVE,
3631                            0, indata, NULL, NULL, &status,
3632                            NULL, NULL);
3633
3634         if (ret != 0) {
3635                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3636                 return -1;
3637         }
3638
3639         return status;
3640 }
3641
3642
3643 struct ctdb_transaction_handle {
3644         struct ctdb_db_context *ctdb_db;
3645         bool in_replay;
3646         /*
3647          * we store the reads and writes done under a transaction:
3648          * - one list stores both reads and writes (m_all),
3649          * - the other just writes (m_write)
3650          */
3651         struct ctdb_marshall_buffer *m_all;
3652         struct ctdb_marshall_buffer *m_write;
3653 };
3654
3655 /* start a transaction on a database */
3656 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3657 {
3658         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3659         return 0;
3660 }
3661
3662 /* start a transaction on a database */
3663 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3664 {
3665         struct ctdb_record_handle *rh;
3666         TDB_DATA key;
3667         TDB_DATA data;
3668         struct ctdb_ltdb_header header;
3669         TALLOC_CTX *tmp_ctx;
3670         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3671         int ret;
3672         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3673         pid_t pid;
3674         int32_t status;
3675
3676         key.dptr = discard_const(keyname);
3677         key.dsize = strlen(keyname);
3678
3679         if (!ctdb_db->persistent) {
3680                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3681                 return -1;
3682         }
3683
3684 again:
3685         tmp_ctx = talloc_new(h);
3686
3687         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3688         if (rh == NULL) {
3689                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3690                 talloc_free(tmp_ctx);
3691                 return -1;
3692         }
3693
3694         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3695                                               CTDB_CURRENT_NODE,
3696                                               ctdb_db->db_id);
3697         if (status == 1) {
3698                 unsigned long int usec = (1000 + random()) % 100000;
3699                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3700                                     "on db_id[0x%08x]. waiting for %lu "
3701                                     "microseconds\n",
3702                                     ctdb_db->db_id, usec));
3703                 talloc_free(tmp_ctx);
3704                 usleep(usec);
3705                 goto again;
3706         }
3707
3708         /*
3709          * store the pid in the database:
3710          * it is not enough that the node is dmaster...
3711          */
3712         pid = getpid();
3713         data.dptr = (unsigned char *)&pid;
3714         data.dsize = sizeof(pid_t);
3715         rh->header.rsn++;
3716         rh->header.dmaster = ctdb_db->ctdb->pnn;
3717         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3718         if (ret != 0) {
3719                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3720                                   "transaction record\n"));
3721                 talloc_free(tmp_ctx);
3722                 return -1;
3723         }
3724
3725         talloc_free(rh);
3726
3727         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3728         if (ret != 0) {
3729                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3730                 talloc_free(tmp_ctx);
3731                 return -1;
3732         }
3733
3734         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3735         if (ret != 0) {
3736                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3737                                  "lock record inside transaction\n"));
3738                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3739                 talloc_free(tmp_ctx);
3740                 goto again;
3741         }
3742
3743         if (header.dmaster != ctdb_db->ctdb->pnn) {
3744                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3745                                    "transaction lock record\n"));
3746                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3747                 talloc_free(tmp_ctx);
3748                 goto again;
3749         }
3750
3751         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3752                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3753                                     "the transaction lock record\n"));
3754                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3755                 talloc_free(tmp_ctx);
3756                 goto again;
3757         }
3758
3759         talloc_free(tmp_ctx);
3760
3761         return 0;
3762 }
3763
3764
3765 /* start a transaction on a database */
3766 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3767                                                        TALLOC_CTX *mem_ctx)
3768 {
3769         struct ctdb_transaction_handle *h;
3770         int ret;
3771
3772         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3773         if (h == NULL) {
3774                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3775                 return NULL;
3776         }
3777
3778         h->ctdb_db = ctdb_db;
3779
3780         ret = ctdb_transaction_fetch_start(h);
3781         if (ret != 0) {
3782                 talloc_free(h);
3783                 return NULL;
3784         }
3785
3786         talloc_set_destructor(h, ctdb_transaction_destructor);
3787
3788         return h;
3789 }
3790
3791
3792
3793 /*
3794   fetch a record inside a transaction
3795  */
3796 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3797                            TALLOC_CTX *mem_ctx, 
3798                            TDB_DATA key, TDB_DATA *data)
3799 {
3800         struct ctdb_ltdb_header header;
3801         int ret;
3802
3803         ZERO_STRUCT(header);
3804
3805         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3806         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3807                 /* record doesn't exist yet */
3808                 *data = tdb_null;
3809                 ret = 0;
3810         }
3811         
3812         if (ret != 0) {
3813                 return ret;
3814         }
3815
3816         if (!h->in_replay) {
3817                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3818                 if (h->m_all == NULL) {
3819                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3820                         return -1;
3821                 }
3822         }
3823
3824         return 0;
3825 }
3826
3827 /*
3828   stores a record inside a transaction
3829  */
3830 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3831                            TDB_DATA key, TDB_DATA data)
3832 {
3833         TALLOC_CTX *tmp_ctx = talloc_new(h);
3834         struct ctdb_ltdb_header header;
3835         TDB_DATA olddata;
3836         int ret;
3837
3838         ZERO_STRUCT(header);
3839
3840         /* we need the header so we can update the RSN */
3841         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3842         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3843                 /* the record doesn't exist - create one with us as dmaster.
3844                    This is only safe because we are in a transaction and this
3845                    is a persistent database */
3846                 ZERO_STRUCT(header);
3847         } else if (ret != 0) {
3848                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3849                 talloc_free(tmp_ctx);
3850                 return ret;
3851         }
3852
3853         if (data.dsize == olddata.dsize &&
3854             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3855                 /* save writing the same data */
3856                 talloc_free(tmp_ctx);
3857                 return 0;
3858         }
3859
3860         header.dmaster = h->ctdb_db->ctdb->pnn;
3861         header.rsn++;
3862
3863         if (!h->in_replay) {
3864                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3865                 if (h->m_all == NULL) {
3866                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3867                         talloc_free(tmp_ctx);
3868                         return -1;
3869                 }
3870         }               
3871
3872         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3873         if (h->m_write == NULL) {
3874                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3875                 talloc_free(tmp_ctx);
3876                 return -1;
3877         }
3878         
3879         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3880
3881         talloc_free(tmp_ctx);
3882         
3883         return ret;
3884 }
3885
3886 /*
3887   replay a transaction
3888  */
3889 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3890 {
3891         int ret, i;
3892         struct ctdb_rec_data *rec = NULL;
3893
3894         h->in_replay = true;
3895         talloc_free(h->m_write);
3896         h->m_write = NULL;
3897
3898         ret = ctdb_transaction_fetch_start(h);
3899         if (ret != 0) {
3900                 return ret;
3901         }
3902
3903         for (i=0;i<h->m_all->count;i++) {
3904                 TDB_DATA key, data;
3905
3906                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3907                 if (rec == NULL) {
3908                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3909                         goto failed;
3910                 }
3911
3912                 if (rec->reqid == 0) {
3913                         /* its a store */
3914                         if (ctdb_transaction_store(h, key, data) != 0) {
3915                                 goto failed;
3916                         }
3917                 } else {
3918                         TDB_DATA data2;
3919                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3920
3921                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3922                                 talloc_free(tmp_ctx);
3923                                 goto failed;
3924                         }
3925                         if (data2.dsize != data.dsize ||
3926                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3927                                 /* the record has changed on us - we have to give up */
3928                                 talloc_free(tmp_ctx);
3929                                 goto failed;
3930                         }
3931                         talloc_free(tmp_ctx);
3932                 }
3933         }
3934         
3935         return 0;
3936
3937 failed:
3938         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3939         return -1;
3940 }
3941
3942
3943 /*
3944   commit a transaction
3945  */
3946 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3947 {
3948         int ret, retries=0;
3949         int32_t status;
3950         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3951         struct timeval timeout;
3952         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3953
3954         talloc_set_destructor(h, NULL);
3955
3956         /* our commit strategy is quite complex.
3957
3958            - we first try to commit the changes to all other nodes
3959
3960            - if that works, then we commit locally and we are done
3961
3962            - if a commit on another node fails, then we need to cancel
3963              the transaction, then restart the transaction (thus
3964              opening a window of time for a pending recovery to
3965              complete), then replay the transaction, checking all the
3966              reads and writes (checking that reads give the same data,
3967              and writes succeed). Then we retry the transaction to the
3968              other nodes
3969         */
3970
3971 again:
3972         if (h->m_write == NULL) {
3973                 /* no changes were made */
3974                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3975                 talloc_free(h);
3976                 return 0;
3977         }
3978
3979         /* tell ctdbd to commit to the other nodes */
3980         timeout = timeval_current_ofs(1, 0);
3981         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3982                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3983                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3984                            &timeout, NULL);
3985         if (ret != 0 || status != 0) {
3986                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3987                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3988                                      ", retrying after 1 second...\n",
3989                                      (retries==0)?"":"retry "));
3990                 sleep(1);
3991
3992                 if (ret != 0) {
3993                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3994                 } else {
3995                         /* work out what error code we will give if we 
3996                            have to fail the operation */
3997                         switch ((enum ctdb_trans2_commit_error)status) {
3998                         case CTDB_TRANS2_COMMIT_SUCCESS:
3999                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
4000                         case CTDB_TRANS2_COMMIT_TIMEOUT:
4001                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4002                                 break;
4003                         case CTDB_TRANS2_COMMIT_ALLFAIL:
4004                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
4005                                 break;
4006                         }
4007                 }
4008
4009                 if (++retries == 100) {
4010                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4011                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4012                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4013                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4014                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4015                         talloc_free(h);
4016                         return -1;
4017                 }               
4018
4019                 if (ctdb_replay_transaction(h) != 0) {
4020                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4021                                           "transaction on db 0x%08x, "
4022                                           "failure control =%u\n",
4023                                           h->ctdb_db->db_id,
4024                                           (unsigned)failure_control));
4025                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4026                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4027                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4028                         talloc_free(h);
4029                         return -1;
4030                 }
4031                 goto again;
4032         } else {
4033                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4034         }
4035
4036         /* do the real commit locally */
4037         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4038         if (ret != 0) {
4039                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4040                                   "on db id 0x%08x locally, "
4041                                   "failure_control=%u\n",
4042                                   h->ctdb_db->db_id,
4043                                   (unsigned)failure_control));
4044                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4045                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4046                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4047                 talloc_free(h);
4048                 return ret;
4049         }
4050
4051         /* tell ctdbd that we are finished with our local commit */
4052         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4053                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4054                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4055         talloc_free(h);
4056         return 0;
4057 }
4058
4059 /*
4060   recovery daemon ping to main daemon
4061  */
4062 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4063 {
4064         int ret;
4065         int32_t res;
4066
4067         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4068                            ctdb, NULL, &res, NULL, NULL);
4069         if (ret != 0 || res != 0) {
4070                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4071                 return -1;
4072         }
4073
4074         return 0;
4075 }
4076
4077 /* When forking the main daemon and the child process needs to connect
4078  * back to the daemon as a client process, this function can be used
4079  * to change the ctdb context from daemon into client mode.  The child
4080  * process must be created using ctdb_fork() and not fork() -
4081  * ctdb_fork() does some necessary housekeeping.
4082  */
4083 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4084 {
4085         int ret;
4086         va_list ap;
4087
4088         /* Add extra information so we can identify this in the logs */
4089         va_start(ap, fmt);
4090         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4091         va_end(ap);
4092
4093         /* get a new event context */
4094         ctdb->ev = event_context_init(ctdb);
4095         tevent_loop_allow_nesting(ctdb->ev);
4096
4097         /* Connect to main CTDB daemon */
4098         ret = ctdb_socket_connect(ctdb);
4099         if (ret != 0) {
4100                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4101                 return -1;
4102         }
4103
4104         ctdb->can_send_controls = true;
4105
4106         return 0;
4107 }
4108
4109 /*
4110   get the status of running the monitor eventscripts: NULL means never run.
4111  */
4112 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4113                 struct timeval timeout, uint32_t destnode, 
4114                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4115                 struct ctdb_scripts_wire **scripts)
4116 {
4117         int ret;
4118         TDB_DATA outdata, indata;
4119         int32_t res;
4120         uint32_t uinttype = type;
4121
4122         indata.dptr = (uint8_t *)&uinttype;
4123         indata.dsize = sizeof(uinttype);
4124
4125         ret = ctdb_control(ctdb, destnode, 0, 
4126                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4127                            mem_ctx, &outdata, &res, &timeout, NULL);
4128         if (ret != 0 || res != 0) {
4129                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4130                 return -1;
4131         }
4132
4133         if (outdata.dsize == 0) {
4134                 *scripts = NULL;
4135         } else {
4136                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4137                 talloc_free(outdata.dptr);
4138         }
4139                     
4140         return 0;
4141 }
4142
4143 /*
4144   tell the main daemon how long it took to lock the reclock file
4145  */
4146 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4147 {
4148         int ret;
4149         int32_t res;
4150         TDB_DATA data;
4151
4152         data.dptr = (uint8_t *)&latency;
4153         data.dsize = sizeof(latency);
4154
4155         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4156                            ctdb, NULL, &res, NULL, NULL);
4157         if (ret != 0 || res != 0) {
4158                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4159                 return -1;
4160         }
4161
4162         return 0;
4163 }
4164
4165 /*
4166   get the name of the reclock file
4167  */
4168 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4169                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4170                          const char **name)
4171 {
4172         int ret;
4173         int32_t res;
4174         TDB_DATA data;
4175
4176         ret = ctdb_control(ctdb, destnode, 0, 
4177                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4178                            mem_ctx, &data, &res, &timeout, NULL);
4179         if (ret != 0 || res != 0) {
4180                 return -1;
4181         }
4182
4183         if (data.dsize == 0) {
4184                 *name = NULL;
4185         } else {
4186                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4187         }
4188         talloc_free(data.dptr);
4189
4190         return 0;
4191 }
4192
4193 /*
4194   set the reclock filename for a node
4195  */
4196 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4197 {
4198         int ret;
4199         TDB_DATA data;
4200         int32_t res;
4201
4202         if (reclock == NULL) {
4203                 data.dsize = 0;
4204                 data.dptr  = NULL;
4205         } else {
4206                 data.dsize = strlen(reclock) + 1;
4207                 data.dptr  = discard_const(reclock);
4208         }
4209
4210         ret = ctdb_control(ctdb, destnode, 0, 
4211                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4212                            NULL, NULL, &res, &timeout, NULL);
4213         if (ret != 0 || res != 0) {
4214                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4215                 return -1;
4216         }
4217
4218         return 0;
4219 }
4220
4221 /*
4222   stop a node
4223  */
4224 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4225 {
4226         int ret;
4227         int32_t res;
4228
4229         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4230                            ctdb, NULL, &res, &timeout, NULL);
4231         if (ret != 0 || res != 0) {
4232                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4233                 return -1;
4234         }
4235
4236         return 0;
4237 }
4238
4239 /*
4240   continue a node
4241  */
4242 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4243 {
4244         int ret;
4245
4246         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4247                            ctdb, NULL, NULL, &timeout, NULL);
4248         if (ret != 0) {
4249                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4250                 return -1;
4251         }
4252
4253         return 0;
4254 }
4255
4256 /*
4257   set the natgw state for a node
4258  */
4259 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4260 {
4261         int ret;
4262         TDB_DATA data;
4263         int32_t res;
4264
4265         data.dsize = sizeof(natgwstate);
4266         data.dptr  = (uint8_t *)&natgwstate;
4267
4268         ret = ctdb_control(ctdb, destnode, 0, 
4269                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4270                            NULL, NULL, &res, &timeout, NULL);
4271         if (ret != 0 || res != 0) {
4272                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4273                 return -1;
4274         }
4275
4276         return 0;
4277 }
4278
4279 /*
4280   set the lmaster role for a node
4281  */
4282 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4283 {
4284         int ret;
4285         TDB_DATA data;
4286         int32_t res;
4287
4288         data.dsize = sizeof(lmasterrole);
4289         data.dptr  = (uint8_t *)&lmasterrole;
4290
4291         ret = ctdb_control(ctdb, destnode, 0, 
4292                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4293                            NULL, NULL, &res, &timeout, NULL);
4294         if (ret != 0 || res != 0) {
4295                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4296                 return -1;
4297         }
4298
4299         return 0;
4300 }
4301
4302 /*
4303   set the recmaster role for a node
4304  */
4305 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4306 {
4307         int ret;
4308         TDB_DATA data;
4309         int32_t res;
4310
4311         data.dsize = sizeof(recmasterrole);
4312         data.dptr  = (uint8_t *)&recmasterrole;
4313
4314         ret = ctdb_control(ctdb, destnode, 0, 
4315                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4316                            NULL, NULL, &res, &timeout, NULL);
4317         if (ret != 0 || res != 0) {
4318                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4319                 return -1;
4320         }
4321
4322         return 0;
4323 }
4324
4325 /* enable an eventscript
4326  */
4327 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4328 {
4329         int ret;
4330         TDB_DATA data;
4331         int32_t res;
4332
4333         data.dsize = strlen(script) + 1;
4334         data.dptr  = discard_const(script);
4335
4336         ret = ctdb_control(ctdb, destnode, 0, 
4337                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4338                            NULL, NULL, &res, &timeout, NULL);
4339         if (ret != 0 || res != 0) {
4340                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4341                 return -1;
4342         }
4343
4344         return 0;
4345 }
4346
4347 /* disable an eventscript
4348  */
4349 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4350 {
4351         int ret;
4352         TDB_DATA data;
4353         int32_t res;
4354
4355         data.dsize = strlen(script) + 1;
4356         data.dptr  = discard_const(script);
4357
4358         ret = ctdb_control(ctdb, destnode, 0, 
4359                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4360                            NULL, NULL, &res, &timeout, NULL);
4361         if (ret != 0 || res != 0) {
4362                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4363                 return -1;
4364         }
4365
4366         return 0;
4367 }
4368
4369
4370 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4371 {
4372         int ret;
4373         TDB_DATA data;
4374         int32_t res;
4375
4376         data.dsize = sizeof(*bantime);
4377         data.dptr  = (uint8_t *)bantime;
4378
4379         ret = ctdb_control(ctdb, destnode, 0, 
4380                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4381                            NULL, NULL, &res, &timeout, NULL);
4382         if (ret != 0 || res != 0) {
4383                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4384                 return -1;
4385         }
4386
4387         return 0;
4388 }
4389
4390
4391 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4392 {
4393         int ret;
4394         TDB_DATA outdata;
4395         int32_t res;
4396         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4397
4398         ret = ctdb_control(ctdb, destnode, 0, 
4399                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4400                            tmp_ctx, &outdata, &res, &timeout, NULL);
4401         if (ret != 0 || res != 0) {
4402                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4403                 talloc_free(tmp_ctx);
4404                 return -1;
4405         }
4406
4407         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4408         talloc_free(tmp_ctx);
4409
4410         return 0;
4411 }
4412
4413
4414 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4415 {
4416         int ret;
4417         int32_t res;
4418         TDB_DATA data;
4419         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4420
4421         data.dptr = (uint8_t*)db_prio;
4422         data.dsize = sizeof(*db_prio);
4423
4424         ret = ctdb_control(ctdb, destnode, 0, 
4425                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4426                            tmp_ctx, NULL, &res, &timeout, NULL);
4427         if (ret != 0 || res != 0) {
4428                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4429                 talloc_free(tmp_ctx);
4430                 return -1;
4431         }
4432
4433         talloc_free(tmp_ctx);
4434
4435         return 0;
4436 }
4437
4438 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4439 {
4440         int ret;
4441         int32_t res;
4442         TDB_DATA data;
4443         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4444
4445         data.dptr = (uint8_t*)&db_id;
4446         data.dsize = sizeof(db_id);
4447
4448         ret = ctdb_control(ctdb, destnode, 0, 
4449                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4450                            tmp_ctx, NULL, &res, &timeout, NULL);
4451         if (ret != 0 || res < 0) {
4452                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4453                 talloc_free(tmp_ctx);
4454                 return -1;
4455         }
4456
4457         if (priority) {
4458                 *priority = res;
4459         }
4460
4461         talloc_free(tmp_ctx);
4462
4463         return 0;
4464 }
4465
4466 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4467 {
4468         int ret;
4469         TDB_DATA outdata;
4470         int32_t res;
4471
4472         ret = ctdb_control(ctdb, destnode, 0, 
4473                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4474                            mem_ctx, &outdata, &res, &timeout, NULL);
4475         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4476                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4477                 return -1;
4478         }
4479
4480         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4481         talloc_free(outdata.dptr);
4482                     
4483         return 0;
4484 }
4485
4486 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4487 {
4488         if (h == NULL) {
4489                 return NULL;
4490         }
4491
4492         return &h->header;
4493 }
4494
4495
4496 struct ctdb_client_control_state *
4497 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4498 {
4499         struct ctdb_client_control_state *handle;
4500         struct ctdb_marshall_buffer *m;
4501         struct ctdb_rec_data *rec;
4502         TDB_DATA outdata;
4503
4504         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4505         if (m == NULL) {
4506                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4507                 return NULL;
4508         }
4509
4510         m->db_id = ctdb_db->db_id;
4511
4512         rec = ctdb_marshall_record(m, 0, key, header, data);
4513         if (rec == NULL) {
4514                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4515                 talloc_free(m);
4516                 return NULL;
4517         }
4518         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4519         if (m == NULL) {
4520                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4521                 talloc_free(m);
4522                 return NULL;
4523         }
4524         m->count++;
4525         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4526
4527
4528         outdata.dptr = (uint8_t *)m;
4529         outdata.dsize = talloc_get_size(m);
4530
4531         handle = ctdb_control_send(ctdb, destnode, 0, 
4532                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4533                            mem_ctx, &timeout, NULL);
4534         talloc_free(m);
4535         return handle;
4536 }
4537
4538 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4539 {
4540         int ret;
4541         int32_t res;
4542
4543         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4544         if ( (ret != 0) || (res != 0) ){
4545                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4546                 return -1;
4547         }
4548
4549         return 0;
4550 }
4551
4552 int
4553 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4554 {
4555         struct ctdb_client_control_state *state;
4556
4557         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4558         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4559 }
4560
4561
4562
4563
4564
4565
4566 /*
4567   set a database to be readonly
4568  */
4569 struct ctdb_client_control_state *
4570 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4571 {
4572         TDB_DATA data;
4573
4574         data.dptr = (uint8_t *)&dbid;
4575         data.dsize = sizeof(dbid);
4576
4577         return ctdb_control_send(ctdb, destnode, 0, 
4578                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4579                            ctdb, NULL, NULL);
4580 }
4581
4582 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4583 {
4584         int ret;
4585         int32_t res;
4586
4587         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4588         if (ret != 0 || res != 0) {
4589           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4590                 return -1;
4591         }
4592
4593         return 0;
4594 }
4595
4596 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4597 {
4598         struct ctdb_client_control_state *state;
4599
4600         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4601         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4602 }
4603
4604 /*
4605   set a database to be sticky
4606  */
4607 struct ctdb_client_control_state *
4608 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4609 {
4610         TDB_DATA data;
4611
4612         data.dptr = (uint8_t *)&dbid;
4613         data.dsize = sizeof(dbid);
4614
4615         return ctdb_control_send(ctdb, destnode, 0, 
4616                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4617                            ctdb, NULL, NULL);
4618 }
4619
4620 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4621 {
4622         int ret;
4623         int32_t res;
4624
4625         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4626         if (ret != 0 || res != 0) {
4627                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4628                 return -1;
4629         }
4630
4631         return 0;
4632 }
4633
4634 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4635 {
4636         struct ctdb_client_control_state *state;
4637
4638         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4639         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4640 }