Merge branch 'master' of ctdb into 'master' of samba
[obnox/samba/samba-obnox.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (c->new_data == NULL) {
109                 c->new_data = &c->record_data;
110         }
111
112         if (c->new_data && updatetdb) {
113                 /* XXX check that we always have the lock here? */
114                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
115                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
116                         talloc_free(c);
117                         return -1;
118                 }
119         }
120
121         if (c->reply_data) {
122                 call->reply_data = *c->reply_data;
123
124                 talloc_steal(call, call->reply_data.dptr);
125                 talloc_set_name_const(call->reply_data.dptr, __location__);
126         } else {
127                 call->reply_data.dptr = NULL;
128                 call->reply_data.dsize = 0;
129         }
130         call->status = c->status;
131
132         talloc_free(c);
133
134         return 0;
135 }
136
137
138 /*
139   queue a packet for sending from client to daemon
140 */
141 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
142 {
143         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
144 }
145
146
147 /*
148   called when a CTDB_REPLY_CALL packet comes in in the client
149
150   This packet comes in response to a CTDB_REQ_CALL request packet. It
151   contains any reply data from the call
152 */
153 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
156         struct ctdb_client_call_state *state;
157
158         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
159         if (state == NULL) {
160                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
161                 return;
162         }
163
164         if (hdr->reqid != state->reqid) {
165                 /* we found a record  but it was the wrong one */
166                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
167                 return;
168         }
169
170         state->call->reply_data.dptr = c->data;
171         state->call->reply_data.dsize = c->datalen;
172         state->call->status = c->status;
173
174         talloc_steal(state, c);
175
176         state->state = CTDB_CALL_DONE;
177
178         if (state->async.fn) {
179                 state->async.fn(state);
180         }
181 }
182
183 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
184
185 /*
186   this is called in the client, when data comes in from the daemon
187  */
188 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
191         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
192         TALLOC_CTX *tmp_ctx;
193
194         /* place the packet as a child of a tmp_ctx. We then use
195            talloc_free() below to free it. If any of the calls want
196            to keep it, then they will steal it somewhere else, and the
197            talloc_free() will be a no-op */
198         tmp_ctx = talloc_new(ctdb);
199         talloc_steal(tmp_ctx, hdr);
200
201         if (cnt == 0) {
202                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
203                 exit(1);
204         }
205
206         if (cnt < sizeof(*hdr)) {
207                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
208                 goto done;
209         }
210         if (cnt != hdr->length) {
211                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
212                                (unsigned)hdr->length, (unsigned)cnt);
213                 goto done;
214         }
215
216         if (hdr->ctdb_magic != CTDB_MAGIC) {
217                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
218                 goto done;
219         }
220
221         if (hdr->ctdb_version != CTDB_VERSION) {
222                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
223                 goto done;
224         }
225
226         switch (hdr->operation) {
227         case CTDB_REPLY_CALL:
228                 ctdb_client_reply_call(ctdb, hdr);
229                 break;
230
231         case CTDB_REQ_MESSAGE:
232                 ctdb_request_message(ctdb, hdr);
233                 break;
234
235         case CTDB_REPLY_CONTROL:
236                 ctdb_client_reply_control(ctdb, hdr);
237                 break;
238
239         default:
240                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
241         }
242
243 done:
244         talloc_free(tmp_ctx);
245 }
246
247 /*
248   connect to a unix domain socket
249 */
250 int ctdb_socket_connect(struct ctdb_context *ctdb)
251 {
252         struct sockaddr_un addr;
253
254         memset(&addr, 0, sizeof(addr));
255         addr.sun_family = AF_UNIX;
256         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
257
258         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
259         if (ctdb->daemon.sd == -1) {
260                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
261                 return -1;
262         }
263
264         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
265                 close(ctdb->daemon.sd);
266                 ctdb->daemon.sd = -1;
267                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
268                 return -1;
269         }
270
271         set_nonblocking(ctdb->daemon.sd);
272         set_close_on_exec(ctdb->daemon.sd);
273         
274         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
275                                               CTDB_DS_ALIGNMENT, 
276                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
277         return 0;
278 }
279
280
281 struct ctdb_record_handle {
282         struct ctdb_db_context *ctdb_db;
283         TDB_DATA key;
284         TDB_DATA *data;
285         struct ctdb_ltdb_header header;
286 };
287
288
289 /*
290   make a recv call to the local ctdb daemon - called from client context
291
292   This is called when the program wants to wait for a ctdb_call to complete and get the 
293   results. This call will block unless the call has already completed.
294 */
295 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
296 {
297         if (state == NULL) {
298                 return -1;
299         }
300
301         while (state->state < CTDB_CALL_DONE) {
302                 event_loop_once(state->ctdb_db->ctdb->ev);
303         }
304         if (state->state != CTDB_CALL_DONE) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
306                 talloc_free(state);
307                 return -1;
308         }
309
310         if (state->call->reply_data.dsize) {
311                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
312                                                       state->call->reply_data.dptr,
313                                                       state->call->reply_data.dsize);
314                 call->reply_data.dsize = state->call->reply_data.dsize;
315         } else {
316                 call->reply_data.dptr = NULL;
317                 call->reply_data.dsize = 0;
318         }
319         call->status = state->call->status;
320         talloc_free(state);
321
322         return call->status;
323 }
324
325
326
327
328 /*
329   destroy a ctdb_call in client
330 */
331 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
332 {
333         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
334         return 0;
335 }
336
337 /*
338   construct an event driven local ctdb_call
339
340   this is used so that locally processed ctdb_call requests are processed
341   in an event driven manner
342 */
343 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
344                                                                   struct ctdb_call *call,
345                                                                   struct ctdb_ltdb_header *header,
346                                                                   TDB_DATA *data)
347 {
348         struct ctdb_client_call_state *state;
349         struct ctdb_context *ctdb = ctdb_db->ctdb;
350         int ret;
351
352         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
353         CTDB_NO_MEMORY_NULL(ctdb, state);
354         state->call = talloc_zero(state, struct ctdb_call);
355         CTDB_NO_MEMORY_NULL(ctdb, state->call);
356
357         talloc_steal(state, data->dptr);
358
359         state->state   = CTDB_CALL_DONE;
360         *(state->call) = *call;
361         state->ctdb_db = ctdb_db;
362
363         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
364         if (ret != 0) {
365                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
366         }
367
368         return state;
369 }
370
371 /*
372   make a ctdb call to the local daemon - async send. Called from client context.
373
374   This constructs a ctdb_call request and queues it for processing. 
375   This call never blocks.
376 */
377 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
378                                               struct ctdb_call *call)
379 {
380         struct ctdb_client_call_state *state;
381         struct ctdb_context *ctdb = ctdb_db->ctdb;
382         struct ctdb_ltdb_header header;
383         TDB_DATA data;
384         int ret;
385         size_t len;
386         struct ctdb_req_call *c;
387
388         /* if the domain socket is not yet open, open it */
389         if (ctdb->daemon.sd==-1) {
390                 ctdb_socket_connect(ctdb);
391         }
392
393         ret = ctdb_ltdb_lock(ctdb_db, call->key);
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
396                 return NULL;
397         }
398
399         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
400
401         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
402                 ret = -1;
403         }
404
405         if (ret == 0 && header.dmaster == ctdb->pnn) {
406                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
407                 talloc_free(data.dptr);
408                 ctdb_ltdb_unlock(ctdb_db, call->key);
409                 return state;
410         }
411
412         ctdb_ltdb_unlock(ctdb_db, call->key);
413         talloc_free(data.dptr);
414
415         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
416         if (state == NULL) {
417                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
418                 return NULL;
419         }
420         state->call = talloc_zero(state, struct ctdb_call);
421         if (state->call == NULL) {
422                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
423                 return NULL;
424         }
425
426         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
427         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
428         if (c == NULL) {
429                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
430                 return NULL;
431         }
432
433         state->reqid     = ctdb_reqid_new(ctdb, state);
434         state->ctdb_db = ctdb_db;
435         talloc_set_destructor(state, ctdb_client_call_destructor);
436
437         c->hdr.reqid     = state->reqid;
438         c->flags         = call->flags;
439         c->db_id         = ctdb_db->db_id;
440         c->callid        = call->call_id;
441         c->hopcount      = 0;
442         c->keylen        = call->key.dsize;
443         c->calldatalen   = call->call_data.dsize;
444         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
445         memcpy(&c->data[call->key.dsize], 
446                call->call_data.dptr, call->call_data.dsize);
447         *(state->call)              = *call;
448         state->call->call_data.dptr = &c->data[call->key.dsize];
449         state->call->key.dptr       = &c->data[0];
450
451         state->state  = CTDB_CALL_WAIT;
452
453
454         ctdb_client_queue_pkt(ctdb, &c->hdr);
455
456         return state;
457 }
458
459
460 /*
461   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
462 */
463 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
464 {
465         struct ctdb_client_call_state *state;
466
467         state = ctdb_call_send(ctdb_db, call);
468         return ctdb_call_recv(state, call);
469 }
470
471
472 /*
473   tell the daemon what messaging srvid we will use, and register the message
474   handler function in the client
475 */
476 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
477                              ctdb_msg_fn_t handler,
478                              void *private_data)
479 {
480         int res;
481         int32_t status;
482
483         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
484                            tdb_null, NULL, NULL, &status, NULL, NULL);
485         if (res != 0 || status != 0) {
486                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
487                 return -1;
488         }
489
490         /* also need to register the handler with our own ctdb structure */
491         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
492 }
493
494 /*
495   tell the daemon we no longer want a srvid
496 */
497 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
498 {
499         int res;
500         int32_t status;
501
502         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
503                            tdb_null, NULL, NULL, &status, NULL, NULL);
504         if (res != 0 || status != 0) {
505                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
506                 return -1;
507         }
508
509         /* also need to register the handler with our own ctdb structure */
510         ctdb_deregister_message_handler(ctdb, srvid, private_data);
511         return 0;
512 }
513
514 /*
515  * check server ids
516  */
517 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
518                                        uint8_t *result)
519 {
520         TDB_DATA indata, outdata;
521         int res;
522         int32_t status;
523         int i;
524
525         indata.dptr = (uint8_t *)ids;
526         indata.dsize = num * sizeof(*ids);
527
528         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
529                            indata, ctdb, &outdata, &status, NULL, NULL);
530         if (res != 0 || status != 0) {
531                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
532                 return -1;
533         }
534
535         if (outdata.dsize != num*sizeof(uint8_t)) {
536                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
537                                   (long unsigned int)num*sizeof(uint8_t),
538                                   outdata.dsize));
539                 talloc_free(outdata.dptr);
540                 return -1;
541         }
542
543         for (i=0; i<num; i++) {
544                 result[i] = outdata.dptr[i];
545         }
546
547         talloc_free(outdata.dptr);
548         return 0;
549 }
550
551 /*
552   send a message - from client context
553  */
554 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
555                       uint64_t srvid, TDB_DATA data)
556 {
557         struct ctdb_req_message *r;
558         int len, res;
559
560         len = offsetof(struct ctdb_req_message, data) + data.dsize;
561         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
562                                len, struct ctdb_req_message);
563         CTDB_NO_MEMORY(ctdb, r);
564
565         r->hdr.destnode  = pnn;
566         r->srvid         = srvid;
567         r->datalen       = data.dsize;
568         memcpy(&r->data[0], data.dptr, data.dsize);
569         
570         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
571         talloc_free(r);
572         return res;
573 }
574
575
576 /*
577   cancel a ctdb_fetch_lock operation, releasing the lock
578  */
579 static int fetch_lock_destructor(struct ctdb_record_handle *h)
580 {
581         ctdb_ltdb_unlock(h->ctdb_db, h->key);
582         return 0;
583 }
584
585 /*
586   force the migration of a record to this node
587  */
588 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
589 {
590         struct ctdb_call call;
591         ZERO_STRUCT(call);
592         call.call_id = CTDB_NULL_FUNC;
593         call.key = key;
594         call.flags = CTDB_IMMEDIATE_MIGRATION;
595         return ctdb_call(ctdb_db, &call);
596 }
597
598 /*
599   try to fetch a readonly copy of a record
600  */
601 static int
602 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
603 {
604         int ret;
605
606         struct ctdb_call call;
607         ZERO_STRUCT(call);
608
609         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
610         call.call_data.dptr = NULL;
611         call.call_data.dsize = 0;
612         call.key = key;
613         call.flags = CTDB_WANT_READONLY;
614         ret = ctdb_call(ctdb_db, &call);
615
616         if (ret != 0) {
617                 return -1;
618         }
619         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
620                 return -1;
621         }
622
623         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
624         if (*hdr == NULL) {
625                 talloc_free(call.reply_data.dptr);
626                 return -1;
627         }
628
629         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
630         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
631         if (data->dptr == NULL) {
632                 talloc_free(call.reply_data.dptr);
633                 talloc_free(hdr);
634                 return -1;
635         }
636
637         return 0;
638 }
639
640 /*
641   get a lock on a record, and return the records data. Blocks until it gets the lock
642  */
643 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
644                                            TDB_DATA key, TDB_DATA *data)
645 {
646         int ret;
647         struct ctdb_record_handle *h;
648
649         /*
650           procedure is as follows:
651
652           1) get the chain lock. 
653           2) check if we are dmaster
654           3) if we are the dmaster then return handle 
655           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
656              reply from ctdbd
657           5) when we get the reply, goto (1)
658          */
659
660         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
661         if (h == NULL) {
662                 return NULL;
663         }
664
665         h->ctdb_db = ctdb_db;
666         h->key     = key;
667         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
668         if (h->key.dptr == NULL) {
669                 talloc_free(h);
670                 return NULL;
671         }
672         h->data    = data;
673
674         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
675                  (const char *)key.dptr));
676
677 again:
678         /* step 1 - get the chain lock */
679         ret = ctdb_ltdb_lock(ctdb_db, key);
680         if (ret != 0) {
681                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
682                 talloc_free(h);
683                 return NULL;
684         }
685
686         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
687
688         talloc_set_destructor(h, fetch_lock_destructor);
689
690         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
691
692         /* when torturing, ensure we test the remote path */
693         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
694             random() % 5 == 0) {
695                 h->header.dmaster = (uint32_t)-1;
696         }
697
698
699         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
700
701         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
702                 ctdb_ltdb_unlock(ctdb_db, key);
703                 ret = ctdb_client_force_migration(ctdb_db, key);
704                 if (ret != 0) {
705                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
706                         talloc_free(h);
707                         return NULL;
708                 }
709                 goto again;
710         }
711
712         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
713         return h;
714 }
715
716 /*
717   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
718  */
719 struct ctdb_record_handle *
720 ctdb_fetch_readonly_lock(
721         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
722         TDB_DATA key, TDB_DATA *data,
723         int read_only)
724 {
725         int ret;
726         struct ctdb_record_handle *h;
727         struct ctdb_ltdb_header *roheader = NULL;
728
729         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
730         if (h == NULL) {
731                 return NULL;
732         }
733
734         h->ctdb_db = ctdb_db;
735         h->key     = key;
736         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
737         if (h->key.dptr == NULL) {
738                 talloc_free(h);
739                 return NULL;
740         }
741         h->data    = data;
742
743         data->dptr = NULL;
744         data->dsize = 0;
745
746
747 again:
748         talloc_free(roheader);
749         roheader = NULL;
750
751         talloc_free(data->dptr);
752         data->dptr = NULL;
753         data->dsize = 0;
754
755         /* Lock the record/chain */
756         ret = ctdb_ltdb_lock(ctdb_db, key);
757         if (ret != 0) {
758                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
759                 talloc_free(h);
760                 return NULL;
761         }
762
763         talloc_set_destructor(h, fetch_lock_destructor);
764
765         /* Check if record exists yet in the TDB */
766         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
767         if (ret != 0) {
768                 ctdb_ltdb_unlock(ctdb_db, key);
769                 ret = ctdb_client_force_migration(ctdb_db, key);
770                 if (ret != 0) {
771                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
772                         talloc_free(h);
773                         return NULL;
774                 }
775                 goto again;
776         }
777
778         /* if this is a request for read/write and we have delegations
779            we have to revoke all delegations first
780         */
781         if ((read_only == 0) 
782         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
783         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
784                 ctdb_ltdb_unlock(ctdb_db, key);
785                 ret = ctdb_client_force_migration(ctdb_db, key);
786                 if (ret != 0) {
787                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
788                         talloc_free(h);
789                         return NULL;
790                 }
791                 goto again;
792         }
793
794         /* if we are dmaster, just return the handle */
795         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
796                 return h;
797         }
798
799         if (read_only != 0) {
800                 TDB_DATA rodata = {NULL, 0};
801
802                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
803                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
804                         return h;
805                 }
806
807                 ctdb_ltdb_unlock(ctdb_db, key);
808                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
809                 if (ret != 0) {
810                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
811                         ret = ctdb_client_force_migration(ctdb_db, key);
812                         if (ret != 0) {
813                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
814                                 talloc_free(h);
815                                 return NULL;
816                         }
817
818                         goto again;
819                 }
820
821                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
822                         ret = ctdb_client_force_migration(ctdb_db, key);
823                         if (ret != 0) {
824                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
825                                 talloc_free(h);
826                                 return NULL;
827                         }
828
829                         goto again;
830                 }
831
832                 ret = ctdb_ltdb_lock(ctdb_db, key);
833                 if (ret != 0) {
834                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
835                         talloc_free(h);
836                         return NULL;
837                 }
838
839                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
840                 if (ret != 0) {
841                         ctdb_ltdb_unlock(ctdb_db, key);
842
843                         ret = ctdb_client_force_migration(ctdb_db, key);
844                         if (ret != 0) {
845                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
846                                 talloc_free(h);
847                                 return NULL;
848                         }
849
850                         goto again;
851                 }
852
853                 return h;
854         }
855
856         /* we are not dmaster and this was not a request for a readonly lock
857          * so unlock the record, migrate it and try again
858          */
859         ctdb_ltdb_unlock(ctdb_db, key);
860         ret = ctdb_client_force_migration(ctdb_db, key);
861         if (ret != 0) {
862                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
863                 talloc_free(h);
864                 return NULL;
865         }
866         goto again;
867 }
868
869 /*
870   store some data to the record that was locked with ctdb_fetch_lock()
871 */
872 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
873 {
874         if (h->ctdb_db->persistent) {
875                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
876                 return -1;
877         }
878
879         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
880 }
881
882 /*
883   non-locking fetch of a record
884  */
885 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
886                TDB_DATA key, TDB_DATA *data)
887 {
888         struct ctdb_call call;
889         int ret;
890
891         call.call_id = CTDB_FETCH_FUNC;
892         call.call_data.dptr = NULL;
893         call.call_data.dsize = 0;
894         call.key = key;
895
896         ret = ctdb_call(ctdb_db, &call);
897
898         if (ret == 0) {
899                 *data = call.reply_data;
900                 talloc_steal(mem_ctx, data->dptr);
901         }
902
903         return ret;
904 }
905
906
907
908 /*
909    called when a control completes or timesout to invoke the callback
910    function the user provided
911 */
912 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
913         struct timeval t, void *private_data)
914 {
915         struct ctdb_client_control_state *state;
916         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
917         int ret;
918
919         state = talloc_get_type(private_data, struct ctdb_client_control_state);
920         talloc_steal(tmp_ctx, state);
921
922         ret = ctdb_control_recv(state->ctdb, state, state,
923                         NULL, 
924                         NULL, 
925                         NULL);
926         if (ret != 0) {
927                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
928         }
929
930         talloc_free(tmp_ctx);
931 }
932
933 /*
934   called when a CTDB_REPLY_CONTROL packet comes in in the client
935
936   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
937   contains any reply data from the control
938 */
939 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
940                                       struct ctdb_req_header *hdr)
941 {
942         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
943         struct ctdb_client_control_state *state;
944
945         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
946         if (state == NULL) {
947                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
948                 return;
949         }
950
951         if (hdr->reqid != state->reqid) {
952                 /* we found a record  but it was the wrong one */
953                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
954                 return;
955         }
956
957         state->outdata.dptr = c->data;
958         state->outdata.dsize = c->datalen;
959         state->status = c->status;
960         if (c->errorlen) {
961                 state->errormsg = talloc_strndup(state, 
962                                                  (char *)&c->data[c->datalen], 
963                                                  c->errorlen);
964         }
965
966         /* state->outdata now uses resources from c so we dont want c
967            to just dissappear from under us while state is still alive
968         */
969         talloc_steal(state, c);
970
971         state->state = CTDB_CONTROL_DONE;
972
973         /* if we had a callback registered for this control, pull the response
974            and call the callback.
975         */
976         if (state->async.fn) {
977                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
978         }
979 }
980
981
982 /*
983   destroy a ctdb_control in client
984 */
985 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
986 {
987         ctdb_reqid_remove(state->ctdb, state->reqid);
988         return 0;
989 }
990
991
992 /* time out handler for ctdb_control */
993 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
994         struct timeval t, void *private_data)
995 {
996         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
997
998         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
999                          "dstnode:%u\n", state->reqid, state->c->opcode,
1000                          state->c->hdr.destnode));
1001
1002         state->state = CTDB_CONTROL_TIMEOUT;
1003
1004         /* if we had a callback registered for this control, pull the response
1005            and call the callback.
1006         */
1007         if (state->async.fn) {
1008                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1009         }
1010 }
1011
1012 /* async version of send control request */
1013 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1014                 uint32_t destnode, uint64_t srvid, 
1015                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1016                 TALLOC_CTX *mem_ctx,
1017                 struct timeval *timeout,
1018                 char **errormsg)
1019 {
1020         struct ctdb_client_control_state *state;
1021         size_t len;
1022         struct ctdb_req_control *c;
1023         int ret;
1024
1025         if (errormsg) {
1026                 *errormsg = NULL;
1027         }
1028
1029         /* if the domain socket is not yet open, open it */
1030         if (ctdb->daemon.sd==-1) {
1031                 ctdb_socket_connect(ctdb);
1032         }
1033
1034         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1035         CTDB_NO_MEMORY_NULL(ctdb, state);
1036
1037         state->ctdb       = ctdb;
1038         state->reqid      = ctdb_reqid_new(ctdb, state);
1039         state->state      = CTDB_CONTROL_WAIT;
1040         state->errormsg   = NULL;
1041
1042         talloc_set_destructor(state, ctdb_client_control_destructor);
1043
1044         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1045         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1046                                len, struct ctdb_req_control);
1047         state->c            = c;        
1048         CTDB_NO_MEMORY_NULL(ctdb, c);
1049         c->hdr.reqid        = state->reqid;
1050         c->hdr.destnode     = destnode;
1051         c->opcode           = opcode;
1052         c->client_id        = 0;
1053         c->flags            = flags;
1054         c->srvid            = srvid;
1055         c->datalen          = data.dsize;
1056         if (data.dsize) {
1057                 memcpy(&c->data[0], data.dptr, data.dsize);
1058         }
1059
1060         /* timeout */
1061         if (timeout && !timeval_is_zero(timeout)) {
1062                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1063         }
1064
1065         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1066         if (ret != 0) {
1067                 talloc_free(state);
1068                 return NULL;
1069         }
1070
1071         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1072                 talloc_free(state);
1073                 return NULL;
1074         }
1075
1076         return state;
1077 }
1078
1079
1080 /* async version of receive control reply */
1081 int ctdb_control_recv(struct ctdb_context *ctdb, 
1082                 struct ctdb_client_control_state *state, 
1083                 TALLOC_CTX *mem_ctx,
1084                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1085 {
1086         TALLOC_CTX *tmp_ctx;
1087
1088         if (status != NULL) {
1089                 *status = -1;
1090         }
1091         if (errormsg != NULL) {
1092                 *errormsg = NULL;
1093         }
1094
1095         if (state == NULL) {
1096                 return -1;
1097         }
1098
1099         /* prevent double free of state */
1100         tmp_ctx = talloc_new(ctdb);
1101         talloc_steal(tmp_ctx, state);
1102
1103         /* loop one event at a time until we either timeout or the control
1104            completes.
1105         */
1106         while (state->state == CTDB_CONTROL_WAIT) {
1107                 event_loop_once(ctdb->ev);
1108         }
1109
1110         if (state->state != CTDB_CONTROL_DONE) {
1111                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1112                 if (state->async.fn) {
1113                         state->async.fn(state);
1114                 }
1115                 talloc_free(tmp_ctx);
1116                 return -1;
1117         }
1118
1119         if (state->errormsg) {
1120                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1121                 if (errormsg) {
1122                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1123                 }
1124                 if (state->async.fn) {
1125                         state->async.fn(state);
1126                 }
1127                 talloc_free(tmp_ctx);
1128                 return -1;
1129         }
1130
1131         if (outdata) {
1132                 *outdata = state->outdata;
1133                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1134         }
1135
1136         if (status) {
1137                 *status = state->status;
1138         }
1139
1140         if (state->async.fn) {
1141                 state->async.fn(state);
1142         }
1143
1144         talloc_free(tmp_ctx);
1145         return 0;
1146 }
1147
1148
1149
1150 /*
1151   send a ctdb control message
1152   timeout specifies how long we should wait for a reply.
1153   if timeout is NULL we wait indefinitely
1154  */
1155 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1156                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1157                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1158                  struct timeval *timeout,
1159                  char **errormsg)
1160 {
1161         struct ctdb_client_control_state *state;
1162
1163         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1164                         flags, data, mem_ctx,
1165                         timeout, errormsg);
1166
1167         /* FIXME: Error conditions in ctdb_control_send return NULL without
1168          * setting errormsg.  So, there is no way to distinguish between sucess
1169          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1170         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1171                 if (status != NULL) {
1172                         *status = 0;
1173                 }
1174                 return 0;
1175         }
1176
1177         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1178                         errormsg);
1179 }
1180
1181
1182
1183
1184 /*
1185   a process exists call. Returns 0 if process exists, -1 otherwise
1186  */
1187 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1188 {
1189         int ret;
1190         TDB_DATA data;
1191         int32_t status;
1192
1193         data.dptr = (uint8_t*)&pid;
1194         data.dsize = sizeof(pid);
1195
1196         ret = ctdb_control(ctdb, destnode, 0, 
1197                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1198                            NULL, NULL, &status, NULL, NULL);
1199         if (ret != 0) {
1200                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1201                 return -1;
1202         }
1203
1204         return status;
1205 }
1206
1207 /*
1208   get remote statistics
1209  */
1210 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1211 {
1212         int ret;
1213         TDB_DATA data;
1214         int32_t res;
1215
1216         ret = ctdb_control(ctdb, destnode, 0, 
1217                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1218                            ctdb, &data, &res, NULL, NULL);
1219         if (ret != 0 || res != 0) {
1220                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1221                 return -1;
1222         }
1223
1224         if (data.dsize != sizeof(struct ctdb_statistics)) {
1225                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1226                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1227                       return -1;
1228         }
1229
1230         *status = *(struct ctdb_statistics *)data.dptr;
1231         talloc_free(data.dptr);
1232                         
1233         return 0;
1234 }
1235
1236 /*
1237  * get db statistics
1238  */
1239 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1240                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1241 {
1242         int ret;
1243         TDB_DATA indata, outdata;
1244         int32_t res;
1245         struct ctdb_db_statistics *wire, *s;
1246         char *ptr;
1247         int i;
1248
1249         indata.dptr = (uint8_t *)&dbid;
1250         indata.dsize = sizeof(dbid);
1251
1252         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1253                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1254         if (ret != 0 || res != 0) {
1255                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1256                 return -1;
1257         }
1258
1259         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1260                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1261                                  outdata.dsize,
1262                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1263                 return -1;
1264         }
1265
1266         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1267         if (s == NULL) {
1268                 talloc_free(outdata.dptr);
1269                 CTDB_NO_MEMORY(ctdb, s);
1270         }
1271
1272         wire = (struct ctdb_db_statistics *)outdata.dptr;
1273         *s = *wire;
1274         ptr = &wire->hot_keys_wire[0];
1275         for (i=0; i<wire->num_hot_keys; i++) {
1276                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1277                 if (s->hot_keys[i].key.dptr == NULL) {
1278                         talloc_free(outdata.dptr);
1279                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1280                 }
1281
1282                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1283                 ptr += wire->hot_keys[i].key.dsize;
1284         }
1285
1286         talloc_free(outdata.dptr);
1287         *dbstat = s;
1288         return 0;
1289 }
1290
1291 /*
1292   shutdown a remote ctdb node
1293  */
1294 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1295 {
1296         struct ctdb_client_control_state *state;
1297
1298         state = ctdb_control_send(ctdb, destnode, 0, 
1299                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1300                            NULL, &timeout, NULL);
1301         if (state == NULL) {
1302                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1303                 return -1;
1304         }
1305
1306         return 0;
1307 }
1308
1309 /*
1310   get vnn map from a remote node
1311  */
1312 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1313 {
1314         int ret;
1315         TDB_DATA outdata;
1316         int32_t res;
1317         struct ctdb_vnn_map_wire *map;
1318
1319         ret = ctdb_control(ctdb, destnode, 0, 
1320                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1321                            mem_ctx, &outdata, &res, &timeout, NULL);
1322         if (ret != 0 || res != 0) {
1323                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1324                 return -1;
1325         }
1326         
1327         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1328         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1329             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1330                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1331                 return -1;
1332         }
1333
1334         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1335         CTDB_NO_MEMORY(ctdb, *vnnmap);
1336         (*vnnmap)->generation = map->generation;
1337         (*vnnmap)->size       = map->size;
1338         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1339
1340         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1341         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1342         talloc_free(outdata.dptr);
1343                     
1344         return 0;
1345 }
1346
1347
1348 /*
1349   get the recovery mode of a remote node
1350  */
1351 struct ctdb_client_control_state *
1352 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1353 {
1354         return ctdb_control_send(ctdb, destnode, 0, 
1355                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1356                            mem_ctx, &timeout, NULL);
1357 }
1358
1359 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1360 {
1361         int ret;
1362         int32_t res;
1363
1364         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1365         if (ret != 0) {
1366                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1367                 return -1;
1368         }
1369
1370         if (recmode) {
1371                 *recmode = (uint32_t)res;
1372         }
1373
1374         return 0;
1375 }
1376
1377 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1378 {
1379         struct ctdb_client_control_state *state;
1380
1381         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1382         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1383 }
1384
1385
1386
1387
1388 /*
1389   set the recovery mode of a remote node
1390  */
1391 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1392 {
1393         int ret;
1394         TDB_DATA data;
1395         int32_t res;
1396
1397         data.dsize = sizeof(uint32_t);
1398         data.dptr = (unsigned char *)&recmode;
1399
1400         ret = ctdb_control(ctdb, destnode, 0, 
1401                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1402                            NULL, NULL, &res, &timeout, NULL);
1403         if (ret != 0 || res != 0) {
1404                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1405                 return -1;
1406         }
1407
1408         return 0;
1409 }
1410
1411
1412
1413 /*
1414   get the recovery master of a remote node
1415  */
1416 struct ctdb_client_control_state *
1417 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1418                         struct timeval timeout, uint32_t destnode)
1419 {
1420         return ctdb_control_send(ctdb, destnode, 0, 
1421                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1422                            mem_ctx, &timeout, NULL);
1423 }
1424
1425 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1426 {
1427         int ret;
1428         int32_t res;
1429
1430         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1431         if (ret != 0) {
1432                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1433                 return -1;
1434         }
1435
1436         if (recmaster) {
1437                 *recmaster = (uint32_t)res;
1438         }
1439
1440         return 0;
1441 }
1442
1443 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1444 {
1445         struct ctdb_client_control_state *state;
1446
1447         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1448         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1449 }
1450
1451
1452 /*
1453   set the recovery master of a remote node
1454  */
1455 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1456 {
1457         int ret;
1458         TDB_DATA data;
1459         int32_t res;
1460
1461         ZERO_STRUCT(data);
1462         data.dsize = sizeof(uint32_t);
1463         data.dptr = (unsigned char *)&recmaster;
1464
1465         ret = ctdb_control(ctdb, destnode, 0, 
1466                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1467                            NULL, NULL, &res, &timeout, NULL);
1468         if (ret != 0 || res != 0) {
1469                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1470                 return -1;
1471         }
1472
1473         return 0;
1474 }
1475
1476
1477 /*
1478   get a list of databases off a remote node
1479  */
1480 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1481                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1482 {
1483         int ret;
1484         TDB_DATA outdata;
1485         int32_t res;
1486
1487         ret = ctdb_control(ctdb, destnode, 0, 
1488                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1489                            mem_ctx, &outdata, &res, &timeout, NULL);
1490         if (ret != 0 || res != 0) {
1491                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1492                 return -1;
1493         }
1494
1495         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1496         talloc_free(outdata.dptr);
1497                     
1498         return 0;
1499 }
1500
1501 /*
1502   get a list of nodes (vnn and flags ) from a remote node
1503  */
1504 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1505                 struct timeval timeout, uint32_t destnode, 
1506                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1507 {
1508         int ret;
1509         TDB_DATA outdata;
1510         int32_t res;
1511
1512         ret = ctdb_control(ctdb, destnode, 0, 
1513                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1514                            mem_ctx, &outdata, &res, &timeout, NULL);
1515         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1516                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1517                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1518         }
1519         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1520                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1521                 return -1;
1522         }
1523
1524         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1525         talloc_free(outdata.dptr);
1526                     
1527         return 0;
1528 }
1529
1530 /*
1531   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1532  */
1533 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1534                 struct timeval timeout, uint32_t destnode, 
1535                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1536 {
1537         int ret, i, len;
1538         TDB_DATA outdata;
1539         struct ctdb_node_mapv4 *nodemapv4;
1540         int32_t res;
1541
1542         ret = ctdb_control(ctdb, destnode, 0, 
1543                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1544                            mem_ctx, &outdata, &res, &timeout, NULL);
1545         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1547                 return -1;
1548         }
1549
1550         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1551
1552         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1553         (*nodemap) = talloc_zero_size(mem_ctx, len);
1554         CTDB_NO_MEMORY(ctdb, (*nodemap));
1555
1556         (*nodemap)->num = nodemapv4->num;
1557         for (i=0; i<nodemapv4->num; i++) {
1558                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1559                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1560                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1561                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1562         }
1563                 
1564         talloc_free(outdata.dptr);
1565                     
1566         return 0;
1567 }
1568
1569 /*
1570   drop the transport, reload the nodes file and restart the transport
1571  */
1572 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1573                     struct timeval timeout, uint32_t destnode)
1574 {
1575         int ret;
1576         int32_t res;
1577
1578         ret = ctdb_control(ctdb, destnode, 0, 
1579                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1580                            NULL, NULL, &res, &timeout, NULL);
1581         if (ret != 0 || res != 0) {
1582                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1583                 return -1;
1584         }
1585
1586         return 0;
1587 }
1588
1589
1590 /*
1591   set vnn map on a node
1592  */
1593 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1594                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1595 {
1596         int ret;
1597         TDB_DATA data;
1598         int32_t res;
1599         struct ctdb_vnn_map_wire *map;
1600         size_t len;
1601
1602         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1603         map = talloc_size(mem_ctx, len);
1604         CTDB_NO_MEMORY(ctdb, map);
1605
1606         map->generation = vnnmap->generation;
1607         map->size = vnnmap->size;
1608         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1609         
1610         data.dsize = len;
1611         data.dptr  = (uint8_t *)map;
1612
1613         ret = ctdb_control(ctdb, destnode, 0, 
1614                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1615                            NULL, NULL, &res, &timeout, NULL);
1616         if (ret != 0 || res != 0) {
1617                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1618                 return -1;
1619         }
1620
1621         talloc_free(map);
1622
1623         return 0;
1624 }
1625
1626
1627 /*
1628   async send for pull database
1629  */
1630 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1631         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1632         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1633 {
1634         TDB_DATA indata;
1635         struct ctdb_control_pulldb *pull;
1636         struct ctdb_client_control_state *state;
1637
1638         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1639         CTDB_NO_MEMORY_NULL(ctdb, pull);
1640
1641         pull->db_id   = dbid;
1642         pull->lmaster = lmaster;
1643
1644         indata.dsize = sizeof(struct ctdb_control_pulldb);
1645         indata.dptr  = (unsigned char *)pull;
1646
1647         state = ctdb_control_send(ctdb, destnode, 0, 
1648                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1649                                   mem_ctx, &timeout, NULL);
1650         talloc_free(pull);
1651
1652         return state;
1653 }
1654
1655 /*
1656   async recv for pull database
1657  */
1658 int ctdb_ctrl_pulldb_recv(
1659         struct ctdb_context *ctdb, 
1660         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1661         TDB_DATA *outdata)
1662 {
1663         int ret;
1664         int32_t res;
1665
1666         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1667         if ( (ret != 0) || (res != 0) ){
1668                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1669                 return -1;
1670         }
1671
1672         return 0;
1673 }
1674
1675 /*
1676   pull all keys and records for a specific database on a node
1677  */
1678 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1679                 uint32_t dbid, uint32_t lmaster, 
1680                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1681                 TDB_DATA *outdata)
1682 {
1683         struct ctdb_client_control_state *state;
1684
1685         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1686                                       timeout);
1687         
1688         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1689 }
1690
1691
1692 /*
1693   change dmaster for all keys in the database to the new value
1694  */
1695 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1696                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1697 {
1698         int ret;
1699         TDB_DATA indata;
1700         int32_t res;
1701
1702         indata.dsize = 2*sizeof(uint32_t);
1703         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1704
1705         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1706         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1707
1708         ret = ctdb_control(ctdb, destnode, 0, 
1709                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1710                            NULL, NULL, &res, &timeout, NULL);
1711         if (ret != 0 || res != 0) {
1712                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1713                 return -1;
1714         }
1715
1716         return 0;
1717 }
1718
1719 /*
1720   ping a node, return number of clients connected
1721  */
1722 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1723 {
1724         int ret;
1725         int32_t res;
1726
1727         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1728                            tdb_null, NULL, NULL, &res, NULL, NULL);
1729         if (ret != 0) {
1730                 return -1;
1731         }
1732         return res;
1733 }
1734
1735 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1736                            struct timeval timeout, 
1737                            uint32_t destnode,
1738                            uint32_t *runstate)
1739 {
1740         TDB_DATA outdata;
1741         int32_t res;
1742         int ret;
1743
1744         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1745                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1746         if (ret != 0 || res != 0) {
1747                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1748                 return ret != 0 ? ret : res;
1749         }
1750
1751         if (outdata.dsize != sizeof(uint32_t)) {
1752                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1753                 talloc_free(outdata.dptr);
1754                 return -1;
1755         }
1756
1757         if (runstate != NULL) {
1758                 *runstate = *(uint32_t *)outdata.dptr;
1759         }
1760         talloc_free(outdata.dptr);
1761
1762         return 0;
1763 }
1764
1765 /*
1766   find the real path to a ltdb 
1767  */
1768 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1769                    const char **path)
1770 {
1771         int ret;
1772         int32_t res;
1773         TDB_DATA data;
1774
1775         data.dptr = (uint8_t *)&dbid;
1776         data.dsize = sizeof(dbid);
1777
1778         ret = ctdb_control(ctdb, destnode, 0, 
1779                            CTDB_CONTROL_GETDBPATH, 0, data, 
1780                            mem_ctx, &data, &res, &timeout, NULL);
1781         if (ret != 0 || res != 0) {
1782                 return -1;
1783         }
1784
1785         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1786         if ((*path) == NULL) {
1787                 return -1;
1788         }
1789
1790         talloc_free(data.dptr);
1791
1792         return 0;
1793 }
1794
1795 /*
1796   find the name of a db 
1797  */
1798 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1799                    const char **name)
1800 {
1801         int ret;
1802         int32_t res;
1803         TDB_DATA data;
1804
1805         data.dptr = (uint8_t *)&dbid;
1806         data.dsize = sizeof(dbid);
1807
1808         ret = ctdb_control(ctdb, destnode, 0, 
1809                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1810                            mem_ctx, &data, &res, &timeout, NULL);
1811         if (ret != 0 || res != 0) {
1812                 return -1;
1813         }
1814
1815         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1816         if ((*name) == NULL) {
1817                 return -1;
1818         }
1819
1820         talloc_free(data.dptr);
1821
1822         return 0;
1823 }
1824
1825 /*
1826   get the health status of a db
1827  */
1828 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1829                           struct timeval timeout,
1830                           uint32_t destnode,
1831                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1832                           const char **reason)
1833 {
1834         int ret;
1835         int32_t res;
1836         TDB_DATA data;
1837
1838         data.dptr = (uint8_t *)&dbid;
1839         data.dsize = sizeof(dbid);
1840
1841         ret = ctdb_control(ctdb, destnode, 0,
1842                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1843                            mem_ctx, &data, &res, &timeout, NULL);
1844         if (ret != 0 || res != 0) {
1845                 return -1;
1846         }
1847
1848         if (data.dsize == 0) {
1849                 (*reason) = NULL;
1850                 return 0;
1851         }
1852
1853         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1854         if ((*reason) == NULL) {
1855                 return -1;
1856         }
1857
1858         talloc_free(data.dptr);
1859
1860         return 0;
1861 }
1862
1863 /*
1864  * get db sequence number
1865  */
1866 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1867                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1868 {
1869         int ret;
1870         int32_t res;
1871         TDB_DATA data, outdata;
1872
1873         data.dptr = (uint8_t *)&dbid;
1874         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1875
1876         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1877                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1878         if (ret != 0 || res != 0) {
1879                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1880                 return -1;
1881         }
1882
1883         if (outdata.dsize != sizeof(uint64_t)) {
1884                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1885                 talloc_free(outdata.dptr);
1886                 return -1;
1887         }
1888
1889         if (seqnum != NULL) {
1890                 *seqnum = *(uint64_t *)outdata.dptr;
1891         }
1892         talloc_free(outdata.dptr);
1893
1894         return 0;
1895 }
1896
1897 /*
1898   create a database
1899  */
1900 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1901                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1902 {
1903         int ret;
1904         int32_t res;
1905         TDB_DATA data;
1906         uint64_t tdb_flags = 0;
1907
1908         data.dptr = discard_const(name);
1909         data.dsize = strlen(name)+1;
1910
1911         /* Make sure that volatile databases use jenkins hash */
1912         if (!persistent) {
1913                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1914         }
1915
1916         ret = ctdb_control(ctdb, destnode, tdb_flags,
1917                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1918                            0, data, 
1919                            mem_ctx, &data, &res, &timeout, NULL);
1920
1921         if (ret != 0 || res != 0) {
1922                 return -1;
1923         }
1924
1925         return 0;
1926 }
1927
1928 /*
1929   get debug level on a node
1930  */
1931 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1932 {
1933         int ret;
1934         int32_t res;
1935         TDB_DATA data;
1936
1937         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1938                            ctdb, &data, &res, NULL, NULL);
1939         if (ret != 0 || res != 0) {
1940                 return -1;
1941         }
1942         if (data.dsize != sizeof(int32_t)) {
1943                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1944                          (unsigned)data.dsize));
1945                 return -1;
1946         }
1947         *level = *(int32_t *)data.dptr;
1948         talloc_free(data.dptr);
1949         return 0;
1950 }
1951
1952 /*
1953   set debug level on a node
1954  */
1955 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1956 {
1957         int ret;
1958         int32_t res;
1959         TDB_DATA data;
1960
1961         data.dptr = (uint8_t *)&level;
1962         data.dsize = sizeof(level);
1963
1964         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1965                            NULL, NULL, &res, NULL, NULL);
1966         if (ret != 0 || res != 0) {
1967                 return -1;
1968         }
1969         return 0;
1970 }
1971
1972
1973 /*
1974   get a list of connected nodes
1975  */
1976 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1977                                 struct timeval timeout,
1978                                 TALLOC_CTX *mem_ctx,
1979                                 uint32_t *num_nodes)
1980 {
1981         struct ctdb_node_map *map=NULL;
1982         int ret, i;
1983         uint32_t *nodes;
1984
1985         *num_nodes = 0;
1986
1987         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1988         if (ret != 0) {
1989                 return NULL;
1990         }
1991
1992         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1993         if (nodes == NULL) {
1994                 return NULL;
1995         }
1996
1997         for (i=0;i<map->num;i++) {
1998                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1999                         nodes[*num_nodes] = map->nodes[i].pnn;
2000                         (*num_nodes)++;
2001                 }
2002         }
2003
2004         return nodes;
2005 }
2006
2007
2008 /*
2009   reset remote status
2010  */
2011 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2012 {
2013         int ret;
2014         int32_t res;
2015
2016         ret = ctdb_control(ctdb, destnode, 0, 
2017                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2018                            NULL, NULL, &res, NULL, NULL);
2019         if (ret != 0 || res != 0) {
2020                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2021                 return -1;
2022         }
2023         return 0;
2024 }
2025
2026 /*
2027   attach to a specific database - client call
2028 */
2029 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2030                                     struct timeval timeout,
2031                                     const char *name,
2032                                     bool persistent,
2033                                     uint32_t tdb_flags)
2034 {
2035         struct ctdb_db_context *ctdb_db;
2036         TDB_DATA data;
2037         int ret;
2038         int32_t res;
2039
2040         ctdb_db = ctdb_db_handle(ctdb, name);
2041         if (ctdb_db) {
2042                 return ctdb_db;
2043         }
2044
2045         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2046         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2047
2048         ctdb_db->ctdb = ctdb;
2049         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2050         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2051
2052         data.dptr = discard_const(name);
2053         data.dsize = strlen(name)+1;
2054
2055         /* CTDB has switched to using jenkins hash for volatile databases.
2056          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2057          * always set it.
2058          */
2059         if (!persistent) {
2060                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2061         }
2062
2063         /* tell ctdb daemon to attach */
2064         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2065                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2066                            0, data, ctdb_db, &data, &res, NULL, NULL);
2067         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2068                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2069                 talloc_free(ctdb_db);
2070                 return NULL;
2071         }
2072         
2073         ctdb_db->db_id = *(uint32_t *)data.dptr;
2074         talloc_free(data.dptr);
2075
2076         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2077         if (ret != 0) {
2078                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2079                 talloc_free(ctdb_db);
2080                 return NULL;
2081         }
2082
2083         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
2084         if (ctdb->valgrinding) {
2085                 tdb_flags |= TDB_NOMMAP;
2086         }
2087         tdb_flags |= TDB_DISALLOW_NESTING;
2088
2089         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
2090         if (ctdb_db->ltdb == NULL) {
2091                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2092                 talloc_free(ctdb_db);
2093                 return NULL;
2094         }
2095
2096         ctdb_db->persistent = persistent;
2097
2098         DLIST_ADD(ctdb->db_list, ctdb_db);
2099
2100         /* add well known functions */
2101         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2102         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2103         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2104
2105         return ctdb_db;
2106 }
2107
2108
2109 /*
2110   setup a call for a database
2111  */
2112 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2113 {
2114         struct ctdb_registered_call *call;
2115
2116 #if 0
2117         TDB_DATA data;
2118         int32_t status;
2119         struct ctdb_control_set_call c;
2120         int ret;
2121
2122         /* this is no longer valid with the separate daemon architecture */
2123         c.db_id = ctdb_db->db_id;
2124         c.fn    = fn;
2125         c.id    = id;
2126
2127         data.dptr = (uint8_t *)&c;
2128         data.dsize = sizeof(c);
2129
2130         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2131                            data, NULL, NULL, &status, NULL, NULL);
2132         if (ret != 0 || status != 0) {
2133                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2134                 return -1;
2135         }
2136 #endif
2137
2138         /* also register locally */
2139         call = talloc(ctdb_db, struct ctdb_registered_call);
2140         call->fn = fn;
2141         call->id = id;
2142
2143         DLIST_ADD(ctdb_db->calls, call);        
2144         return 0;
2145 }
2146
2147
2148 struct traverse_state {
2149         bool done;
2150         uint32_t count;
2151         ctdb_traverse_func fn;
2152         void *private_data;
2153         bool listemptyrecords;
2154 };
2155
2156 /*
2157   called on each key during a ctdb_traverse
2158  */
2159 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2160 {
2161         struct traverse_state *state = (struct traverse_state *)p;
2162         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2163         TDB_DATA key;
2164
2165         if (data.dsize < sizeof(uint32_t) ||
2166             d->length != data.dsize) {
2167                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2168                 state->done = true;
2169                 return;
2170         }
2171
2172         key.dsize = d->keylen;
2173         key.dptr  = &d->data[0];
2174         data.dsize = d->datalen;
2175         data.dptr = &d->data[d->keylen];
2176
2177         if (key.dsize == 0 && data.dsize == 0) {
2178                 /* end of traverse */
2179                 state->done = true;
2180                 return;
2181         }
2182
2183         if (!state->listemptyrecords &&
2184             data.dsize == sizeof(struct ctdb_ltdb_header))
2185         {
2186                 /* empty records are deleted records in ctdb */
2187                 return;
2188         }
2189
2190         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2191                 state->done = true;
2192         }
2193
2194         state->count++;
2195 }
2196
2197 /**
2198  * start a cluster wide traverse, calling the supplied fn on each record
2199  * return the number of records traversed, or -1 on error
2200  *
2201  * Extendet variant with a flag to signal whether empty records should
2202  * be listed.
2203  */
2204 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2205                              ctdb_traverse_func fn,
2206                              bool withemptyrecords,
2207                              void *private_data)
2208 {
2209         TDB_DATA data;
2210         struct ctdb_traverse_start_ext t;
2211         int32_t status;
2212         int ret;
2213         uint64_t srvid = (getpid() | 0xFLL<<60);
2214         struct traverse_state state;
2215
2216         state.done = false;
2217         state.count = 0;
2218         state.private_data = private_data;
2219         state.fn = fn;
2220         state.listemptyrecords = withemptyrecords;
2221
2222         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2223         if (ret != 0) {
2224                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2225                 return -1;
2226         }
2227
2228         t.db_id = ctdb_db->db_id;
2229         t.srvid = srvid;
2230         t.reqid = 0;
2231         t.withemptyrecords = withemptyrecords;
2232
2233         data.dptr = (uint8_t *)&t;
2234         data.dsize = sizeof(t);
2235
2236         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2237                            data, NULL, NULL, &status, NULL, NULL);
2238         if (ret != 0 || status != 0) {
2239                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2240                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2241                 return -1;
2242         }
2243
2244         while (!state.done) {
2245                 event_loop_once(ctdb_db->ctdb->ev);
2246         }
2247
2248         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2249         if (ret != 0) {
2250                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2251                 return -1;
2252         }
2253
2254         return state.count;
2255 }
2256
2257 /**
2258  * start a cluster wide traverse, calling the supplied fn on each record
2259  * return the number of records traversed, or -1 on error
2260  *
2261  * Standard version which does not list the empty records:
2262  * These are considered deleted.
2263  */
2264 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2265 {
2266         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2267 }
2268
2269 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2270 /*
2271   called on each key during a catdb
2272  */
2273 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2274 {
2275         int i;
2276         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2277         FILE *f = c->f;
2278         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2279
2280         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2281         for (i=0;i<key.dsize;i++) {
2282                 if (ISASCII(key.dptr[i])) {
2283                         fprintf(f, "%c", key.dptr[i]);
2284                 } else {
2285                         fprintf(f, "\\%02X", key.dptr[i]);
2286                 }
2287         }
2288         fprintf(f, "\"\n");
2289
2290         fprintf(f, "dmaster: %u\n", h->dmaster);
2291         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2292
2293         if (c->printlmaster && ctdb->vnn_map != NULL) {
2294                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2295         }
2296
2297         if (c->printhash) {
2298                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2299         }
2300
2301         if (c->printrecordflags) {
2302                 fprintf(f, "flags: 0x%08x", h->flags);
2303                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2304                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2305                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2306                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2307                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2308                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2309                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2310                 fprintf(f, "\n");
2311         }
2312
2313         if (c->printdatasize) {
2314                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2315         } else {
2316                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2317                 for (i=sizeof(*h);i<data.dsize;i++) {
2318                         if (ISASCII(data.dptr[i])) {
2319                                 fprintf(f, "%c", data.dptr[i]);
2320                         } else {
2321                                 fprintf(f, "\\%02X", data.dptr[i]);
2322                         }
2323                 }
2324                 fprintf(f, "\"\n");
2325         }
2326
2327         fprintf(f, "\n");
2328
2329         return 0;
2330 }
2331
2332 /*
2333   convenience function to list all keys to stdout
2334  */
2335 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2336                  struct ctdb_dump_db_context *ctx)
2337 {
2338         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2339                                  ctx->printemptyrecords, ctx);
2340 }
2341
2342 /*
2343   get the pid of a ctdb daemon
2344  */
2345 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2346 {
2347         int ret;
2348         int32_t res;
2349
2350         ret = ctdb_control(ctdb, destnode, 0, 
2351                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2352                            NULL, NULL, &res, &timeout, NULL);
2353         if (ret != 0) {
2354                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2355                 return -1;
2356         }
2357
2358         *pid = res;
2359
2360         return 0;
2361 }
2362
2363
2364 /*
2365   async freeze send control
2366  */
2367 struct ctdb_client_control_state *
2368 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2369 {
2370         return ctdb_control_send(ctdb, destnode, priority, 
2371                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2372                            mem_ctx, &timeout, NULL);
2373 }
2374
2375 /* 
2376    async freeze recv control
2377 */
2378 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2379 {
2380         int ret;
2381         int32_t res;
2382
2383         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2384         if ( (ret != 0) || (res != 0) ){
2385                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2386                 return -1;
2387         }
2388
2389         return 0;
2390 }
2391
2392 /*
2393   freeze databases of a certain priority
2394  */
2395 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2396 {
2397         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2398         struct ctdb_client_control_state *state;
2399         int ret;
2400
2401         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2402         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2403         talloc_free(tmp_ctx);
2404
2405         return ret;
2406 }
2407
2408 /* Freeze all databases */
2409 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2410 {
2411         int i;
2412
2413         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2414                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2415                         return -1;
2416                 }
2417         }
2418         return 0;
2419 }
2420
2421 /*
2422   thaw databases of a certain priority
2423  */
2424 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2425 {
2426         int ret;
2427         int32_t res;
2428
2429         ret = ctdb_control(ctdb, destnode, priority, 
2430                            CTDB_CONTROL_THAW, 0, tdb_null, 
2431                            NULL, NULL, &res, &timeout, NULL);
2432         if (ret != 0 || res != 0) {
2433                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2434                 return -1;
2435         }
2436
2437         return 0;
2438 }
2439
2440 /* thaw all databases */
2441 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2442 {
2443         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2444 }
2445
2446 /*
2447   get pnn of a node, or -1
2448  */
2449 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2450 {
2451         int ret;
2452         int32_t res;
2453
2454         ret = ctdb_control(ctdb, destnode, 0, 
2455                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2456                            NULL, NULL, &res, &timeout, NULL);
2457         if (ret != 0) {
2458                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2459                 return -1;
2460         }
2461
2462         return res;
2463 }
2464
2465 /*
2466   get the monitoring mode of a remote node
2467  */
2468 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2469 {
2470         int ret;
2471         int32_t res;
2472
2473         ret = ctdb_control(ctdb, destnode, 0, 
2474                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2475                            NULL, NULL, &res, &timeout, NULL);
2476         if (ret != 0) {
2477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2478                 return -1;
2479         }
2480
2481         *monmode = res;
2482
2483         return 0;
2484 }
2485
2486
2487 /*
2488  set the monitoring mode of a remote node to active
2489  */
2490 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2491 {
2492         int ret;
2493         
2494
2495         ret = ctdb_control(ctdb, destnode, 0, 
2496                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2497                            NULL, NULL,NULL, &timeout, NULL);
2498         if (ret != 0) {
2499                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2500                 return -1;
2501         }
2502
2503         
2504
2505         return 0;
2506 }
2507
2508 /*
2509   set the monitoring mode of a remote node to disable
2510  */
2511 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2512 {
2513         int ret;
2514         
2515
2516         ret = ctdb_control(ctdb, destnode, 0, 
2517                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2518                            NULL, NULL, NULL, &timeout, NULL);
2519         if (ret != 0) {
2520                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2521                 return -1;
2522         }
2523
2524         
2525
2526         return 0;
2527 }
2528
2529
2530
2531 /* 
2532   sent to a node to make it take over an ip address
2533 */
2534 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2535                           uint32_t destnode, struct ctdb_public_ip *ip)
2536 {
2537         TDB_DATA data;
2538         struct ctdb_public_ipv4 ipv4;
2539         int ret;
2540         int32_t res;
2541
2542         if (ip->addr.sa.sa_family == AF_INET) {
2543                 ipv4.pnn = ip->pnn;
2544                 ipv4.sin = ip->addr.ip;
2545
2546                 data.dsize = sizeof(ipv4);
2547                 data.dptr  = (uint8_t *)&ipv4;
2548
2549                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2550                            NULL, &res, &timeout, NULL);
2551         } else {
2552                 data.dsize = sizeof(*ip);
2553                 data.dptr  = (uint8_t *)ip;
2554
2555                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2556                            NULL, &res, &timeout, NULL);
2557         }
2558
2559         if (ret != 0 || res != 0) {
2560                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2561                 return -1;
2562         }
2563
2564         return 0;       
2565 }
2566
2567
2568 /* 
2569   sent to a node to make it release an ip address
2570 */
2571 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2572                          uint32_t destnode, struct ctdb_public_ip *ip)
2573 {
2574         TDB_DATA data;
2575         struct ctdb_public_ipv4 ipv4;
2576         int ret;
2577         int32_t res;
2578
2579         if (ip->addr.sa.sa_family == AF_INET) {
2580                 ipv4.pnn = ip->pnn;
2581                 ipv4.sin = ip->addr.ip;
2582
2583                 data.dsize = sizeof(ipv4);
2584                 data.dptr  = (uint8_t *)&ipv4;
2585
2586                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2587                                    NULL, &res, &timeout, NULL);
2588         } else {
2589                 data.dsize = sizeof(*ip);
2590                 data.dptr  = (uint8_t *)ip;
2591
2592                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2593                                    NULL, &res, &timeout, NULL);
2594         }
2595
2596         if (ret != 0 || res != 0) {
2597                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2598                 return -1;
2599         }
2600
2601         return 0;       
2602 }
2603
2604
2605 /*
2606   get a tunable
2607  */
2608 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2609                           struct timeval timeout, 
2610                           uint32_t destnode,
2611                           const char *name, uint32_t *value)
2612 {
2613         struct ctdb_control_get_tunable *t;
2614         TDB_DATA data, outdata;
2615         int32_t res;
2616         int ret;
2617
2618         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2619         data.dptr  = talloc_size(ctdb, data.dsize);
2620         CTDB_NO_MEMORY(ctdb, data.dptr);
2621
2622         t = (struct ctdb_control_get_tunable *)data.dptr;
2623         t->length = strlen(name)+1;
2624         memcpy(t->name, name, t->length);
2625
2626         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2627                            &outdata, &res, &timeout, NULL);
2628         talloc_free(data.dptr);
2629         if (ret != 0 || res != 0) {
2630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2631                 return ret != 0 ? ret : res;
2632         }
2633
2634         if (outdata.dsize != sizeof(uint32_t)) {
2635                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2636                 talloc_free(outdata.dptr);
2637                 return -1;
2638         }
2639         
2640         *value = *(uint32_t *)outdata.dptr;
2641         talloc_free(outdata.dptr);
2642
2643         return 0;
2644 }
2645
2646 /*
2647   set a tunable
2648  */
2649 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2650                           struct timeval timeout, 
2651                           uint32_t destnode,
2652                           const char *name, uint32_t value)
2653 {
2654         struct ctdb_control_set_tunable *t;
2655         TDB_DATA data;
2656         int32_t res;
2657         int ret;
2658
2659         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2660         data.dptr  = talloc_size(ctdb, data.dsize);
2661         CTDB_NO_MEMORY(ctdb, data.dptr);
2662
2663         t = (struct ctdb_control_set_tunable *)data.dptr;
2664         t->length = strlen(name)+1;
2665         memcpy(t->name, name, t->length);
2666         t->value = value;
2667
2668         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2669                            NULL, &res, &timeout, NULL);
2670         talloc_free(data.dptr);
2671         if (ret != 0 || res != 0) {
2672                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2673                 return -1;
2674         }
2675
2676         return 0;
2677 }
2678
2679 /*
2680   list tunables
2681  */
2682 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2683                             struct timeval timeout, 
2684                             uint32_t destnode,
2685                             TALLOC_CTX *mem_ctx,
2686                             const char ***list, uint32_t *count)
2687 {
2688         TDB_DATA outdata;
2689         int32_t res;
2690         int ret;
2691         struct ctdb_control_list_tunable *t;
2692         char *p, *s, *ptr;
2693
2694         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2695                            mem_ctx, &outdata, &res, &timeout, NULL);
2696         if (ret != 0 || res != 0) {
2697                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2698                 return -1;
2699         }
2700
2701         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2702         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2703             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2704                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2705                 talloc_free(outdata.dptr);
2706                 return -1;              
2707         }
2708         
2709         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2710         CTDB_NO_MEMORY(ctdb, p);
2711
2712         talloc_free(outdata.dptr);
2713         
2714         (*list) = NULL;
2715         (*count) = 0;
2716
2717         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2718                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2719                 CTDB_NO_MEMORY(ctdb, *list);
2720                 (*list)[*count] = talloc_strdup(*list, s);
2721                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2722                 (*count)++;
2723         }
2724
2725         talloc_free(p);
2726
2727         return 0;
2728 }
2729
2730
2731 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2732                                    struct timeval timeout, uint32_t destnode,
2733                                    TALLOC_CTX *mem_ctx,
2734                                    uint32_t flags,
2735                                    struct ctdb_all_public_ips **ips)
2736 {
2737         int ret;
2738         TDB_DATA outdata;
2739         int32_t res;
2740
2741         ret = ctdb_control(ctdb, destnode, 0, 
2742                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2743                            mem_ctx, &outdata, &res, &timeout, NULL);
2744         if (ret == 0 && res == -1) {
2745                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2746                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2747         }
2748         if (ret != 0 || res != 0) {
2749           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2750                 return -1;
2751         }
2752
2753         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2754         talloc_free(outdata.dptr);
2755                     
2756         return 0;
2757 }
2758
2759 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2760                              struct timeval timeout, uint32_t destnode,
2761                              TALLOC_CTX *mem_ctx,
2762                              struct ctdb_all_public_ips **ips)
2763 {
2764         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2765                                               destnode, mem_ctx,
2766                                               0, ips);
2767 }
2768
2769 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2770                         struct timeval timeout, uint32_t destnode, 
2771                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2772 {
2773         int ret, i, len;
2774         TDB_DATA outdata;
2775         int32_t res;
2776         struct ctdb_all_public_ipsv4 *ipsv4;
2777
2778         ret = ctdb_control(ctdb, destnode, 0, 
2779                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2780                            mem_ctx, &outdata, &res, &timeout, NULL);
2781         if (ret != 0 || res != 0) {
2782                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2783                 return -1;
2784         }
2785
2786         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2787         len = offsetof(struct ctdb_all_public_ips, ips) +
2788                 ipsv4->num*sizeof(struct ctdb_public_ip);
2789         *ips = talloc_zero_size(mem_ctx, len);
2790         CTDB_NO_MEMORY(ctdb, *ips);
2791         (*ips)->num = ipsv4->num;
2792         for (i=0; i<ipsv4->num; i++) {
2793                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2794                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2795         }
2796
2797         talloc_free(outdata.dptr);
2798                     
2799         return 0;
2800 }
2801
2802 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2803                                  struct timeval timeout, uint32_t destnode,
2804                                  TALLOC_CTX *mem_ctx,
2805                                  const ctdb_sock_addr *addr,
2806                                  struct ctdb_control_public_ip_info **_info)
2807 {
2808         int ret;
2809         TDB_DATA indata;
2810         TDB_DATA outdata;
2811         int32_t res;
2812         struct ctdb_control_public_ip_info *info;
2813         uint32_t len;
2814         uint32_t i;
2815
2816         indata.dptr = discard_const_p(uint8_t, addr);
2817         indata.dsize = sizeof(*addr);
2818
2819         ret = ctdb_control(ctdb, destnode, 0,
2820                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2821                            mem_ctx, &outdata, &res, &timeout, NULL);
2822         if (ret != 0 || res != 0) {
2823                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2824                                 "failed ret:%d res:%d\n",
2825                                 ret, res));
2826                 return -1;
2827         }
2828
2829         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2830         if (len > outdata.dsize) {
2831                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2832                                 "returned invalid data with size %u > %u\n",
2833                                 (unsigned int)outdata.dsize,
2834                                 (unsigned int)len));
2835                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2836                 return -1;
2837         }
2838
2839         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2840         len += info->num*sizeof(struct ctdb_control_iface_info);
2841
2842         if (len > outdata.dsize) {
2843                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2844                                 "returned invalid data with size %u > %u\n",
2845                                 (unsigned int)outdata.dsize,
2846                                 (unsigned int)len));
2847                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2848                 return -1;
2849         }
2850
2851         /* make sure we null terminate the returned strings */
2852         for (i=0; i < info->num; i++) {
2853                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2854         }
2855
2856         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2857                                                                 outdata.dptr,
2858                                                                 outdata.dsize);
2859         talloc_free(outdata.dptr);
2860         if (*_info == NULL) {
2861                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2862                                 "talloc_memdup size %u failed\n",
2863                                 (unsigned int)outdata.dsize));
2864                 return -1;
2865         }
2866
2867         return 0;
2868 }
2869
2870 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2871                          struct timeval timeout, uint32_t destnode,
2872                          TALLOC_CTX *mem_ctx,
2873                          struct ctdb_control_get_ifaces **_ifaces)
2874 {
2875         int ret;
2876         TDB_DATA outdata;
2877         int32_t res;
2878         struct ctdb_control_get_ifaces *ifaces;
2879         uint32_t len;
2880         uint32_t i;
2881
2882         ret = ctdb_control(ctdb, destnode, 0,
2883                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2884                            mem_ctx, &outdata, &res, &timeout, NULL);
2885         if (ret != 0 || res != 0) {
2886                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2887                                 "failed ret:%d res:%d\n",
2888                                 ret, res));
2889                 return -1;
2890         }
2891
2892         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2893         if (len > outdata.dsize) {
2894                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2895                                 "returned invalid data with size %u > %u\n",
2896                                 (unsigned int)outdata.dsize,
2897                                 (unsigned int)len));
2898                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2899                 return -1;
2900         }
2901
2902         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2903         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2904
2905         if (len > outdata.dsize) {
2906                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2907                                 "returned invalid data with size %u > %u\n",
2908                                 (unsigned int)outdata.dsize,
2909                                 (unsigned int)len));
2910                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2911                 return -1;
2912         }
2913
2914         /* make sure we null terminate the returned strings */
2915         for (i=0; i < ifaces->num; i++) {
2916                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2917         }
2918
2919         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2920                                                                   outdata.dptr,
2921                                                                   outdata.dsize);
2922         talloc_free(outdata.dptr);
2923         if (*_ifaces == NULL) {
2924                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2925                                 "talloc_memdup size %u failed\n",
2926                                 (unsigned int)outdata.dsize));
2927                 return -1;
2928         }
2929
2930         return 0;
2931 }
2932
2933 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2934                              struct timeval timeout, uint32_t destnode,
2935                              TALLOC_CTX *mem_ctx,
2936                              const struct ctdb_control_iface_info *info)
2937 {
2938         int ret;
2939         TDB_DATA indata;
2940         int32_t res;
2941
2942         indata.dptr = discard_const_p(uint8_t, info);
2943         indata.dsize = sizeof(*info);
2944
2945         ret = ctdb_control(ctdb, destnode, 0,
2946                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2947                            mem_ctx, NULL, &res, &timeout, NULL);
2948         if (ret != 0 || res != 0) {
2949                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2950                                 "failed ret:%d res:%d\n",
2951                                 ret, res));
2952                 return -1;
2953         }
2954
2955         return 0;
2956 }
2957
2958 /*
2959   set/clear the permanent disabled bit on a remote node
2960  */
2961 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2962                        uint32_t set, uint32_t clear)
2963 {
2964         int ret;
2965         TDB_DATA data;
2966         struct ctdb_node_map *nodemap=NULL;
2967         struct ctdb_node_flag_change c;
2968         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2969         uint32_t recmaster;
2970         uint32_t *nodes;
2971
2972
2973         /* find the recovery master */
2974         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2975         if (ret != 0) {
2976                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2977                 talloc_free(tmp_ctx);
2978                 return ret;
2979         }
2980
2981
2982         /* read the node flags from the recmaster */
2983         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2984         if (ret != 0) {
2985                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2986                 talloc_free(tmp_ctx);
2987                 return -1;
2988         }
2989         if (destnode >= nodemap->num) {
2990                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2991                 talloc_free(tmp_ctx);
2992                 return -1;
2993         }
2994
2995         c.pnn       = destnode;
2996         c.old_flags = nodemap->nodes[destnode].flags;
2997         c.new_flags = c.old_flags;
2998         c.new_flags |= set;
2999         c.new_flags &= ~clear;
3000
3001         data.dsize = sizeof(c);
3002         data.dptr = (unsigned char *)&c;
3003
3004         /* send the flags update to all connected nodes */
3005         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3006
3007         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3008                                         nodes, 0,
3009                                         timeout, false, data,
3010                                         NULL, NULL,
3011                                         NULL) != 0) {
3012                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3013
3014                 talloc_free(tmp_ctx);
3015                 return -1;
3016         }
3017
3018         talloc_free(tmp_ctx);
3019         return 0;
3020 }
3021
3022
3023 /*
3024   get all tunables
3025  */
3026 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3027                                struct timeval timeout, 
3028                                uint32_t destnode,
3029                                struct ctdb_tunable *tunables)
3030 {
3031         TDB_DATA outdata;
3032         int ret;
3033         int32_t res;
3034
3035         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3036                            &outdata, &res, &timeout, NULL);
3037         if (ret != 0 || res != 0) {
3038                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3039                 return -1;
3040         }
3041
3042         if (outdata.dsize != sizeof(*tunables)) {
3043                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3044                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3045                 return -1;              
3046         }
3047
3048         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3049         talloc_free(outdata.dptr);
3050         return 0;
3051 }
3052
3053 /*
3054   add a public address to a node
3055  */
3056 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3057                       struct timeval timeout, 
3058                       uint32_t destnode,
3059                       struct ctdb_control_ip_iface *pub)
3060 {
3061         TDB_DATA data;
3062         int32_t res;
3063         int ret;
3064
3065         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3066         data.dptr  = (unsigned char *)pub;
3067
3068         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3069                            NULL, &res, &timeout, NULL);
3070         if (ret != 0 || res != 0) {
3071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3072                 return -1;
3073         }
3074
3075         return 0;
3076 }
3077
3078 /*
3079   delete a public address from a node
3080  */
3081 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3082                       struct timeval timeout, 
3083                       uint32_t destnode,
3084                       struct ctdb_control_ip_iface *pub)
3085 {
3086         TDB_DATA data;
3087         int32_t res;
3088         int ret;
3089
3090         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3091         data.dptr  = (unsigned char *)pub;
3092
3093         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3094                            NULL, &res, &timeout, NULL);
3095         if (ret != 0 || res != 0) {
3096                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3097                 return -1;
3098         }
3099
3100         return 0;
3101 }
3102
3103 /*
3104   kill a tcp connection
3105  */
3106 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3107                       struct timeval timeout, 
3108                       uint32_t destnode,
3109                       struct ctdb_control_killtcp *killtcp)
3110 {
3111         TDB_DATA data;
3112         int32_t res;
3113         int ret;
3114
3115         data.dsize = sizeof(struct ctdb_control_killtcp);
3116         data.dptr  = (unsigned char *)killtcp;
3117
3118         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3119                            NULL, &res, &timeout, NULL);
3120         if (ret != 0 || res != 0) {
3121                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3122                 return -1;
3123         }
3124
3125         return 0;
3126 }
3127
3128 /*
3129   send a gratious arp
3130  */
3131 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3132                       struct timeval timeout, 
3133                       uint32_t destnode,
3134                       ctdb_sock_addr *addr,
3135                       const char *ifname)
3136 {
3137         TDB_DATA data;
3138         int32_t res;
3139         int ret, len;
3140         struct ctdb_control_gratious_arp *gratious_arp;
3141         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3142
3143
3144         len = strlen(ifname)+1;
3145         gratious_arp = talloc_size(tmp_ctx, 
3146                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3147         CTDB_NO_MEMORY(ctdb, gratious_arp);
3148
3149         gratious_arp->addr = *addr;
3150         gratious_arp->len = len;
3151         memcpy(&gratious_arp->iface[0], ifname, len);
3152
3153
3154         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3155         data.dptr  = (unsigned char *)gratious_arp;
3156
3157         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3158                            NULL, &res, &timeout, NULL);
3159         if (ret != 0 || res != 0) {
3160                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3161                 talloc_free(tmp_ctx);
3162                 return -1;
3163         }
3164
3165         talloc_free(tmp_ctx);
3166         return 0;
3167 }
3168
3169 /*
3170   get a list of all tcp tickles that a node knows about for a particular vnn
3171  */
3172 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3173                               struct timeval timeout, uint32_t destnode, 
3174                               TALLOC_CTX *mem_ctx, 
3175                               ctdb_sock_addr *addr,
3176                               struct ctdb_control_tcp_tickle_list **list)
3177 {
3178         int ret;
3179         TDB_DATA data, outdata;
3180         int32_t status;
3181
3182         data.dptr = (uint8_t*)addr;
3183         data.dsize = sizeof(ctdb_sock_addr);
3184
3185         ret = ctdb_control(ctdb, destnode, 0, 
3186                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3187                            mem_ctx, &outdata, &status, NULL, NULL);
3188         if (ret != 0 || status != 0) {
3189                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3190                 return -1;
3191         }
3192
3193         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3194
3195         return status;
3196 }
3197
3198 /*
3199   register a server id
3200  */
3201 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3202                       struct timeval timeout, 
3203                       struct ctdb_server_id *id)
3204 {
3205         TDB_DATA data;
3206         int32_t res;
3207         int ret;
3208
3209         data.dsize = sizeof(struct ctdb_server_id);
3210         data.dptr  = (unsigned char *)id;
3211
3212         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3213                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3214                         0, data, NULL,
3215                         NULL, &res, &timeout, NULL);
3216         if (ret != 0 || res != 0) {
3217                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3218                 return -1;
3219         }
3220
3221         return 0;
3222 }
3223
3224 /*
3225   unregister a server id
3226  */
3227 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3228                       struct timeval timeout, 
3229                       struct ctdb_server_id *id)
3230 {
3231         TDB_DATA data;
3232         int32_t res;
3233         int ret;
3234
3235         data.dsize = sizeof(struct ctdb_server_id);
3236         data.dptr  = (unsigned char *)id;
3237
3238         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3239                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3240                         0, data, NULL,
3241                         NULL, &res, &timeout, NULL);
3242         if (ret != 0 || res != 0) {
3243                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3244                 return -1;
3245         }
3246
3247         return 0;
3248 }
3249
3250
3251 /*
3252   check if a server id exists
3253
3254   if a server id does exist, return *status == 1, otherwise *status == 0
3255  */
3256 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3257                       struct timeval timeout, 
3258                       uint32_t destnode,
3259                       struct ctdb_server_id *id,
3260                       uint32_t *status)
3261 {
3262         TDB_DATA data;
3263         int32_t res;
3264         int ret;
3265
3266         data.dsize = sizeof(struct ctdb_server_id);
3267         data.dptr  = (unsigned char *)id;
3268
3269         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3270                         0, data, NULL,
3271                         NULL, &res, &timeout, NULL);
3272         if (ret != 0) {
3273                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3274                 return -1;
3275         }
3276
3277         if (res) {
3278                 *status = 1;
3279         } else {
3280                 *status = 0;
3281         }
3282
3283         return 0;
3284 }
3285
3286 /*
3287    get the list of server ids that are registered on a node
3288 */
3289 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3290                 TALLOC_CTX *mem_ctx,
3291                 struct timeval timeout, uint32_t destnode, 
3292                 struct ctdb_server_id_list **svid_list)
3293 {
3294         int ret;
3295         TDB_DATA outdata;
3296         int32_t res;
3297
3298         ret = ctdb_control(ctdb, destnode, 0, 
3299                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3300                            mem_ctx, &outdata, &res, &timeout, NULL);
3301         if (ret != 0 || res != 0) {
3302                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3303                 return -1;
3304         }
3305
3306         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3307                     
3308         return 0;
3309 }
3310
3311 /*
3312   initialise the ctdb daemon for client applications
3313
3314   NOTE: In current code the daemon does not fork. This is for testing purposes only
3315   and to simplify the code.
3316 */
3317 struct ctdb_context *ctdb_init(struct event_context *ev)
3318 {
3319         int ret;
3320         struct ctdb_context *ctdb;
3321
3322         ctdb = talloc_zero(ev, struct ctdb_context);
3323         if (ctdb == NULL) {
3324                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3325                 return NULL;
3326         }
3327         ctdb->ev  = ev;
3328         ctdb->idr = idr_init(ctdb);
3329         /* Wrap early to exercise code. */
3330         ctdb->lastid = INT_MAX-200;
3331         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3332
3333         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3334         if (ret != 0) {
3335                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3336                 talloc_free(ctdb);
3337                 return NULL;
3338         }
3339
3340         ctdb->statistics.statistics_start_time = timeval_current();
3341
3342         return ctdb;
3343 }
3344
3345
3346 /*
3347   set some ctdb flags
3348 */
3349 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3350 {
3351         ctdb->flags |= flags;
3352 }
3353
3354 /*
3355   setup the local socket name
3356 */
3357 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3358 {
3359         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3360         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3361
3362         return 0;
3363 }
3364
3365 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3366 {
3367         return ctdb->daemon.name;
3368 }
3369
3370 /*
3371   return the pnn of this node
3372 */
3373 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3374 {
3375         return ctdb->pnn;
3376 }
3377
3378
3379 /*
3380   get the uptime of a remote node
3381  */
3382 struct ctdb_client_control_state *
3383 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3384 {
3385         return ctdb_control_send(ctdb, destnode, 0, 
3386                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3387                            mem_ctx, &timeout, NULL);
3388 }
3389
3390 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3391 {
3392         int ret;
3393         int32_t res;
3394         TDB_DATA outdata;
3395
3396         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3397         if (ret != 0 || res != 0) {
3398                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3399                 return -1;
3400         }
3401
3402         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3403
3404         return 0;
3405 }
3406
3407 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3408 {
3409         struct ctdb_client_control_state *state;
3410
3411         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3412         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3413 }
3414
3415 /*
3416   send a control to execute the "recovered" event script on a node
3417  */
3418 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3419 {
3420         int ret;
3421         int32_t status;
3422
3423         ret = ctdb_control(ctdb, destnode, 0, 
3424                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3425                            NULL, NULL, &status, &timeout, NULL);
3426         if (ret != 0 || status != 0) {
3427                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3428                 return -1;
3429         }
3430
3431         return 0;
3432 }
3433
3434 /* 
3435   callback for the async helpers used when sending the same control
3436   to multiple nodes in parallell.
3437 */
3438 static void async_callback(struct ctdb_client_control_state *state)
3439 {
3440         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3441         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3442         int ret;
3443         TDB_DATA outdata;
3444         int32_t res = -1;
3445         uint32_t destnode = state->c->hdr.destnode;
3446
3447         /* one more node has responded with recmode data */
3448         data->count--;
3449
3450         /* if we failed to push the db, then return an error and let
3451            the main loop try again.
3452         */
3453         if (state->state != CTDB_CONTROL_DONE) {
3454                 if ( !data->dont_log_errors) {
3455                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3456                 }
3457                 data->fail_count++;
3458                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3459                         res = -ETIME;
3460                 } else {
3461                         res = -1;
3462                 }
3463                 if (data->fail_callback) {
3464                         data->fail_callback(ctdb, destnode, res, outdata,
3465                                         data->callback_data);
3466                 }
3467                 return;
3468         }
3469         
3470         state->async.fn = NULL;
3471
3472         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3473         if ((ret != 0) || (res != 0)) {
3474                 if ( !data->dont_log_errors) {
3475                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3476                 }
3477                 data->fail_count++;
3478                 if (data->fail_callback) {
3479                         data->fail_callback(ctdb, destnode, res, outdata,
3480                                         data->callback_data);
3481                 }
3482         }
3483         if ((ret == 0) && (data->callback != NULL)) {
3484                 data->callback(ctdb, destnode, res, outdata,
3485                                         data->callback_data);
3486         }
3487 }
3488
3489
3490 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3491 {
3492         /* set up the callback functions */
3493         state->async.fn = async_callback;
3494         state->async.private_data = data;
3495         
3496         /* one more control to wait for to complete */
3497         data->count++;
3498 }
3499
3500
3501 /* wait for up to the maximum number of seconds allowed
3502    or until all nodes we expect a response from has replied
3503 */
3504 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3505 {
3506         while (data->count > 0) {
3507                 event_loop_once(ctdb->ev);
3508         }
3509         if (data->fail_count != 0) {
3510                 if (!data->dont_log_errors) {
3511                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3512                                  data->fail_count));
3513                 }
3514                 return -1;
3515         }
3516         return 0;
3517 }
3518
3519
3520 /* 
3521    perform a simple control on the listed nodes
3522    The control cannot return data
3523  */
3524 int ctdb_client_async_control(struct ctdb_context *ctdb,
3525                                 enum ctdb_controls opcode,
3526                                 uint32_t *nodes,
3527                                 uint64_t srvid,
3528                                 struct timeval timeout,
3529                                 bool dont_log_errors,
3530                                 TDB_DATA data,
3531                                 client_async_callback client_callback,
3532                                 client_async_callback fail_callback,
3533                                 void *callback_data)
3534 {
3535         struct client_async_data *async_data;
3536         struct ctdb_client_control_state *state;
3537         int j, num_nodes;
3538
3539         async_data = talloc_zero(ctdb, struct client_async_data);
3540         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3541         async_data->dont_log_errors = dont_log_errors;
3542         async_data->callback = client_callback;
3543         async_data->fail_callback = fail_callback;
3544         async_data->callback_data = callback_data;
3545         async_data->opcode        = opcode;
3546
3547         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3548
3549         /* loop over all nodes and send an async control to each of them */
3550         for (j=0; j<num_nodes; j++) {
3551                 uint32_t pnn = nodes[j];
3552
3553                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3554                                           0, data, async_data, &timeout, NULL);
3555                 if (state == NULL) {
3556                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3557                         talloc_free(async_data);
3558                         return -1;
3559                 }
3560                 
3561                 ctdb_client_async_add(async_data, state);
3562         }
3563
3564         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3565                 talloc_free(async_data);
3566                 return -1;
3567         }
3568
3569         talloc_free(async_data);
3570         return 0;
3571 }
3572
3573 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3574                                 struct ctdb_vnn_map *vnn_map,
3575                                 TALLOC_CTX *mem_ctx,
3576                                 bool include_self)
3577 {
3578         int i, j, num_nodes;
3579         uint32_t *nodes;
3580
3581         for (i=num_nodes=0;i<vnn_map->size;i++) {
3582                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3583                         continue;
3584                 }
3585                 num_nodes++;
3586         } 
3587
3588         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3589         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3590
3591         for (i=j=0;i<vnn_map->size;i++) {
3592                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3593                         continue;
3594                 }
3595                 nodes[j++] = vnn_map->map[i];
3596         } 
3597
3598         return nodes;
3599 }
3600
3601 /* Get list of nodes not including those with flags specified by mask.
3602  * If exclude_pnn is not -1 then exclude that pnn from the list.
3603  */
3604 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3605                         struct ctdb_node_map *node_map,
3606                         TALLOC_CTX *mem_ctx,
3607                         uint32_t mask,
3608                         int exclude_pnn)
3609 {
3610         int i, j, num_nodes;
3611         uint32_t *nodes;
3612
3613         for (i=num_nodes=0;i<node_map->num;i++) {
3614                 if (node_map->nodes[i].flags & mask) {
3615                         continue;
3616                 }
3617                 if (node_map->nodes[i].pnn == exclude_pnn) {
3618                         continue;
3619                 }
3620                 num_nodes++;
3621         } 
3622
3623         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3624         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3625
3626         for (i=j=0;i<node_map->num;i++) {
3627                 if (node_map->nodes[i].flags & mask) {
3628                         continue;
3629                 }
3630                 if (node_map->nodes[i].pnn == exclude_pnn) {
3631                         continue;
3632                 }
3633                 nodes[j++] = node_map->nodes[i].pnn;
3634         } 
3635
3636         return nodes;
3637 }
3638
3639 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3640                                 struct ctdb_node_map *node_map,
3641                                 TALLOC_CTX *mem_ctx,
3642                                 bool include_self)
3643 {
3644         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3645                              include_self ? -1 : ctdb->pnn);
3646 }
3647
3648 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3649                                 struct ctdb_node_map *node_map,
3650                                 TALLOC_CTX *mem_ctx,
3651                                 bool include_self)
3652 {
3653         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3654                              include_self ? -1 : ctdb->pnn);
3655 }
3656
3657 /* 
3658   this is used to test if a pnn lock exists and if it exists will return
3659   the number of connections that pnn has reported or -1 if that recovery
3660   daemon is not running.
3661 */
3662 int
3663 ctdb_read_pnn_lock(int fd, int32_t pnn)
3664 {
3665         struct flock lock;
3666         char c;
3667
3668         lock.l_type = F_WRLCK;
3669         lock.l_whence = SEEK_SET;
3670         lock.l_start = pnn;
3671         lock.l_len = 1;
3672         lock.l_pid = 0;
3673
3674         if (fcntl(fd, F_GETLK, &lock) != 0) {
3675                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3676                 return -1;
3677         }
3678
3679         if (lock.l_type == F_UNLCK) {
3680                 return -1;
3681         }
3682
3683         if (pread(fd, &c, 1, pnn) == -1) {
3684                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3685                 return -1;
3686         }
3687
3688         return c;
3689 }
3690
3691 /*
3692   get capabilities of a remote node
3693  */
3694 struct ctdb_client_control_state *
3695 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3696 {
3697         return ctdb_control_send(ctdb, destnode, 0, 
3698                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3699                            mem_ctx, &timeout, NULL);
3700 }
3701
3702 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3703 {
3704         int ret;
3705         int32_t res;
3706         TDB_DATA outdata;
3707
3708         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3709         if ( (ret != 0) || (res != 0) ) {
3710                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3711                 return -1;
3712         }
3713
3714         if (capabilities) {
3715                 *capabilities = *((uint32_t *)outdata.dptr);
3716         }
3717
3718         return 0;
3719 }
3720
3721 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3722 {
3723         struct ctdb_client_control_state *state;
3724         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3725         int ret;
3726
3727         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3728         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3729         talloc_free(tmp_ctx);
3730         return ret;
3731 }
3732
3733 struct server_id {
3734         uint64_t pid;
3735         uint32_t task_id;
3736         uint32_t vnn;
3737         uint64_t unique_id;
3738 };
3739
3740 static struct server_id server_id_get(struct ctdb_context *ctdb, uint32_t reqid)
3741 {
3742         struct server_id id;
3743
3744         id.pid = getpid();
3745         id.task_id = reqid;
3746         id.vnn = ctdb_get_pnn(ctdb);
3747         id.unique_id = id.vnn;
3748         id.unique_id = (id.unique_id << 32) | reqid;
3749
3750         return id;
3751 }
3752
3753 static bool server_id_equal(struct server_id *id1, struct server_id *id2)
3754 {
3755         if (id1->pid != id2->pid) {
3756                 return false;
3757         }
3758
3759         if (id1->task_id != id2->task_id) {
3760                 return false;
3761         }
3762
3763         if (id1->vnn != id2->vnn) {
3764                 return false;
3765         }
3766
3767         if (id1->unique_id != id2->unique_id) {
3768                 return false;
3769         }
3770
3771         return true;
3772 }
3773
3774 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3775 {
3776         struct ctdb_server_id sid;
3777         int ret;
3778         uint32_t result;
3779
3780         sid.type = SERVER_TYPE_SAMBA;
3781         sid.pnn = id->vnn;
3782         sid.server_id = id->pid;
3783
3784         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3785                                         id->vnn, &sid, &result);
3786         if (ret != 0) {
3787                 /* If control times out, assume server_id exists. */
3788                 return true;
3789         }
3790
3791         if (result) {
3792                 return true;
3793         }
3794
3795         return false;
3796 }
3797
3798
3799 enum g_lock_type {
3800         G_LOCK_READ = 0,
3801         G_LOCK_WRITE = 1,
3802 };
3803
3804 struct g_lock_rec {
3805         enum g_lock_type type;
3806         struct server_id id;
3807 };
3808
3809 struct g_lock_recs {
3810         unsigned int num;
3811         struct g_lock_rec *lock;
3812 };
3813
3814 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3815                          struct g_lock_recs **locks)
3816 {
3817         struct g_lock_recs *recs;
3818
3819         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3820         if (recs == NULL) {
3821                 return false;
3822         }
3823
3824         if (data.dsize == 0) {
3825                 goto done;
3826         }
3827
3828         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3829                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3830                                   (unsigned long)data.dsize));
3831                 talloc_free(recs);
3832                 return false;
3833         }
3834
3835         recs->num = data.dsize / sizeof(struct g_lock_rec);
3836         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3837         if (recs->lock == NULL) {
3838                 talloc_free(recs);
3839                 return false;
3840         }
3841
3842 done:
3843         if (locks != NULL) {
3844                 *locks = recs;
3845         }
3846
3847         return true;
3848 }
3849
3850
3851 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3852                         struct ctdb_db_context *ctdb_db,
3853                         const char *keyname, uint32_t reqid)
3854 {
3855         TDB_DATA key, data;
3856         struct ctdb_record_handle *h;
3857         struct g_lock_recs *locks;
3858         struct server_id id;
3859         struct timeval t_start;
3860         int i;
3861
3862         key.dptr = (uint8_t *)discard_const(keyname);
3863         key.dsize = strlen(keyname) + 1;
3864
3865         t_start = timeval_current();
3866
3867 again:
3868         /* Keep trying for an hour. */
3869         if (timeval_elapsed(&t_start) > 3600) {
3870                 return false;
3871         }
3872
3873         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3874         if (h == NULL) {
3875                 return false;
3876         }
3877
3878         if (!g_lock_parse(h, data, &locks)) {
3879                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3880                 talloc_free(data.dptr);
3881                 talloc_free(h);
3882                 return false;
3883         }
3884
3885         talloc_free(data.dptr);
3886
3887         id = server_id_get(ctdb_db->ctdb, reqid);
3888
3889         i = 0;
3890         while (i < locks->num) {
3891                 if (server_id_equal(&locks->lock[i].id, &id)) {
3892                         /* Internal error */
3893                         talloc_free(h);
3894                         return false;
3895                 }
3896
3897                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3898                         if (i < locks->num-1) {
3899                                 locks->lock[i] = locks->lock[locks->num-1];
3900                         }
3901                         locks->num--;
3902                         continue;
3903                 }
3904
3905                 /* This entry is locked. */
3906                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
3907                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3908                                    (unsigned long long)id.pid,
3909                                    id.task_id, id.vnn,
3910                                    (unsigned long long)id.unique_id));
3911                 talloc_free(h);
3912                 goto again;
3913         }
3914
3915         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
3916                                      locks->num+1);
3917         if (locks->lock == NULL) {
3918                 talloc_free(h);
3919                 return false;
3920         }
3921
3922         locks->lock[locks->num].type = G_LOCK_WRITE;
3923         locks->lock[locks->num].id = id;
3924         locks->num++;
3925
3926         data.dptr = (uint8_t *)locks->lock;
3927         data.dsize = locks->num * sizeof(struct g_lock_rec);
3928
3929         if (ctdb_record_store(h, data) != 0) {
3930                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
3931                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3932                                   (unsigned long long)id.pid,
3933                                   id.task_id, id.vnn,
3934                                   (unsigned long long)id.unique_id));
3935                 talloc_free(h);
3936                 return false;
3937         }
3938
3939         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
3940                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3941                            (unsigned long long)id.pid,
3942                            id.task_id, id.vnn,
3943                            (unsigned long long)id.unique_id));
3944
3945         talloc_free(h);
3946         return true;
3947 }
3948
3949 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
3950                           struct ctdb_db_context *ctdb_db,
3951                           const char *keyname, uint32_t reqid)
3952 {
3953         TDB_DATA key, data;
3954         struct ctdb_record_handle *h;
3955         struct g_lock_recs *locks;
3956         struct server_id id;
3957         int i;
3958         bool found = false;
3959
3960         key.dptr = (uint8_t *)discard_const(keyname);
3961         key.dsize = strlen(keyname) + 1;
3962         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3963         if (h == NULL) {
3964                 return false;
3965         }
3966
3967         if (!g_lock_parse(h, data, &locks)) {
3968                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3969                 talloc_free(data.dptr);
3970                 talloc_free(h);
3971                 return false;
3972         }
3973
3974         talloc_free(data.dptr);
3975
3976         id = server_id_get(ctdb_db->ctdb, reqid);
3977
3978         for (i=0; i<locks->num; i++) {
3979                 if (server_id_equal(&locks->lock[i].id, &id)) {
3980                         if (i < locks->num-1) {
3981                                 locks->lock[i] = locks->lock[locks->num-1];
3982                         }
3983                         locks->num--;
3984                         found = true;
3985                         break;
3986                 }
3987         }
3988
3989         if (!found) {
3990                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
3991                 talloc_free(h);
3992                 return false;
3993         }
3994
3995         data.dptr = (uint8_t *)locks->lock;
3996         data.dsize = locks->num * sizeof(struct g_lock_rec);
3997
3998         if (ctdb_record_store(h, data) != 0) {
3999                 talloc_free(h);
4000                 return false;
4001         }
4002
4003         talloc_free(h);
4004         return true;
4005 }
4006
4007
4008 struct ctdb_transaction_handle {
4009         struct ctdb_db_context *ctdb_db;
4010         struct ctdb_db_context *g_lock_db;
4011         char *lock_name;
4012         uint32_t reqid;
4013         /*
4014          * we store reads and writes done under a transaction:
4015          * - one list stores both reads and writes (m_all)
4016          * - the other just writes (m_write)
4017          */
4018         struct ctdb_marshall_buffer *m_all;
4019         struct ctdb_marshall_buffer *m_write;
4020 };
4021
4022 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4023 {
4024         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
4025         ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
4026         return 0;
4027 }
4028
4029
4030 /**
4031  * start a transaction on a database
4032  */
4033 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4034                                                        TALLOC_CTX *mem_ctx)
4035 {
4036         struct ctdb_transaction_handle *h;
4037         struct ctdb_server_id id;
4038
4039         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4040         if (h == NULL) {
4041                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
4042                 return NULL;
4043         }
4044
4045         h->ctdb_db = ctdb_db;
4046         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
4047                                        (unsigned int)ctdb_db->db_id);
4048         if (h->lock_name == NULL) {
4049                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
4050                 talloc_free(h);
4051                 return NULL;
4052         }
4053
4054         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
4055                                    "g_lock.tdb", false, 0);
4056         if (!h->g_lock_db) {
4057                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
4058                 talloc_free(h);
4059                 return NULL;
4060         }
4061
4062         id.type = SERVER_TYPE_SAMBA;
4063         id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
4064         id.server_id = getpid();
4065
4066         if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
4067                                          &id) != 0) {
4068                 DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
4069                 talloc_free(h);
4070                 return NULL;
4071         }
4072
4073         h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
4074
4075         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
4076                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
4077                 talloc_free(h);
4078                 return NULL;
4079         }
4080
4081         talloc_set_destructor(h, ctdb_transaction_destructor);
4082         return h;
4083 }
4084
4085 /**
4086  * fetch a record inside a transaction
4087  */
4088 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
4089                            TALLOC_CTX *mem_ctx,
4090                            TDB_DATA key, TDB_DATA *data)
4091 {
4092         struct ctdb_ltdb_header header;
4093         int ret;
4094
4095         ZERO_STRUCT(header);
4096
4097         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4098         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4099                 /* record doesn't exist yet */
4100                 *data = tdb_null;
4101                 ret = 0;
4102         }
4103
4104         if (ret != 0) {
4105                 return ret;
4106         }
4107
4108         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4109         if (h->m_all == NULL) {
4110                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4111                 return -1;
4112         }
4113
4114         return 0;
4115 }
4116
4117 /**
4118  * stores a record inside a transaction
4119  */
4120 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
4121                            TDB_DATA key, TDB_DATA data)
4122 {
4123         TALLOC_CTX *tmp_ctx = talloc_new(h);
4124         struct ctdb_ltdb_header header;
4125         TDB_DATA olddata;
4126         int ret;
4127
4128         /* we need the header so we can update the RSN */
4129         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4130         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4131                 /* the record doesn't exist - create one with us as dmaster.
4132                    This is only safe because we are in a transaction and this
4133                    is a persistent database */
4134                 ZERO_STRUCT(header);
4135         } else if (ret != 0) {
4136                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4137                 talloc_free(tmp_ctx);
4138                 return ret;
4139         }
4140
4141         if (data.dsize == olddata.dsize &&
4142             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
4143             header.rsn != 0) {
4144                 /* save writing the same data */
4145                 talloc_free(tmp_ctx);
4146                 return 0;
4147         }
4148
4149         header.dmaster = h->ctdb_db->ctdb->pnn;
4150         header.rsn++;
4151
4152         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4153         if (h->m_all == NULL) {
4154                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4155                 talloc_free(tmp_ctx);
4156                 return -1;
4157         }
4158
4159         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4160         if (h->m_write == NULL) {
4161                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4162                 talloc_free(tmp_ctx);
4163                 return -1;
4164         }
4165
4166         talloc_free(tmp_ctx);
4167         return 0;
4168 }
4169
4170 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
4171 {
4172         const char *keyname = CTDB_DB_SEQNUM_KEY;
4173         TDB_DATA key, data;
4174         struct ctdb_ltdb_header header;
4175         int ret;
4176
4177         key.dptr = (uint8_t *)discard_const(keyname);
4178         key.dsize = strlen(keyname) + 1;
4179
4180         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
4181         if (ret != 0) {
4182                 *seqnum = 0;
4183                 return 0;
4184         }
4185
4186         if (data.dsize != sizeof(*seqnum)) {
4187                 DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
4188                                   data.dsize));
4189                 talloc_free(data.dptr);
4190                 return -1;
4191         }
4192
4193         *seqnum = *(uint64_t *)data.dptr;
4194         talloc_free(data.dptr);
4195
4196         return 0;
4197 }
4198
4199
4200 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
4201                                 uint64_t seqnum)
4202 {
4203         const char *keyname = CTDB_DB_SEQNUM_KEY;
4204         TDB_DATA key, data;
4205
4206         key.dptr = (uint8_t *)discard_const(keyname);
4207         key.dsize = strlen(keyname) + 1;
4208
4209         data.dptr = (uint8_t *)&seqnum;
4210         data.dsize = sizeof(seqnum);
4211
4212         return ctdb_transaction_store(h, key, data);
4213 }
4214
4215
4216 /**
4217  * commit a transaction
4218  */
4219 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4220 {
4221         int ret;
4222         uint64_t old_seqnum, new_seqnum;
4223         int32_t status;
4224         struct timeval timeout;
4225
4226         if (h->m_write == NULL) {
4227                 /* no changes were made */
4228                 talloc_free(h);
4229                 return 0;
4230         }
4231
4232         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
4233         if (ret != 0) {
4234                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4235                 ret = -1;
4236                 goto done;
4237         }
4238
4239         new_seqnum = old_seqnum + 1;
4240         ret = ctdb_store_db_seqnum(h, new_seqnum);
4241         if (ret != 0) {
4242                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4243                 ret = -1;
4244                 goto done;
4245         }
4246
4247 again:
4248         timeout = timeval_current_ofs(3,0);
4249         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4250                            h->ctdb_db->db_id,
4251                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4252                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4253                            &status, &timeout, NULL);
4254         if (ret != 0 || status != 0) {
4255                 /*
4256                  * TRANS3_COMMIT control will only fail if recovery has been
4257                  * triggered.  Check if the database has been updated or not.
4258                  */
4259                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4260                 if (ret != 0) {
4261                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4262                         goto done;
4263                 }
4264
4265                 if (new_seqnum == old_seqnum) {
4266                         /* Database not yet updated, try again */
4267                         goto again;
4268                 }
4269
4270                 if (new_seqnum != (old_seqnum + 1)) {
4271                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4272                                           (long long unsigned)new_seqnum,
4273                                           (long long unsigned)old_seqnum));
4274                         ret = -1;
4275                         goto done;
4276                 }
4277         }
4278
4279         ret = 0;
4280
4281 done:
4282         talloc_free(h);
4283         return ret;
4284 }
4285
4286 /**
4287  * cancel a transaction
4288  */
4289 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4290 {
4291         talloc_free(h);
4292         return 0;
4293 }
4294
4295
4296 /*
4297   recovery daemon ping to main daemon
4298  */
4299 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4300 {
4301         int ret;
4302         int32_t res;
4303
4304         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4305                            ctdb, NULL, &res, NULL, NULL);
4306         if (ret != 0 || res != 0) {
4307                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4308                 return -1;
4309         }
4310
4311         return 0;
4312 }
4313
4314 /* When forking the main daemon and the child process needs to connect
4315  * back to the daemon as a client process, this function can be used
4316  * to change the ctdb context from daemon into client mode.  The child
4317  * process must be created using ctdb_fork() and not fork() -
4318  * ctdb_fork() does some necessary housekeeping.
4319  */
4320 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4321 {
4322         int ret;
4323         va_list ap;
4324
4325         /* Add extra information so we can identify this in the logs */
4326         va_start(ap, fmt);
4327         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4328         va_end(ap);
4329
4330         /* get a new event context */
4331         ctdb->ev = event_context_init(ctdb);
4332         tevent_loop_allow_nesting(ctdb->ev);
4333
4334         /* Connect to main CTDB daemon */
4335         ret = ctdb_socket_connect(ctdb);
4336         if (ret != 0) {
4337                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4338                 return -1;
4339         }
4340
4341         ctdb->can_send_controls = true;
4342
4343         return 0;
4344 }
4345
4346 /*
4347   get the status of running the monitor eventscripts: NULL means never run.
4348  */
4349 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4350                 struct timeval timeout, uint32_t destnode, 
4351                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4352                 struct ctdb_scripts_wire **scripts)
4353 {
4354         int ret;
4355         TDB_DATA outdata, indata;
4356         int32_t res;
4357         uint32_t uinttype = type;
4358
4359         indata.dptr = (uint8_t *)&uinttype;
4360         indata.dsize = sizeof(uinttype);
4361
4362         ret = ctdb_control(ctdb, destnode, 0, 
4363                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4364                            mem_ctx, &outdata, &res, &timeout, NULL);
4365         if (ret != 0 || res != 0) {
4366                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4367                 return -1;
4368         }
4369
4370         if (outdata.dsize == 0) {
4371                 *scripts = NULL;
4372         } else {
4373                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4374                 talloc_free(outdata.dptr);
4375         }
4376                     
4377         return 0;
4378 }
4379
4380 /*
4381   tell the main daemon how long it took to lock the reclock file
4382  */
4383 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4384 {
4385         int ret;
4386         int32_t res;
4387         TDB_DATA data;
4388
4389         data.dptr = (uint8_t *)&latency;
4390         data.dsize = sizeof(latency);
4391
4392         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4393                            ctdb, NULL, &res, NULL, NULL);
4394         if (ret != 0 || res != 0) {
4395                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4396                 return -1;
4397         }
4398
4399         return 0;
4400 }
4401
4402 /*
4403   get the name of the reclock file
4404  */
4405 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4406                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4407                          const char **name)
4408 {
4409         int ret;
4410         int32_t res;
4411         TDB_DATA data;
4412
4413         ret = ctdb_control(ctdb, destnode, 0, 
4414                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4415                            mem_ctx, &data, &res, &timeout, NULL);
4416         if (ret != 0 || res != 0) {
4417                 return -1;
4418         }
4419
4420         if (data.dsize == 0) {
4421                 *name = NULL;
4422         } else {
4423                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4424         }
4425         talloc_free(data.dptr);
4426
4427         return 0;
4428 }
4429
4430 /*
4431   set the reclock filename for a node
4432  */
4433 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4434 {
4435         int ret;
4436         TDB_DATA data;
4437         int32_t res;
4438
4439         if (reclock == NULL) {
4440                 data.dsize = 0;
4441                 data.dptr  = NULL;
4442         } else {
4443                 data.dsize = strlen(reclock) + 1;
4444                 data.dptr  = discard_const(reclock);
4445         }
4446
4447         ret = ctdb_control(ctdb, destnode, 0, 
4448                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4449                            NULL, NULL, &res, &timeout, NULL);
4450         if (ret != 0 || res != 0) {
4451                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4452                 return -1;
4453         }
4454
4455         return 0;
4456 }
4457
4458 /*
4459   stop a node
4460  */
4461 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4462 {
4463         int ret;
4464         int32_t res;
4465
4466         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4467                            ctdb, NULL, &res, &timeout, NULL);
4468         if (ret != 0 || res != 0) {
4469                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4470                 return -1;
4471         }
4472
4473         return 0;
4474 }
4475
4476 /*
4477   continue a node
4478  */
4479 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4480 {
4481         int ret;
4482
4483         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4484                            ctdb, NULL, NULL, &timeout, NULL);
4485         if (ret != 0) {
4486                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4487                 return -1;
4488         }
4489
4490         return 0;
4491 }
4492
4493 /*
4494   set the natgw state for a node
4495  */
4496 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4497 {
4498         int ret;
4499         TDB_DATA data;
4500         int32_t res;
4501
4502         data.dsize = sizeof(natgwstate);
4503         data.dptr  = (uint8_t *)&natgwstate;
4504
4505         ret = ctdb_control(ctdb, destnode, 0, 
4506                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4507                            NULL, NULL, &res, &timeout, NULL);
4508         if (ret != 0 || res != 0) {
4509                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4510                 return -1;
4511         }
4512
4513         return 0;
4514 }
4515
4516 /*
4517   set the lmaster role for a node
4518  */
4519 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4520 {
4521         int ret;
4522         TDB_DATA data;
4523         int32_t res;
4524
4525         data.dsize = sizeof(lmasterrole);
4526         data.dptr  = (uint8_t *)&lmasterrole;
4527
4528         ret = ctdb_control(ctdb, destnode, 0, 
4529                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4530                            NULL, NULL, &res, &timeout, NULL);
4531         if (ret != 0 || res != 0) {
4532                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4533                 return -1;
4534         }
4535
4536         return 0;
4537 }
4538
4539 /*
4540   set the recmaster role for a node
4541  */
4542 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4543 {
4544         int ret;
4545         TDB_DATA data;
4546         int32_t res;
4547
4548         data.dsize = sizeof(recmasterrole);
4549         data.dptr  = (uint8_t *)&recmasterrole;
4550
4551         ret = ctdb_control(ctdb, destnode, 0, 
4552                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4553                            NULL, NULL, &res, &timeout, NULL);
4554         if (ret != 0 || res != 0) {
4555                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4556                 return -1;
4557         }
4558
4559         return 0;
4560 }
4561
4562 /* enable an eventscript
4563  */
4564 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4565 {
4566         int ret;
4567         TDB_DATA data;
4568         int32_t res;
4569
4570         data.dsize = strlen(script) + 1;
4571         data.dptr  = discard_const(script);
4572
4573         ret = ctdb_control(ctdb, destnode, 0, 
4574                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4575                            NULL, NULL, &res, &timeout, NULL);
4576         if (ret != 0 || res != 0) {
4577                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4578                 return -1;
4579         }
4580
4581         return 0;
4582 }
4583
4584 /* disable an eventscript
4585  */
4586 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4587 {
4588         int ret;
4589         TDB_DATA data;
4590         int32_t res;
4591
4592         data.dsize = strlen(script) + 1;
4593         data.dptr  = discard_const(script);
4594
4595         ret = ctdb_control(ctdb, destnode, 0, 
4596                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4597                            NULL, NULL, &res, &timeout, NULL);
4598         if (ret != 0 || res != 0) {
4599                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4600                 return -1;
4601         }
4602
4603         return 0;
4604 }
4605
4606
4607 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4608 {
4609         int ret;
4610         TDB_DATA data;
4611         int32_t res;
4612
4613         data.dsize = sizeof(*bantime);
4614         data.dptr  = (uint8_t *)bantime;
4615
4616         ret = ctdb_control(ctdb, destnode, 0, 
4617                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4618                            NULL, NULL, &res, &timeout, NULL);
4619         if (ret != 0 || res != 0) {
4620                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4621                 return -1;
4622         }
4623
4624         return 0;
4625 }
4626
4627
4628 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4629 {
4630         int ret;
4631         TDB_DATA outdata;
4632         int32_t res;
4633         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4634
4635         ret = ctdb_control(ctdb, destnode, 0, 
4636                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4637                            tmp_ctx, &outdata, &res, &timeout, NULL);
4638         if (ret != 0 || res != 0) {
4639                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4640                 talloc_free(tmp_ctx);
4641                 return -1;
4642         }
4643
4644         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4645         talloc_free(tmp_ctx);
4646
4647         return 0;
4648 }
4649
4650
4651 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4652 {
4653         int ret;
4654         int32_t res;
4655         TDB_DATA data;
4656         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4657
4658         data.dptr = (uint8_t*)db_prio;
4659         data.dsize = sizeof(*db_prio);
4660
4661         ret = ctdb_control(ctdb, destnode, 0, 
4662                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4663                            tmp_ctx, NULL, &res, &timeout, NULL);
4664         if (ret != 0 || res != 0) {
4665                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4666                 talloc_free(tmp_ctx);
4667                 return -1;
4668         }
4669
4670         talloc_free(tmp_ctx);
4671
4672         return 0;
4673 }
4674
4675 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4676 {
4677         int ret;
4678         int32_t res;
4679         TDB_DATA data;
4680         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4681
4682         data.dptr = (uint8_t*)&db_id;
4683         data.dsize = sizeof(db_id);
4684
4685         ret = ctdb_control(ctdb, destnode, 0, 
4686                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4687                            tmp_ctx, NULL, &res, &timeout, NULL);
4688         if (ret != 0 || res < 0) {
4689                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4690                 talloc_free(tmp_ctx);
4691                 return -1;
4692         }
4693
4694         if (priority) {
4695                 *priority = res;
4696         }
4697
4698         talloc_free(tmp_ctx);
4699
4700         return 0;
4701 }
4702
4703 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4704 {
4705         int ret;
4706         TDB_DATA outdata;
4707         int32_t res;
4708
4709         ret = ctdb_control(ctdb, destnode, 0, 
4710                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4711                            mem_ctx, &outdata, &res, &timeout, NULL);
4712         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4713                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4714                 return -1;
4715         }
4716
4717         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4718         talloc_free(outdata.dptr);
4719                     
4720         return 0;
4721 }
4722
4723 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4724 {
4725         if (h == NULL) {
4726                 return NULL;
4727         }
4728
4729         return &h->header;
4730 }
4731
4732
4733 struct ctdb_client_control_state *
4734 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4735 {
4736         struct ctdb_client_control_state *handle;
4737         struct ctdb_marshall_buffer *m;
4738         struct ctdb_rec_data *rec;
4739         TDB_DATA outdata;
4740
4741         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4742         if (m == NULL) {
4743                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4744                 return NULL;
4745         }
4746
4747         m->db_id = ctdb_db->db_id;
4748
4749         rec = ctdb_marshall_record(m, 0, key, header, data);
4750         if (rec == NULL) {
4751                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4752                 talloc_free(m);
4753                 return NULL;
4754         }
4755         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4756         if (m == NULL) {
4757                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4758                 talloc_free(m);
4759                 return NULL;
4760         }
4761         m->count++;
4762         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4763
4764
4765         outdata.dptr = (uint8_t *)m;
4766         outdata.dsize = talloc_get_size(m);
4767
4768         handle = ctdb_control_send(ctdb, destnode, 0, 
4769                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4770                            mem_ctx, &timeout, NULL);
4771         talloc_free(m);
4772         return handle;
4773 }
4774
4775 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4776 {
4777         int ret;
4778         int32_t res;
4779
4780         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4781         if ( (ret != 0) || (res != 0) ){
4782                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4783                 return -1;
4784         }
4785
4786         return 0;
4787 }
4788
4789 int
4790 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4791 {
4792         struct ctdb_client_control_state *state;
4793
4794         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4795         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4796 }
4797
4798
4799
4800
4801
4802
4803 /*
4804   set a database to be readonly
4805  */
4806 struct ctdb_client_control_state *
4807 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4808 {
4809         TDB_DATA data;
4810
4811         data.dptr = (uint8_t *)&dbid;
4812         data.dsize = sizeof(dbid);
4813
4814         return ctdb_control_send(ctdb, destnode, 0, 
4815                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4816                            ctdb, NULL, NULL);
4817 }
4818
4819 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4820 {
4821         int ret;
4822         int32_t res;
4823
4824         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4825         if (ret != 0 || res != 0) {
4826           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4827                 return -1;
4828         }
4829
4830         return 0;
4831 }
4832
4833 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4834 {
4835         struct ctdb_client_control_state *state;
4836
4837         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4838         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4839 }
4840
4841 /*
4842   set a database to be sticky
4843  */
4844 struct ctdb_client_control_state *
4845 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4846 {
4847         TDB_DATA data;
4848
4849         data.dptr = (uint8_t *)&dbid;
4850         data.dsize = sizeof(dbid);
4851
4852         return ctdb_control_send(ctdb, destnode, 0, 
4853                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4854                            ctdb, NULL, NULL);
4855 }
4856
4857 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4858 {
4859         int ret;
4860         int32_t res;
4861
4862         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4863         if (ret != 0 || res != 0) {
4864                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4865                 return -1;
4866         }
4867
4868         return 0;
4869 }
4870
4871 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4872 {
4873         struct ctdb_client_control_state *state;
4874
4875         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4876         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4877 }