c8ab1cd6cb1b9bac33e879fd03fac770c88400d2
[samba.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 /*
33   allocate a packet for use in client<->daemon communication
34  */
35 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
36                                             TALLOC_CTX *mem_ctx, 
37                                             enum ctdb_operation operation, 
38                                             size_t length, size_t slength,
39                                             const char *type)
40 {
41         int size;
42         struct ctdb_req_header *hdr;
43
44         length = MAX(length, slength);
45         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
46
47         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
48         if (hdr == NULL) {
49                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
50                          operation, (unsigned)length));
51                 return NULL;
52         }
53         talloc_set_name_const(hdr, type);
54         hdr->length       = length;
55         hdr->operation    = operation;
56         hdr->ctdb_magic   = CTDB_MAGIC;
57         hdr->ctdb_version = CTDB_VERSION;
58         hdr->srcnode      = ctdb->pnn;
59         if (ctdb->vnn_map) {
60                 hdr->generation = ctdb->vnn_map->generation;
61         }
62
63         return hdr;
64 }
65
66 /*
67   local version of ctdb_call
68 */
69 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
70                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
71                     TDB_DATA *data, bool updatetdb)
72 {
73         struct ctdb_call_info *c;
74         struct ctdb_registered_call *fn;
75         struct ctdb_context *ctdb = ctdb_db->ctdb;
76         
77         c = talloc(ctdb, struct ctdb_call_info);
78         CTDB_NO_MEMORY(ctdb, c);
79
80         c->key = call->key;
81         c->call_data = &call->call_data;
82         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
83         c->record_data.dsize = data->dsize;
84         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
85         c->new_data = NULL;
86         c->reply_data = NULL;
87         c->status = 0;
88         c->header = header;
89
90         for (fn=ctdb_db->calls;fn;fn=fn->next) {
91                 if (fn->id == call->call_id) break;
92         }
93         if (fn == NULL) {
94                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
95                 talloc_free(c);
96                 return -1;
97         }
98
99         if (fn->fn(c) != 0) {
100                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
101                 talloc_free(c);
102                 return -1;
103         }
104
105         /* we need to force the record to be written out if this was a remote access */
106         if (c->new_data == NULL) {
107                 c->new_data = &c->record_data;
108         }
109
110         if (c->new_data && updatetdb) {
111                 /* XXX check that we always have the lock here? */
112                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
113                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
114                         talloc_free(c);
115                         return -1;
116                 }
117         }
118
119         if (c->reply_data) {
120                 call->reply_data = *c->reply_data;
121
122                 talloc_steal(call, call->reply_data.dptr);
123                 talloc_set_name_const(call->reply_data.dptr, __location__);
124         } else {
125                 call->reply_data.dptr = NULL;
126                 call->reply_data.dsize = 0;
127         }
128         call->status = c->status;
129
130         talloc_free(c);
131
132         return 0;
133 }
134
135
136 /*
137   queue a packet for sending from client to daemon
138 */
139 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
140 {
141         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
142 }
143
144
145 /*
146   called when a CTDB_REPLY_CALL packet comes in in the client
147
148   This packet comes in response to a CTDB_REQ_CALL request packet. It
149   contains any reply data from the call
150 */
151 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
152 {
153         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
154         struct ctdb_client_call_state *state;
155
156         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
157         if (state == NULL) {
158                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
159                 return;
160         }
161
162         if (hdr->reqid != state->reqid) {
163                 /* we found a record  but it was the wrong one */
164                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
165                 return;
166         }
167
168         state->call->reply_data.dptr = c->data;
169         state->call->reply_data.dsize = c->datalen;
170         state->call->status = c->status;
171
172         talloc_steal(state, c);
173
174         state->state = CTDB_CALL_DONE;
175
176         if (state->async.fn) {
177                 state->async.fn(state);
178         }
179 }
180
181 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
182
183 /*
184   this is called in the client, when data comes in from the daemon
185  */
186 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
187 {
188         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
189         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
190         TALLOC_CTX *tmp_ctx;
191
192         /* place the packet as a child of a tmp_ctx. We then use
193            talloc_free() below to free it. If any of the calls want
194            to keep it, then they will steal it somewhere else, and the
195            talloc_free() will be a no-op */
196         tmp_ctx = talloc_new(ctdb);
197         talloc_steal(tmp_ctx, hdr);
198
199         if (cnt == 0) {
200                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
201                 exit(1);
202         }
203
204         if (cnt < sizeof(*hdr)) {
205                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
206                 goto done;
207         }
208         if (cnt != hdr->length) {
209                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
210                                (unsigned)hdr->length, (unsigned)cnt);
211                 goto done;
212         }
213
214         if (hdr->ctdb_magic != CTDB_MAGIC) {
215                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
216                 goto done;
217         }
218
219         if (hdr->ctdb_version != CTDB_VERSION) {
220                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
221                 goto done;
222         }
223
224         switch (hdr->operation) {
225         case CTDB_REPLY_CALL:
226                 ctdb_client_reply_call(ctdb, hdr);
227                 break;
228
229         case CTDB_REQ_MESSAGE:
230                 ctdb_request_message(ctdb, hdr);
231                 break;
232
233         case CTDB_REPLY_CONTROL:
234                 ctdb_client_reply_control(ctdb, hdr);
235                 break;
236
237         default:
238                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
239         }
240
241 done:
242         talloc_free(tmp_ctx);
243 }
244
245 /*
246   connect to a unix domain socket
247 */
248 int ctdb_socket_connect(struct ctdb_context *ctdb)
249 {
250         struct sockaddr_un addr;
251
252         memset(&addr, 0, sizeof(addr));
253         addr.sun_family = AF_UNIX;
254         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
255
256         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
257         if (ctdb->daemon.sd == -1) {
258                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
259                 return -1;
260         }
261
262         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
263                 close(ctdb->daemon.sd);
264                 ctdb->daemon.sd = -1;
265                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
266                 return -1;
267         }
268
269         set_nonblocking(ctdb->daemon.sd);
270         set_close_on_exec(ctdb->daemon.sd);
271         
272         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
273                                               CTDB_DS_ALIGNMENT, 
274                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
275         return 0;
276 }
277
278
279 struct ctdb_record_handle {
280         struct ctdb_db_context *ctdb_db;
281         TDB_DATA key;
282         TDB_DATA *data;
283         struct ctdb_ltdb_header header;
284 };
285
286
287 /*
288   make a recv call to the local ctdb daemon - called from client context
289
290   This is called when the program wants to wait for a ctdb_call to complete and get the 
291   results. This call will block unless the call has already completed.
292 */
293 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
294 {
295         if (state == NULL) {
296                 return -1;
297         }
298
299         while (state->state < CTDB_CALL_DONE) {
300                 event_loop_once(state->ctdb_db->ctdb->ev);
301         }
302         if (state->state != CTDB_CALL_DONE) {
303                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
304                 talloc_free(state);
305                 return -1;
306         }
307
308         if (state->call->reply_data.dsize) {
309                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
310                                                       state->call->reply_data.dptr,
311                                                       state->call->reply_data.dsize);
312                 call->reply_data.dsize = state->call->reply_data.dsize;
313         } else {
314                 call->reply_data.dptr = NULL;
315                 call->reply_data.dsize = 0;
316         }
317         call->status = state->call->status;
318         talloc_free(state);
319
320         return call->status;
321 }
322
323
324
325
326 /*
327   destroy a ctdb_call in client
328 */
329 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
330 {
331         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
332         return 0;
333 }
334
335 /*
336   construct an event driven local ctdb_call
337
338   this is used so that locally processed ctdb_call requests are processed
339   in an event driven manner
340 */
341 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
342                                                                   struct ctdb_call *call,
343                                                                   struct ctdb_ltdb_header *header,
344                                                                   TDB_DATA *data)
345 {
346         struct ctdb_client_call_state *state;
347         struct ctdb_context *ctdb = ctdb_db->ctdb;
348         int ret;
349
350         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
351         CTDB_NO_MEMORY_NULL(ctdb, state);
352         state->call = talloc_zero(state, struct ctdb_call);
353         CTDB_NO_MEMORY_NULL(ctdb, state->call);
354
355         talloc_steal(state, data->dptr);
356
357         state->state   = CTDB_CALL_DONE;
358         *(state->call) = *call;
359         state->ctdb_db = ctdb_db;
360
361         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
362         if (ret != 0) {
363                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
364         }
365
366         return state;
367 }
368
369 /*
370   make a ctdb call to the local daemon - async send. Called from client context.
371
372   This constructs a ctdb_call request and queues it for processing. 
373   This call never blocks.
374 */
375 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
376                                               struct ctdb_call *call)
377 {
378         struct ctdb_client_call_state *state;
379         struct ctdb_context *ctdb = ctdb_db->ctdb;
380         struct ctdb_ltdb_header header;
381         TDB_DATA data;
382         int ret;
383         size_t len;
384         struct ctdb_req_call *c;
385
386         /* if the domain socket is not yet open, open it */
387         if (ctdb->daemon.sd==-1) {
388                 ctdb_socket_connect(ctdb);
389         }
390
391         ret = ctdb_ltdb_lock(ctdb_db, call->key);
392         if (ret != 0) {
393                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
394                 return NULL;
395         }
396
397         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
398
399         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
400                 ret = -1;
401         }
402
403         if (ret == 0 && header.dmaster == ctdb->pnn) {
404                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
405                 talloc_free(data.dptr);
406                 ctdb_ltdb_unlock(ctdb_db, call->key);
407                 return state;
408         }
409
410         ctdb_ltdb_unlock(ctdb_db, call->key);
411         talloc_free(data.dptr);
412
413         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
414         if (state == NULL) {
415                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
416                 return NULL;
417         }
418         state->call = talloc_zero(state, struct ctdb_call);
419         if (state->call == NULL) {
420                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
421                 return NULL;
422         }
423
424         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
425         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
426         if (c == NULL) {
427                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
428                 return NULL;
429         }
430
431         state->reqid     = ctdb_reqid_new(ctdb, state);
432         state->ctdb_db = ctdb_db;
433         talloc_set_destructor(state, ctdb_client_call_destructor);
434
435         c->hdr.reqid     = state->reqid;
436         c->flags         = call->flags;
437         c->db_id         = ctdb_db->db_id;
438         c->callid        = call->call_id;
439         c->hopcount      = 0;
440         c->keylen        = call->key.dsize;
441         c->calldatalen   = call->call_data.dsize;
442         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
443         memcpy(&c->data[call->key.dsize], 
444                call->call_data.dptr, call->call_data.dsize);
445         *(state->call)              = *call;
446         state->call->call_data.dptr = &c->data[call->key.dsize];
447         state->call->key.dptr       = &c->data[0];
448
449         state->state  = CTDB_CALL_WAIT;
450
451
452         ctdb_client_queue_pkt(ctdb, &c->hdr);
453
454         return state;
455 }
456
457
458 /*
459   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
460 */
461 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
462 {
463         struct ctdb_client_call_state *state;
464
465         state = ctdb_call_send(ctdb_db, call);
466         return ctdb_call_recv(state, call);
467 }
468
469
470 /*
471   tell the daemon what messaging srvid we will use, and register the message
472   handler function in the client
473 */
474 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
475                              ctdb_msg_fn_t handler,
476                              void *private_data)
477 {
478         int res;
479         int32_t status;
480
481         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
482                            tdb_null, NULL, NULL, &status, NULL, NULL);
483         if (res != 0 || status != 0) {
484                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
485                 return -1;
486         }
487
488         /* also need to register the handler with our own ctdb structure */
489         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
490 }
491
492 /*
493   tell the daemon we no longer want a srvid
494 */
495 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
496 {
497         int res;
498         int32_t status;
499
500         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
501                            tdb_null, NULL, NULL, &status, NULL, NULL);
502         if (res != 0 || status != 0) {
503                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
504                 return -1;
505         }
506
507         /* also need to register the handler with our own ctdb structure */
508         ctdb_deregister_message_handler(ctdb, srvid, private_data);
509         return 0;
510 }
511
512 /*
513  * check server ids
514  */
515 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
516                                        uint8_t *result)
517 {
518         TDB_DATA indata, outdata;
519         int res;
520         int32_t status;
521         int i;
522
523         indata.dptr = (uint8_t *)ids;
524         indata.dsize = num * sizeof(*ids);
525
526         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
527                            indata, ctdb, &outdata, &status, NULL, NULL);
528         if (res != 0 || status != 0) {
529                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
530                 return -1;
531         }
532
533         if (outdata.dsize != num*sizeof(uint8_t)) {
534                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
535                                   (long unsigned int)num*sizeof(uint8_t),
536                                   outdata.dsize));
537                 talloc_free(outdata.dptr);
538                 return -1;
539         }
540
541         for (i=0; i<num; i++) {
542                 result[i] = outdata.dptr[i];
543         }
544
545         talloc_free(outdata.dptr);
546         return 0;
547 }
548
549 /*
550   send a message - from client context
551  */
552 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
553                       uint64_t srvid, TDB_DATA data)
554 {
555         struct ctdb_req_message *r;
556         int len, res;
557
558         len = offsetof(struct ctdb_req_message, data) + data.dsize;
559         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
560                                len, struct ctdb_req_message);
561         CTDB_NO_MEMORY(ctdb, r);
562
563         r->hdr.destnode  = pnn;
564         r->srvid         = srvid;
565         r->datalen       = data.dsize;
566         memcpy(&r->data[0], data.dptr, data.dsize);
567         
568         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
569         talloc_free(r);
570         return res;
571 }
572
573
574 /*
575   cancel a ctdb_fetch_lock operation, releasing the lock
576  */
577 static int fetch_lock_destructor(struct ctdb_record_handle *h)
578 {
579         ctdb_ltdb_unlock(h->ctdb_db, h->key);
580         return 0;
581 }
582
583 /*
584   force the migration of a record to this node
585  */
586 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
587 {
588         struct ctdb_call call;
589         ZERO_STRUCT(call);
590         call.call_id = CTDB_NULL_FUNC;
591         call.key = key;
592         call.flags = CTDB_IMMEDIATE_MIGRATION;
593         return ctdb_call(ctdb_db, &call);
594 }
595
596 /*
597   try to fetch a readonly copy of a record
598  */
599 static int
600 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
601 {
602         int ret;
603
604         struct ctdb_call call;
605         ZERO_STRUCT(call);
606
607         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
608         call.call_data.dptr = NULL;
609         call.call_data.dsize = 0;
610         call.key = key;
611         call.flags = CTDB_WANT_READONLY;
612         ret = ctdb_call(ctdb_db, &call);
613
614         if (ret != 0) {
615                 return -1;
616         }
617         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
618                 return -1;
619         }
620
621         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
622         if (*hdr == NULL) {
623                 talloc_free(call.reply_data.dptr);
624                 return -1;
625         }
626
627         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
628         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
629         if (data->dptr == NULL) {
630                 talloc_free(call.reply_data.dptr);
631                 talloc_free(hdr);
632                 return -1;
633         }
634
635         return 0;
636 }
637
638 /*
639   get a lock on a record, and return the records data. Blocks until it gets the lock
640  */
641 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
642                                            TDB_DATA key, TDB_DATA *data)
643 {
644         int ret;
645         struct ctdb_record_handle *h;
646
647         /*
648           procedure is as follows:
649
650           1) get the chain lock. 
651           2) check if we are dmaster
652           3) if we are the dmaster then return handle 
653           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
654              reply from ctdbd
655           5) when we get the reply, goto (1)
656          */
657
658         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
659         if (h == NULL) {
660                 return NULL;
661         }
662
663         h->ctdb_db = ctdb_db;
664         h->key     = key;
665         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
666         if (h->key.dptr == NULL) {
667                 talloc_free(h);
668                 return NULL;
669         }
670         h->data    = data;
671
672         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
673                  (const char *)key.dptr));
674
675 again:
676         /* step 1 - get the chain lock */
677         ret = ctdb_ltdb_lock(ctdb_db, key);
678         if (ret != 0) {
679                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
680                 talloc_free(h);
681                 return NULL;
682         }
683
684         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
685
686         talloc_set_destructor(h, fetch_lock_destructor);
687
688         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
689
690         /* when torturing, ensure we test the remote path */
691         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
692             random() % 5 == 0) {
693                 h->header.dmaster = (uint32_t)-1;
694         }
695
696
697         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
698
699         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
700                 ctdb_ltdb_unlock(ctdb_db, key);
701                 ret = ctdb_client_force_migration(ctdb_db, key);
702                 if (ret != 0) {
703                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
704                         talloc_free(h);
705                         return NULL;
706                 }
707                 goto again;
708         }
709
710         /* if this is a request for read/write and we have delegations
711            we have to revoke all delegations first
712         */
713         if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
714             (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
715                 ctdb_ltdb_unlock(ctdb_db, key);
716                 ret = ctdb_client_force_migration(ctdb_db, key);
717                 if (ret != 0) {
718                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
719                         talloc_free(h);
720                         return NULL;
721                 }
722                 goto again;
723         }
724
725         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
726         return h;
727 }
728
729 /*
730   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
731  */
732 struct ctdb_record_handle *
733 ctdb_fetch_readonly_lock(
734         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
735         TDB_DATA key, TDB_DATA *data,
736         int read_only)
737 {
738         int ret;
739         struct ctdb_record_handle *h;
740         struct ctdb_ltdb_header *roheader = NULL;
741
742         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
743         if (h == NULL) {
744                 return NULL;
745         }
746
747         h->ctdb_db = ctdb_db;
748         h->key     = key;
749         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
750         if (h->key.dptr == NULL) {
751                 talloc_free(h);
752                 return NULL;
753         }
754         h->data    = data;
755
756         data->dptr = NULL;
757         data->dsize = 0;
758
759
760 again:
761         talloc_free(roheader);
762         roheader = NULL;
763
764         talloc_free(data->dptr);
765         data->dptr = NULL;
766         data->dsize = 0;
767
768         /* Lock the record/chain */
769         ret = ctdb_ltdb_lock(ctdb_db, key);
770         if (ret != 0) {
771                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
772                 talloc_free(h);
773                 return NULL;
774         }
775
776         talloc_set_destructor(h, fetch_lock_destructor);
777
778         /* Check if record exists yet in the TDB */
779         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
780         if (ret != 0) {
781                 ctdb_ltdb_unlock(ctdb_db, key);
782                 ret = ctdb_client_force_migration(ctdb_db, key);
783                 if (ret != 0) {
784                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
785                         talloc_free(h);
786                         return NULL;
787                 }
788                 goto again;
789         }
790
791         /* if this is a request for read/write and we have delegations
792            we have to revoke all delegations first
793         */
794         if ((read_only == 0) 
795         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
796         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
797                 ctdb_ltdb_unlock(ctdb_db, key);
798                 ret = ctdb_client_force_migration(ctdb_db, key);
799                 if (ret != 0) {
800                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
801                         talloc_free(h);
802                         return NULL;
803                 }
804                 goto again;
805         }
806
807         /* if we are dmaster, just return the handle */
808         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
809                 return h;
810         }
811
812         if (read_only != 0) {
813                 TDB_DATA rodata = {NULL, 0};
814
815                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
816                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
817                         return h;
818                 }
819
820                 ctdb_ltdb_unlock(ctdb_db, key);
821                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
822                 if (ret != 0) {
823                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
824                         ret = ctdb_client_force_migration(ctdb_db, key);
825                         if (ret != 0) {
826                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
827                                 talloc_free(h);
828                                 return NULL;
829                         }
830
831                         goto again;
832                 }
833
834                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
835                         ret = ctdb_client_force_migration(ctdb_db, key);
836                         if (ret != 0) {
837                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
838                                 talloc_free(h);
839                                 return NULL;
840                         }
841
842                         goto again;
843                 }
844
845                 ret = ctdb_ltdb_lock(ctdb_db, key);
846                 if (ret != 0) {
847                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
848                         talloc_free(h);
849                         return NULL;
850                 }
851
852                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
853                 if (ret != 0) {
854                         ctdb_ltdb_unlock(ctdb_db, key);
855
856                         ret = ctdb_client_force_migration(ctdb_db, key);
857                         if (ret != 0) {
858                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
859                                 talloc_free(h);
860                                 return NULL;
861                         }
862
863                         goto again;
864                 }
865
866                 return h;
867         }
868
869         /* we are not dmaster and this was not a request for a readonly lock
870          * so unlock the record, migrate it and try again
871          */
872         ctdb_ltdb_unlock(ctdb_db, key);
873         ret = ctdb_client_force_migration(ctdb_db, key);
874         if (ret != 0) {
875                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
876                 talloc_free(h);
877                 return NULL;
878         }
879         goto again;
880 }
881
882 /*
883   store some data to the record that was locked with ctdb_fetch_lock()
884 */
885 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
886 {
887         if (h->ctdb_db->persistent) {
888                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
889                 return -1;
890         }
891
892         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
893 }
894
895 /*
896   non-locking fetch of a record
897  */
898 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
899                TDB_DATA key, TDB_DATA *data)
900 {
901         struct ctdb_call call;
902         int ret;
903
904         call.call_id = CTDB_FETCH_FUNC;
905         call.call_data.dptr = NULL;
906         call.call_data.dsize = 0;
907         call.key = key;
908
909         ret = ctdb_call(ctdb_db, &call);
910
911         if (ret == 0) {
912                 *data = call.reply_data;
913                 talloc_steal(mem_ctx, data->dptr);
914         }
915
916         return ret;
917 }
918
919
920
921 /*
922    called when a control completes or timesout to invoke the callback
923    function the user provided
924 */
925 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
926         struct timeval t, void *private_data)
927 {
928         struct ctdb_client_control_state *state;
929         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
930         int ret;
931
932         state = talloc_get_type(private_data, struct ctdb_client_control_state);
933         talloc_steal(tmp_ctx, state);
934
935         ret = ctdb_control_recv(state->ctdb, state, state,
936                         NULL, 
937                         NULL, 
938                         NULL);
939         if (ret != 0) {
940                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
941         }
942
943         talloc_free(tmp_ctx);
944 }
945
946 /*
947   called when a CTDB_REPLY_CONTROL packet comes in in the client
948
949   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
950   contains any reply data from the control
951 */
952 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
953                                       struct ctdb_req_header *hdr)
954 {
955         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
956         struct ctdb_client_control_state *state;
957
958         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
959         if (state == NULL) {
960                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
961                 return;
962         }
963
964         if (hdr->reqid != state->reqid) {
965                 /* we found a record  but it was the wrong one */
966                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
967                 return;
968         }
969
970         state->outdata.dptr = c->data;
971         state->outdata.dsize = c->datalen;
972         state->status = c->status;
973         if (c->errorlen) {
974                 state->errormsg = talloc_strndup(state, 
975                                                  (char *)&c->data[c->datalen], 
976                                                  c->errorlen);
977         }
978
979         /* state->outdata now uses resources from c so we dont want c
980            to just dissappear from under us while state is still alive
981         */
982         talloc_steal(state, c);
983
984         state->state = CTDB_CONTROL_DONE;
985
986         /* if we had a callback registered for this control, pull the response
987            and call the callback.
988         */
989         if (state->async.fn) {
990                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
991         }
992 }
993
994
995 /*
996   destroy a ctdb_control in client
997 */
998 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
999 {
1000         ctdb_reqid_remove(state->ctdb, state->reqid);
1001         return 0;
1002 }
1003
1004
1005 /* time out handler for ctdb_control */
1006 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
1007         struct timeval t, void *private_data)
1008 {
1009         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1010
1011         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1012                          "dstnode:%u\n", state->reqid, state->c->opcode,
1013                          state->c->hdr.destnode));
1014
1015         state->state = CTDB_CONTROL_TIMEOUT;
1016
1017         /* if we had a callback registered for this control, pull the response
1018            and call the callback.
1019         */
1020         if (state->async.fn) {
1021                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1022         }
1023 }
1024
1025 /* async version of send control request */
1026 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1027                 uint32_t destnode, uint64_t srvid, 
1028                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1029                 TALLOC_CTX *mem_ctx,
1030                 struct timeval *timeout,
1031                 char **errormsg)
1032 {
1033         struct ctdb_client_control_state *state;
1034         size_t len;
1035         struct ctdb_req_control *c;
1036         int ret;
1037
1038         if (errormsg) {
1039                 *errormsg = NULL;
1040         }
1041
1042         /* if the domain socket is not yet open, open it */
1043         if (ctdb->daemon.sd==-1) {
1044                 ctdb_socket_connect(ctdb);
1045         }
1046
1047         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1048         CTDB_NO_MEMORY_NULL(ctdb, state);
1049
1050         state->ctdb       = ctdb;
1051         state->reqid      = ctdb_reqid_new(ctdb, state);
1052         state->state      = CTDB_CONTROL_WAIT;
1053         state->errormsg   = NULL;
1054
1055         talloc_set_destructor(state, ctdb_client_control_destructor);
1056
1057         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1058         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1059                                len, struct ctdb_req_control);
1060         state->c            = c;        
1061         CTDB_NO_MEMORY_NULL(ctdb, c);
1062         c->hdr.reqid        = state->reqid;
1063         c->hdr.destnode     = destnode;
1064         c->opcode           = opcode;
1065         c->client_id        = 0;
1066         c->flags            = flags;
1067         c->srvid            = srvid;
1068         c->datalen          = data.dsize;
1069         if (data.dsize) {
1070                 memcpy(&c->data[0], data.dptr, data.dsize);
1071         }
1072
1073         /* timeout */
1074         if (timeout && !timeval_is_zero(timeout)) {
1075                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1076         }
1077
1078         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1079         if (ret != 0) {
1080                 talloc_free(state);
1081                 return NULL;
1082         }
1083
1084         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1085                 talloc_free(state);
1086                 return NULL;
1087         }
1088
1089         return state;
1090 }
1091
1092
1093 /* async version of receive control reply */
1094 int ctdb_control_recv(struct ctdb_context *ctdb, 
1095                 struct ctdb_client_control_state *state, 
1096                 TALLOC_CTX *mem_ctx,
1097                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1098 {
1099         TALLOC_CTX *tmp_ctx;
1100
1101         if (status != NULL) {
1102                 *status = -1;
1103         }
1104         if (errormsg != NULL) {
1105                 *errormsg = NULL;
1106         }
1107
1108         if (state == NULL) {
1109                 return -1;
1110         }
1111
1112         /* prevent double free of state */
1113         tmp_ctx = talloc_new(ctdb);
1114         talloc_steal(tmp_ctx, state);
1115
1116         /* loop one event at a time until we either timeout or the control
1117            completes.
1118         */
1119         while (state->state == CTDB_CONTROL_WAIT) {
1120                 event_loop_once(ctdb->ev);
1121         }
1122
1123         if (state->state != CTDB_CONTROL_DONE) {
1124                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1125                 if (state->async.fn) {
1126                         state->async.fn(state);
1127                 }
1128                 talloc_free(tmp_ctx);
1129                 return -1;
1130         }
1131
1132         if (state->errormsg) {
1133                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1134                 if (errormsg) {
1135                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1136                 }
1137                 if (state->async.fn) {
1138                         state->async.fn(state);
1139                 }
1140                 talloc_free(tmp_ctx);
1141                 return -1;
1142         }
1143
1144         if (outdata) {
1145                 *outdata = state->outdata;
1146                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1147         }
1148
1149         if (status) {
1150                 *status = state->status;
1151         }
1152
1153         if (state->async.fn) {
1154                 state->async.fn(state);
1155         }
1156
1157         talloc_free(tmp_ctx);
1158         return 0;
1159 }
1160
1161
1162
1163 /*
1164   send a ctdb control message
1165   timeout specifies how long we should wait for a reply.
1166   if timeout is NULL we wait indefinitely
1167  */
1168 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1169                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1170                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1171                  struct timeval *timeout,
1172                  char **errormsg)
1173 {
1174         struct ctdb_client_control_state *state;
1175
1176         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1177                         flags, data, mem_ctx,
1178                         timeout, errormsg);
1179
1180         /* FIXME: Error conditions in ctdb_control_send return NULL without
1181          * setting errormsg.  So, there is no way to distinguish between sucess
1182          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1183         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1184                 if (status != NULL) {
1185                         *status = 0;
1186                 }
1187                 return 0;
1188         }
1189
1190         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1191                         errormsg);
1192 }
1193
1194
1195
1196
1197 /*
1198   a process exists call. Returns 0 if process exists, -1 otherwise
1199  */
1200 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1201 {
1202         int ret;
1203         TDB_DATA data;
1204         int32_t status;
1205
1206         data.dptr = (uint8_t*)&pid;
1207         data.dsize = sizeof(pid);
1208
1209         ret = ctdb_control(ctdb, destnode, 0, 
1210                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1211                            NULL, NULL, &status, NULL, NULL);
1212         if (ret != 0) {
1213                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1214                 return -1;
1215         }
1216
1217         return status;
1218 }
1219
1220 /*
1221   get remote statistics
1222  */
1223 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1224 {
1225         int ret;
1226         TDB_DATA data;
1227         int32_t res;
1228
1229         ret = ctdb_control(ctdb, destnode, 0, 
1230                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1231                            ctdb, &data, &res, NULL, NULL);
1232         if (ret != 0 || res != 0) {
1233                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1234                 return -1;
1235         }
1236
1237         if (data.dsize != sizeof(struct ctdb_statistics)) {
1238                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1239                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1240                       return -1;
1241         }
1242
1243         *status = *(struct ctdb_statistics *)data.dptr;
1244         talloc_free(data.dptr);
1245                         
1246         return 0;
1247 }
1248
1249 /*
1250  * get db statistics
1251  */
1252 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1253                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1254 {
1255         int ret;
1256         TDB_DATA indata, outdata;
1257         int32_t res;
1258         struct ctdb_db_statistics *wire, *s;
1259         char *ptr;
1260         int i;
1261
1262         indata.dptr = (uint8_t *)&dbid;
1263         indata.dsize = sizeof(dbid);
1264
1265         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1266                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1267         if (ret != 0 || res != 0) {
1268                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1269                 return -1;
1270         }
1271
1272         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1273                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1274                                  outdata.dsize,
1275                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1276                 return -1;
1277         }
1278
1279         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1280         if (s == NULL) {
1281                 talloc_free(outdata.dptr);
1282                 CTDB_NO_MEMORY(ctdb, s);
1283         }
1284
1285         wire = (struct ctdb_db_statistics *)outdata.dptr;
1286         *s = *wire;
1287         ptr = &wire->hot_keys_wire[0];
1288         for (i=0; i<wire->num_hot_keys; i++) {
1289                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1290                 if (s->hot_keys[i].key.dptr == NULL) {
1291                         talloc_free(outdata.dptr);
1292                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1293                 }
1294
1295                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1296                 ptr += wire->hot_keys[i].key.dsize;
1297         }
1298
1299         talloc_free(outdata.dptr);
1300         *dbstat = s;
1301         return 0;
1302 }
1303
1304 /*
1305   shutdown a remote ctdb node
1306  */
1307 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1308 {
1309         struct ctdb_client_control_state *state;
1310
1311         state = ctdb_control_send(ctdb, destnode, 0, 
1312                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1313                            NULL, &timeout, NULL);
1314         if (state == NULL) {
1315                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1316                 return -1;
1317         }
1318
1319         return 0;
1320 }
1321
1322 /*
1323   get vnn map from a remote node
1324  */
1325 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1326 {
1327         int ret;
1328         TDB_DATA outdata;
1329         int32_t res;
1330         struct ctdb_vnn_map_wire *map;
1331
1332         ret = ctdb_control(ctdb, destnode, 0, 
1333                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1334                            mem_ctx, &outdata, &res, &timeout, NULL);
1335         if (ret != 0 || res != 0) {
1336                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1337                 return -1;
1338         }
1339         
1340         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1341         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1342             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1343                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1344                 return -1;
1345         }
1346
1347         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1348         CTDB_NO_MEMORY(ctdb, *vnnmap);
1349         (*vnnmap)->generation = map->generation;
1350         (*vnnmap)->size       = map->size;
1351         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1352
1353         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1354         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1355         talloc_free(outdata.dptr);
1356                     
1357         return 0;
1358 }
1359
1360
1361 /*
1362   get the recovery mode of a remote node
1363  */
1364 struct ctdb_client_control_state *
1365 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1366 {
1367         return ctdb_control_send(ctdb, destnode, 0, 
1368                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1369                            mem_ctx, &timeout, NULL);
1370 }
1371
1372 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1373 {
1374         int ret;
1375         int32_t res;
1376
1377         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1378         if (ret != 0) {
1379                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1380                 return -1;
1381         }
1382
1383         if (recmode) {
1384                 *recmode = (uint32_t)res;
1385         }
1386
1387         return 0;
1388 }
1389
1390 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1391 {
1392         struct ctdb_client_control_state *state;
1393
1394         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1395         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1396 }
1397
1398
1399
1400
1401 /*
1402   set the recovery mode of a remote node
1403  */
1404 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1405 {
1406         int ret;
1407         TDB_DATA data;
1408         int32_t res;
1409
1410         data.dsize = sizeof(uint32_t);
1411         data.dptr = (unsigned char *)&recmode;
1412
1413         ret = ctdb_control(ctdb, destnode, 0, 
1414                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1415                            NULL, NULL, &res, &timeout, NULL);
1416         if (ret != 0 || res != 0) {
1417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1418                 return -1;
1419         }
1420
1421         return 0;
1422 }
1423
1424
1425
1426 /*
1427   get the recovery master of a remote node
1428  */
1429 struct ctdb_client_control_state *
1430 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1431                         struct timeval timeout, uint32_t destnode)
1432 {
1433         return ctdb_control_send(ctdb, destnode, 0, 
1434                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1435                            mem_ctx, &timeout, NULL);
1436 }
1437
1438 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1439 {
1440         int ret;
1441         int32_t res;
1442
1443         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1444         if (ret != 0) {
1445                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1446                 return -1;
1447         }
1448
1449         if (recmaster) {
1450                 *recmaster = (uint32_t)res;
1451         }
1452
1453         return 0;
1454 }
1455
1456 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1457 {
1458         struct ctdb_client_control_state *state;
1459
1460         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1461         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1462 }
1463
1464
1465 /*
1466   set the recovery master of a remote node
1467  */
1468 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1469 {
1470         int ret;
1471         TDB_DATA data;
1472         int32_t res;
1473
1474         ZERO_STRUCT(data);
1475         data.dsize = sizeof(uint32_t);
1476         data.dptr = (unsigned char *)&recmaster;
1477
1478         ret = ctdb_control(ctdb, destnode, 0, 
1479                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1480                            NULL, NULL, &res, &timeout, NULL);
1481         if (ret != 0 || res != 0) {
1482                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1483                 return -1;
1484         }
1485
1486         return 0;
1487 }
1488
1489
1490 /*
1491   get a list of databases off a remote node
1492  */
1493 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1494                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1495 {
1496         int ret;
1497         TDB_DATA outdata;
1498         int32_t res;
1499
1500         ret = ctdb_control(ctdb, destnode, 0, 
1501                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1502                            mem_ctx, &outdata, &res, &timeout, NULL);
1503         if (ret != 0 || res != 0) {
1504                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1505                 return -1;
1506         }
1507
1508         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1509         talloc_free(outdata.dptr);
1510                     
1511         return 0;
1512 }
1513
1514 /*
1515   get a list of nodes (vnn and flags ) from a remote node
1516  */
1517 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1518                 struct timeval timeout, uint32_t destnode, 
1519                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1520 {
1521         int ret;
1522         TDB_DATA outdata;
1523         int32_t res;
1524
1525         ret = ctdb_control(ctdb, destnode, 0, 
1526                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1527                            mem_ctx, &outdata, &res, &timeout, NULL);
1528         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1529                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1530                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1531         }
1532         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1533                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1534                 return -1;
1535         }
1536
1537         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1538         talloc_free(outdata.dptr);
1539                     
1540         return 0;
1541 }
1542
1543 /*
1544   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1545  */
1546 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1547                 struct timeval timeout, uint32_t destnode, 
1548                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1549 {
1550         int ret, i, len;
1551         TDB_DATA outdata;
1552         struct ctdb_node_mapv4 *nodemapv4;
1553         int32_t res;
1554
1555         ret = ctdb_control(ctdb, destnode, 0, 
1556                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1557                            mem_ctx, &outdata, &res, &timeout, NULL);
1558         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1559                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1560                 return -1;
1561         }
1562
1563         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1564
1565         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1566         (*nodemap) = talloc_zero_size(mem_ctx, len);
1567         CTDB_NO_MEMORY(ctdb, (*nodemap));
1568
1569         (*nodemap)->num = nodemapv4->num;
1570         for (i=0; i<nodemapv4->num; i++) {
1571                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1572                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1573                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1574                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1575         }
1576                 
1577         talloc_free(outdata.dptr);
1578                     
1579         return 0;
1580 }
1581
1582 /*
1583   drop the transport, reload the nodes file and restart the transport
1584  */
1585 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1586                     struct timeval timeout, uint32_t destnode)
1587 {
1588         int ret;
1589         int32_t res;
1590
1591         ret = ctdb_control(ctdb, destnode, 0, 
1592                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1593                            NULL, NULL, &res, &timeout, NULL);
1594         if (ret != 0 || res != 0) {
1595                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1596                 return -1;
1597         }
1598
1599         return 0;
1600 }
1601
1602
1603 /*
1604   set vnn map on a node
1605  */
1606 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1607                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1608 {
1609         int ret;
1610         TDB_DATA data;
1611         int32_t res;
1612         struct ctdb_vnn_map_wire *map;
1613         size_t len;
1614
1615         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1616         map = talloc_size(mem_ctx, len);
1617         CTDB_NO_MEMORY(ctdb, map);
1618
1619         map->generation = vnnmap->generation;
1620         map->size = vnnmap->size;
1621         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1622         
1623         data.dsize = len;
1624         data.dptr  = (uint8_t *)map;
1625
1626         ret = ctdb_control(ctdb, destnode, 0, 
1627                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1628                            NULL, NULL, &res, &timeout, NULL);
1629         if (ret != 0 || res != 0) {
1630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1631                 return -1;
1632         }
1633
1634         talloc_free(map);
1635
1636         return 0;
1637 }
1638
1639
1640 /*
1641   async send for pull database
1642  */
1643 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1644         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1645         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1646 {
1647         TDB_DATA indata;
1648         struct ctdb_control_pulldb *pull;
1649         struct ctdb_client_control_state *state;
1650
1651         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1652         CTDB_NO_MEMORY_NULL(ctdb, pull);
1653
1654         pull->db_id   = dbid;
1655         pull->lmaster = lmaster;
1656
1657         indata.dsize = sizeof(struct ctdb_control_pulldb);
1658         indata.dptr  = (unsigned char *)pull;
1659
1660         state = ctdb_control_send(ctdb, destnode, 0, 
1661                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1662                                   mem_ctx, &timeout, NULL);
1663         talloc_free(pull);
1664
1665         return state;
1666 }
1667
1668 /*
1669   async recv for pull database
1670  */
1671 int ctdb_ctrl_pulldb_recv(
1672         struct ctdb_context *ctdb, 
1673         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1674         TDB_DATA *outdata)
1675 {
1676         int ret;
1677         int32_t res;
1678
1679         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1680         if ( (ret != 0) || (res != 0) ){
1681                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1682                 return -1;
1683         }
1684
1685         return 0;
1686 }
1687
1688 /*
1689   pull all keys and records for a specific database on a node
1690  */
1691 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1692                 uint32_t dbid, uint32_t lmaster, 
1693                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1694                 TDB_DATA *outdata)
1695 {
1696         struct ctdb_client_control_state *state;
1697
1698         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1699                                       timeout);
1700         
1701         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1702 }
1703
1704
1705 /*
1706   change dmaster for all keys in the database to the new value
1707  */
1708 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1709                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1710 {
1711         int ret;
1712         TDB_DATA indata;
1713         int32_t res;
1714
1715         indata.dsize = 2*sizeof(uint32_t);
1716         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1717
1718         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1719         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1720
1721         ret = ctdb_control(ctdb, destnode, 0, 
1722                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1723                            NULL, NULL, &res, &timeout, NULL);
1724         if (ret != 0 || res != 0) {
1725                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1726                 return -1;
1727         }
1728
1729         return 0;
1730 }
1731
1732 /*
1733   ping a node, return number of clients connected
1734  */
1735 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1736 {
1737         int ret;
1738         int32_t res;
1739
1740         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1741                            tdb_null, NULL, NULL, &res, NULL, NULL);
1742         if (ret != 0) {
1743                 return -1;
1744         }
1745         return res;
1746 }
1747
1748 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1749                            struct timeval timeout, 
1750                            uint32_t destnode,
1751                            uint32_t *runstate)
1752 {
1753         TDB_DATA outdata;
1754         int32_t res;
1755         int ret;
1756
1757         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1758                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1759         if (ret != 0 || res != 0) {
1760                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1761                 return ret != 0 ? ret : res;
1762         }
1763
1764         if (outdata.dsize != sizeof(uint32_t)) {
1765                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1766                 talloc_free(outdata.dptr);
1767                 return -1;
1768         }
1769
1770         if (runstate != NULL) {
1771                 *runstate = *(uint32_t *)outdata.dptr;
1772         }
1773         talloc_free(outdata.dptr);
1774
1775         return 0;
1776 }
1777
1778 /*
1779   find the real path to a ltdb 
1780  */
1781 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1782                    const char **path)
1783 {
1784         int ret;
1785         int32_t res;
1786         TDB_DATA data;
1787
1788         data.dptr = (uint8_t *)&dbid;
1789         data.dsize = sizeof(dbid);
1790
1791         ret = ctdb_control(ctdb, destnode, 0, 
1792                            CTDB_CONTROL_GETDBPATH, 0, data, 
1793                            mem_ctx, &data, &res, &timeout, NULL);
1794         if (ret != 0 || res != 0) {
1795                 return -1;
1796         }
1797
1798         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1799         if ((*path) == NULL) {
1800                 return -1;
1801         }
1802
1803         talloc_free(data.dptr);
1804
1805         return 0;
1806 }
1807
1808 /*
1809   find the name of a db 
1810  */
1811 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1812                    const char **name)
1813 {
1814         int ret;
1815         int32_t res;
1816         TDB_DATA data;
1817
1818         data.dptr = (uint8_t *)&dbid;
1819         data.dsize = sizeof(dbid);
1820
1821         ret = ctdb_control(ctdb, destnode, 0, 
1822                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1823                            mem_ctx, &data, &res, &timeout, NULL);
1824         if (ret != 0 || res != 0) {
1825                 return -1;
1826         }
1827
1828         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1829         if ((*name) == NULL) {
1830                 return -1;
1831         }
1832
1833         talloc_free(data.dptr);
1834
1835         return 0;
1836 }
1837
1838 /*
1839   get the health status of a db
1840  */
1841 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1842                           struct timeval timeout,
1843                           uint32_t destnode,
1844                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1845                           const char **reason)
1846 {
1847         int ret;
1848         int32_t res;
1849         TDB_DATA data;
1850
1851         data.dptr = (uint8_t *)&dbid;
1852         data.dsize = sizeof(dbid);
1853
1854         ret = ctdb_control(ctdb, destnode, 0,
1855                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1856                            mem_ctx, &data, &res, &timeout, NULL);
1857         if (ret != 0 || res != 0) {
1858                 return -1;
1859         }
1860
1861         if (data.dsize == 0) {
1862                 (*reason) = NULL;
1863                 return 0;
1864         }
1865
1866         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1867         if ((*reason) == NULL) {
1868                 return -1;
1869         }
1870
1871         talloc_free(data.dptr);
1872
1873         return 0;
1874 }
1875
1876 /*
1877  * get db sequence number
1878  */
1879 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1880                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1881 {
1882         int ret;
1883         int32_t res;
1884         TDB_DATA data, outdata;
1885
1886         data.dptr = (uint8_t *)&dbid;
1887         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1888
1889         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1890                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1891         if (ret != 0 || res != 0) {
1892                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1893                 return -1;
1894         }
1895
1896         if (outdata.dsize != sizeof(uint64_t)) {
1897                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1898                 talloc_free(outdata.dptr);
1899                 return -1;
1900         }
1901
1902         if (seqnum != NULL) {
1903                 *seqnum = *(uint64_t *)outdata.dptr;
1904         }
1905         talloc_free(outdata.dptr);
1906
1907         return 0;
1908 }
1909
1910 /*
1911   create a database
1912  */
1913 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1914                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1915 {
1916         int ret;
1917         int32_t res;
1918         TDB_DATA data;
1919         uint64_t tdb_flags = 0;
1920
1921         data.dptr = discard_const(name);
1922         data.dsize = strlen(name)+1;
1923
1924         /* Make sure that volatile databases use jenkins hash */
1925         if (!persistent) {
1926                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1927         }
1928
1929         ret = ctdb_control(ctdb, destnode, tdb_flags,
1930                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1931                            0, data, 
1932                            mem_ctx, &data, &res, &timeout, NULL);
1933
1934         if (ret != 0 || res != 0) {
1935                 return -1;
1936         }
1937
1938         return 0;
1939 }
1940
1941 /*
1942   get debug level on a node
1943  */
1944 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1945 {
1946         int ret;
1947         int32_t res;
1948         TDB_DATA data;
1949
1950         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1951                            ctdb, &data, &res, NULL, NULL);
1952         if (ret != 0 || res != 0) {
1953                 return -1;
1954         }
1955         if (data.dsize != sizeof(int32_t)) {
1956                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1957                          (unsigned)data.dsize));
1958                 return -1;
1959         }
1960         *level = *(int32_t *)data.dptr;
1961         talloc_free(data.dptr);
1962         return 0;
1963 }
1964
1965 /*
1966   set debug level on a node
1967  */
1968 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1969 {
1970         int ret;
1971         int32_t res;
1972         TDB_DATA data;
1973
1974         data.dptr = (uint8_t *)&level;
1975         data.dsize = sizeof(level);
1976
1977         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1978                            NULL, NULL, &res, NULL, NULL);
1979         if (ret != 0 || res != 0) {
1980                 return -1;
1981         }
1982         return 0;
1983 }
1984
1985
1986 /*
1987   get a list of connected nodes
1988  */
1989 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1990                                 struct timeval timeout,
1991                                 TALLOC_CTX *mem_ctx,
1992                                 uint32_t *num_nodes)
1993 {
1994         struct ctdb_node_map *map=NULL;
1995         int ret, i;
1996         uint32_t *nodes;
1997
1998         *num_nodes = 0;
1999
2000         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
2001         if (ret != 0) {
2002                 return NULL;
2003         }
2004
2005         nodes = talloc_array(mem_ctx, uint32_t, map->num);
2006         if (nodes == NULL) {
2007                 return NULL;
2008         }
2009
2010         for (i=0;i<map->num;i++) {
2011                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2012                         nodes[*num_nodes] = map->nodes[i].pnn;
2013                         (*num_nodes)++;
2014                 }
2015         }
2016
2017         return nodes;
2018 }
2019
2020
2021 /*
2022   reset remote status
2023  */
2024 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2025 {
2026         int ret;
2027         int32_t res;
2028
2029         ret = ctdb_control(ctdb, destnode, 0, 
2030                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2031                            NULL, NULL, &res, NULL, NULL);
2032         if (ret != 0 || res != 0) {
2033                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2034                 return -1;
2035         }
2036         return 0;
2037 }
2038
2039 /*
2040   attach to a specific database - client call
2041 */
2042 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2043                                     struct timeval timeout,
2044                                     const char *name,
2045                                     bool persistent,
2046                                     uint32_t tdb_flags)
2047 {
2048         struct ctdb_db_context *ctdb_db;
2049         TDB_DATA data;
2050         int ret;
2051         int32_t res;
2052
2053         ctdb_db = ctdb_db_handle(ctdb, name);
2054         if (ctdb_db) {
2055                 return ctdb_db;
2056         }
2057
2058         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2059         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2060
2061         ctdb_db->ctdb = ctdb;
2062         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2063         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2064
2065         data.dptr = discard_const(name);
2066         data.dsize = strlen(name)+1;
2067
2068         /* CTDB has switched to using jenkins hash for volatile databases.
2069          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2070          * always set it.
2071          */
2072         if (!persistent) {
2073                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2074         }
2075
2076         /* tell ctdb daemon to attach */
2077         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2078                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2079                            0, data, ctdb_db, &data, &res, NULL, NULL);
2080         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2081                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2082                 talloc_free(ctdb_db);
2083                 return NULL;
2084         }
2085         
2086         ctdb_db->db_id = *(uint32_t *)data.dptr;
2087         talloc_free(data.dptr);
2088
2089         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2090         if (ret != 0) {
2091                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2092                 talloc_free(ctdb_db);
2093                 return NULL;
2094         }
2095
2096         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
2097         if (ctdb->valgrinding) {
2098                 tdb_flags |= TDB_NOMMAP;
2099         }
2100         tdb_flags |= TDB_DISALLOW_NESTING;
2101
2102         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
2103                                       O_RDWR, 0);
2104         if (ctdb_db->ltdb == NULL) {
2105                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2106                 talloc_free(ctdb_db);
2107                 return NULL;
2108         }
2109
2110         ctdb_db->persistent = persistent;
2111
2112         DLIST_ADD(ctdb->db_list, ctdb_db);
2113
2114         /* add well known functions */
2115         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2116         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2117         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2118
2119         return ctdb_db;
2120 }
2121
2122 /*
2123  * detach from a specific database - client call
2124  */
2125 int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
2126 {
2127         int ret;
2128         int32_t status;
2129         TDB_DATA data;
2130
2131         data.dsize = sizeof(db_id);
2132         data.dptr = (uint8_t *)&db_id;
2133
2134         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
2135                            0, data, NULL, NULL, &status, NULL, NULL);
2136         if (ret != 0 || status != 0) {
2137                 return -1;
2138         }
2139         return 0;
2140 }
2141
2142 /*
2143   setup a call for a database
2144  */
2145 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2146 {
2147         struct ctdb_registered_call *call;
2148
2149 #if 0
2150         TDB_DATA data;
2151         int32_t status;
2152         struct ctdb_control_set_call c;
2153         int ret;
2154
2155         /* this is no longer valid with the separate daemon architecture */
2156         c.db_id = ctdb_db->db_id;
2157         c.fn    = fn;
2158         c.id    = id;
2159
2160         data.dptr = (uint8_t *)&c;
2161         data.dsize = sizeof(c);
2162
2163         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2164                            data, NULL, NULL, &status, NULL, NULL);
2165         if (ret != 0 || status != 0) {
2166                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2167                 return -1;
2168         }
2169 #endif
2170
2171         /* also register locally */
2172         call = talloc(ctdb_db, struct ctdb_registered_call);
2173         call->fn = fn;
2174         call->id = id;
2175
2176         DLIST_ADD(ctdb_db->calls, call);        
2177         return 0;
2178 }
2179
2180
2181 struct traverse_state {
2182         bool done;
2183         uint32_t count;
2184         ctdb_traverse_func fn;
2185         void *private_data;
2186         bool listemptyrecords;
2187 };
2188
2189 /*
2190   called on each key during a ctdb_traverse
2191  */
2192 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2193 {
2194         struct traverse_state *state = (struct traverse_state *)p;
2195         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2196         TDB_DATA key;
2197
2198         if (data.dsize < sizeof(uint32_t) ||
2199             d->length != data.dsize) {
2200                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2201                 state->done = true;
2202                 return;
2203         }
2204
2205         key.dsize = d->keylen;
2206         key.dptr  = &d->data[0];
2207         data.dsize = d->datalen;
2208         data.dptr = &d->data[d->keylen];
2209
2210         if (key.dsize == 0 && data.dsize == 0) {
2211                 /* end of traverse */
2212                 state->done = true;
2213                 return;
2214         }
2215
2216         if (!state->listemptyrecords &&
2217             data.dsize == sizeof(struct ctdb_ltdb_header))
2218         {
2219                 /* empty records are deleted records in ctdb */
2220                 return;
2221         }
2222
2223         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2224                 state->done = true;
2225         }
2226
2227         state->count++;
2228 }
2229
2230 /**
2231  * start a cluster wide traverse, calling the supplied fn on each record
2232  * return the number of records traversed, or -1 on error
2233  *
2234  * Extendet variant with a flag to signal whether empty records should
2235  * be listed.
2236  */
2237 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2238                              ctdb_traverse_func fn,
2239                              bool withemptyrecords,
2240                              void *private_data)
2241 {
2242         TDB_DATA data;
2243         struct ctdb_traverse_start_ext t;
2244         int32_t status;
2245         int ret;
2246         uint64_t srvid = (getpid() | 0xFLL<<60);
2247         struct traverse_state state;
2248
2249         state.done = false;
2250         state.count = 0;
2251         state.private_data = private_data;
2252         state.fn = fn;
2253         state.listemptyrecords = withemptyrecords;
2254
2255         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2256         if (ret != 0) {
2257                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2258                 return -1;
2259         }
2260
2261         t.db_id = ctdb_db->db_id;
2262         t.srvid = srvid;
2263         t.reqid = 0;
2264         t.withemptyrecords = withemptyrecords;
2265
2266         data.dptr = (uint8_t *)&t;
2267         data.dsize = sizeof(t);
2268
2269         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2270                            data, NULL, NULL, &status, NULL, NULL);
2271         if (ret != 0 || status != 0) {
2272                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2273                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2274                 return -1;
2275         }
2276
2277         while (!state.done) {
2278                 event_loop_once(ctdb_db->ctdb->ev);
2279         }
2280
2281         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2282         if (ret != 0) {
2283                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2284                 return -1;
2285         }
2286
2287         return state.count;
2288 }
2289
2290 /**
2291  * start a cluster wide traverse, calling the supplied fn on each record
2292  * return the number of records traversed, or -1 on error
2293  *
2294  * Standard version which does not list the empty records:
2295  * These are considered deleted.
2296  */
2297 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2298 {
2299         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2300 }
2301
2302 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2303 /*
2304   called on each key during a catdb
2305  */
2306 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2307 {
2308         int i;
2309         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2310         FILE *f = c->f;
2311         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2312
2313         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2314         for (i=0;i<key.dsize;i++) {
2315                 if (ISASCII(key.dptr[i])) {
2316                         fprintf(f, "%c", key.dptr[i]);
2317                 } else {
2318                         fprintf(f, "\\%02X", key.dptr[i]);
2319                 }
2320         }
2321         fprintf(f, "\"\n");
2322
2323         fprintf(f, "dmaster: %u\n", h->dmaster);
2324         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2325
2326         if (c->printlmaster && ctdb->vnn_map != NULL) {
2327                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2328         }
2329
2330         if (c->printhash) {
2331                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2332         }
2333
2334         if (c->printrecordflags) {
2335                 fprintf(f, "flags: 0x%08x", h->flags);
2336                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2337                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2338                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2339                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2340                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2341                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2342                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2343                 fprintf(f, "\n");
2344         }
2345
2346         if (c->printdatasize) {
2347                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2348         } else {
2349                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2350                 for (i=sizeof(*h);i<data.dsize;i++) {
2351                         if (ISASCII(data.dptr[i])) {
2352                                 fprintf(f, "%c", data.dptr[i]);
2353                         } else {
2354                                 fprintf(f, "\\%02X", data.dptr[i]);
2355                         }
2356                 }
2357                 fprintf(f, "\"\n");
2358         }
2359
2360         fprintf(f, "\n");
2361
2362         return 0;
2363 }
2364
2365 /*
2366   convenience function to list all keys to stdout
2367  */
2368 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2369                  struct ctdb_dump_db_context *ctx)
2370 {
2371         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2372                                  ctx->printemptyrecords, ctx);
2373 }
2374
2375 /*
2376   get the pid of a ctdb daemon
2377  */
2378 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2379 {
2380         int ret;
2381         int32_t res;
2382
2383         ret = ctdb_control(ctdb, destnode, 0, 
2384                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2385                            NULL, NULL, &res, &timeout, NULL);
2386         if (ret != 0) {
2387                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2388                 return -1;
2389         }
2390
2391         *pid = res;
2392
2393         return 0;
2394 }
2395
2396
2397 /*
2398   async freeze send control
2399  */
2400 struct ctdb_client_control_state *
2401 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2402 {
2403         return ctdb_control_send(ctdb, destnode, priority, 
2404                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2405                            mem_ctx, &timeout, NULL);
2406 }
2407
2408 /* 
2409    async freeze recv control
2410 */
2411 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2412 {
2413         int ret;
2414         int32_t res;
2415
2416         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2417         if ( (ret != 0) || (res != 0) ){
2418                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2419                 return -1;
2420         }
2421
2422         return 0;
2423 }
2424
2425 /*
2426   freeze databases of a certain priority
2427  */
2428 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2429 {
2430         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2431         struct ctdb_client_control_state *state;
2432         int ret;
2433
2434         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2435         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2436         talloc_free(tmp_ctx);
2437
2438         return ret;
2439 }
2440
2441 /* Freeze all databases */
2442 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2443 {
2444         int i;
2445
2446         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2447                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2448                         return -1;
2449                 }
2450         }
2451         return 0;
2452 }
2453
2454 /*
2455   thaw databases of a certain priority
2456  */
2457 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2458 {
2459         int ret;
2460         int32_t res;
2461
2462         ret = ctdb_control(ctdb, destnode, priority, 
2463                            CTDB_CONTROL_THAW, 0, tdb_null, 
2464                            NULL, NULL, &res, &timeout, NULL);
2465         if (ret != 0 || res != 0) {
2466                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2467                 return -1;
2468         }
2469
2470         return 0;
2471 }
2472
2473 /* thaw all databases */
2474 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2475 {
2476         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2477 }
2478
2479 /*
2480   get pnn of a node, or -1
2481  */
2482 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2483 {
2484         int ret;
2485         int32_t res;
2486
2487         ret = ctdb_control(ctdb, destnode, 0, 
2488                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2489                            NULL, NULL, &res, &timeout, NULL);
2490         if (ret != 0) {
2491                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2492                 return -1;
2493         }
2494
2495         return res;
2496 }
2497
2498 /*
2499   get the monitoring mode of a remote node
2500  */
2501 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2502 {
2503         int ret;
2504         int32_t res;
2505
2506         ret = ctdb_control(ctdb, destnode, 0, 
2507                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2508                            NULL, NULL, &res, &timeout, NULL);
2509         if (ret != 0) {
2510                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2511                 return -1;
2512         }
2513
2514         *monmode = res;
2515
2516         return 0;
2517 }
2518
2519
2520 /*
2521  set the monitoring mode of a remote node to active
2522  */
2523 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2524 {
2525         int ret;
2526         
2527
2528         ret = ctdb_control(ctdb, destnode, 0, 
2529                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2530                            NULL, NULL,NULL, &timeout, NULL);
2531         if (ret != 0) {
2532                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2533                 return -1;
2534         }
2535
2536         
2537
2538         return 0;
2539 }
2540
2541 /*
2542   set the monitoring mode of a remote node to disable
2543  */
2544 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2545 {
2546         int ret;
2547         
2548
2549         ret = ctdb_control(ctdb, destnode, 0, 
2550                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2551                            NULL, NULL, NULL, &timeout, NULL);
2552         if (ret != 0) {
2553                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2554                 return -1;
2555         }
2556
2557         
2558
2559         return 0;
2560 }
2561
2562
2563
2564 /* 
2565   sent to a node to make it take over an ip address
2566 */
2567 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2568                           uint32_t destnode, struct ctdb_public_ip *ip)
2569 {
2570         TDB_DATA data;
2571         struct ctdb_public_ipv4 ipv4;
2572         int ret;
2573         int32_t res;
2574
2575         if (ip->addr.sa.sa_family == AF_INET) {
2576                 ipv4.pnn = ip->pnn;
2577                 ipv4.sin = ip->addr.ip;
2578
2579                 data.dsize = sizeof(ipv4);
2580                 data.dptr  = (uint8_t *)&ipv4;
2581
2582                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2583                            NULL, &res, &timeout, NULL);
2584         } else {
2585                 data.dsize = sizeof(*ip);
2586                 data.dptr  = (uint8_t *)ip;
2587
2588                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2589                            NULL, &res, &timeout, NULL);
2590         }
2591
2592         if (ret != 0 || res != 0) {
2593                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2594                 return -1;
2595         }
2596
2597         return 0;       
2598 }
2599
2600
2601 /* 
2602   sent to a node to make it release an ip address
2603 */
2604 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2605                          uint32_t destnode, struct ctdb_public_ip *ip)
2606 {
2607         TDB_DATA data;
2608         struct ctdb_public_ipv4 ipv4;
2609         int ret;
2610         int32_t res;
2611
2612         if (ip->addr.sa.sa_family == AF_INET) {
2613                 ipv4.pnn = ip->pnn;
2614                 ipv4.sin = ip->addr.ip;
2615
2616                 data.dsize = sizeof(ipv4);
2617                 data.dptr  = (uint8_t *)&ipv4;
2618
2619                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2620                                    NULL, &res, &timeout, NULL);
2621         } else {
2622                 data.dsize = sizeof(*ip);
2623                 data.dptr  = (uint8_t *)ip;
2624
2625                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2626                                    NULL, &res, &timeout, NULL);
2627         }
2628
2629         if (ret != 0 || res != 0) {
2630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2631                 return -1;
2632         }
2633
2634         return 0;       
2635 }
2636
2637
2638 /*
2639   get a tunable
2640  */
2641 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2642                           struct timeval timeout, 
2643                           uint32_t destnode,
2644                           const char *name, uint32_t *value)
2645 {
2646         struct ctdb_control_get_tunable *t;
2647         TDB_DATA data, outdata;
2648         int32_t res;
2649         int ret;
2650
2651         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2652         data.dptr  = talloc_size(ctdb, data.dsize);
2653         CTDB_NO_MEMORY(ctdb, data.dptr);
2654
2655         t = (struct ctdb_control_get_tunable *)data.dptr;
2656         t->length = strlen(name)+1;
2657         memcpy(t->name, name, t->length);
2658
2659         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2660                            &outdata, &res, &timeout, NULL);
2661         talloc_free(data.dptr);
2662         if (ret != 0 || res != 0) {
2663                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2664                 return ret != 0 ? ret : res;
2665         }
2666
2667         if (outdata.dsize != sizeof(uint32_t)) {
2668                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2669                 talloc_free(outdata.dptr);
2670                 return -1;
2671         }
2672         
2673         *value = *(uint32_t *)outdata.dptr;
2674         talloc_free(outdata.dptr);
2675
2676         return 0;
2677 }
2678
2679 /*
2680   set a tunable
2681  */
2682 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2683                           struct timeval timeout, 
2684                           uint32_t destnode,
2685                           const char *name, uint32_t value)
2686 {
2687         struct ctdb_control_set_tunable *t;
2688         TDB_DATA data;
2689         int32_t res;
2690         int ret;
2691
2692         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2693         data.dptr  = talloc_size(ctdb, data.dsize);
2694         CTDB_NO_MEMORY(ctdb, data.dptr);
2695
2696         t = (struct ctdb_control_set_tunable *)data.dptr;
2697         t->length = strlen(name)+1;
2698         memcpy(t->name, name, t->length);
2699         t->value = value;
2700
2701         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2702                            NULL, &res, &timeout, NULL);
2703         talloc_free(data.dptr);
2704         if (ret != 0 || res != 0) {
2705                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2706                 return -1;
2707         }
2708
2709         return 0;
2710 }
2711
2712 /*
2713   list tunables
2714  */
2715 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2716                             struct timeval timeout, 
2717                             uint32_t destnode,
2718                             TALLOC_CTX *mem_ctx,
2719                             const char ***list, uint32_t *count)
2720 {
2721         TDB_DATA outdata;
2722         int32_t res;
2723         int ret;
2724         struct ctdb_control_list_tunable *t;
2725         char *p, *s, *ptr;
2726
2727         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2728                            mem_ctx, &outdata, &res, &timeout, NULL);
2729         if (ret != 0 || res != 0) {
2730                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2731                 return -1;
2732         }
2733
2734         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2735         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2736             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2737                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2738                 talloc_free(outdata.dptr);
2739                 return -1;              
2740         }
2741         
2742         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2743         CTDB_NO_MEMORY(ctdb, p);
2744
2745         talloc_free(outdata.dptr);
2746         
2747         (*list) = NULL;
2748         (*count) = 0;
2749
2750         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2751                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2752                 CTDB_NO_MEMORY(ctdb, *list);
2753                 (*list)[*count] = talloc_strdup(*list, s);
2754                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2755                 (*count)++;
2756         }
2757
2758         talloc_free(p);
2759
2760         return 0;
2761 }
2762
2763
2764 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2765                                    struct timeval timeout, uint32_t destnode,
2766                                    TALLOC_CTX *mem_ctx,
2767                                    uint32_t flags,
2768                                    struct ctdb_all_public_ips **ips)
2769 {
2770         int ret;
2771         TDB_DATA outdata;
2772         int32_t res;
2773
2774         ret = ctdb_control(ctdb, destnode, 0, 
2775                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2776                            mem_ctx, &outdata, &res, &timeout, NULL);
2777         if (ret == 0 && res == -1) {
2778                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2779                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2780         }
2781         if (ret != 0 || res != 0) {
2782           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2783                 return -1;
2784         }
2785
2786         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2787         talloc_free(outdata.dptr);
2788                     
2789         return 0;
2790 }
2791
2792 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2793                              struct timeval timeout, uint32_t destnode,
2794                              TALLOC_CTX *mem_ctx,
2795                              struct ctdb_all_public_ips **ips)
2796 {
2797         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2798                                               destnode, mem_ctx,
2799                                               0, ips);
2800 }
2801
2802 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2803                         struct timeval timeout, uint32_t destnode, 
2804                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2805 {
2806         int ret, i, len;
2807         TDB_DATA outdata;
2808         int32_t res;
2809         struct ctdb_all_public_ipsv4 *ipsv4;
2810
2811         ret = ctdb_control(ctdb, destnode, 0, 
2812                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2813                            mem_ctx, &outdata, &res, &timeout, NULL);
2814         if (ret != 0 || res != 0) {
2815                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2816                 return -1;
2817         }
2818
2819         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2820         len = offsetof(struct ctdb_all_public_ips, ips) +
2821                 ipsv4->num*sizeof(struct ctdb_public_ip);
2822         *ips = talloc_zero_size(mem_ctx, len);
2823         CTDB_NO_MEMORY(ctdb, *ips);
2824         (*ips)->num = ipsv4->num;
2825         for (i=0; i<ipsv4->num; i++) {
2826                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2827                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2828         }
2829
2830         talloc_free(outdata.dptr);
2831                     
2832         return 0;
2833 }
2834
2835 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2836                                  struct timeval timeout, uint32_t destnode,
2837                                  TALLOC_CTX *mem_ctx,
2838                                  const ctdb_sock_addr *addr,
2839                                  struct ctdb_control_public_ip_info **_info)
2840 {
2841         int ret;
2842         TDB_DATA indata;
2843         TDB_DATA outdata;
2844         int32_t res;
2845         struct ctdb_control_public_ip_info *info;
2846         uint32_t len;
2847         uint32_t i;
2848
2849         indata.dptr = discard_const_p(uint8_t, addr);
2850         indata.dsize = sizeof(*addr);
2851
2852         ret = ctdb_control(ctdb, destnode, 0,
2853                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2854                            mem_ctx, &outdata, &res, &timeout, NULL);
2855         if (ret != 0 || res != 0) {
2856                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2857                                 "failed ret:%d res:%d\n",
2858                                 ret, res));
2859                 return -1;
2860         }
2861
2862         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2863         if (len > outdata.dsize) {
2864                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2865                                 "returned invalid data with size %u > %u\n",
2866                                 (unsigned int)outdata.dsize,
2867                                 (unsigned int)len));
2868                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2869                 return -1;
2870         }
2871
2872         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2873         len += info->num*sizeof(struct ctdb_control_iface_info);
2874
2875         if (len > outdata.dsize) {
2876                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2877                                 "returned invalid data with size %u > %u\n",
2878                                 (unsigned int)outdata.dsize,
2879                                 (unsigned int)len));
2880                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2881                 return -1;
2882         }
2883
2884         /* make sure we null terminate the returned strings */
2885         for (i=0; i < info->num; i++) {
2886                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2887         }
2888
2889         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2890                                                                 outdata.dptr,
2891                                                                 outdata.dsize);
2892         talloc_free(outdata.dptr);
2893         if (*_info == NULL) {
2894                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2895                                 "talloc_memdup size %u failed\n",
2896                                 (unsigned int)outdata.dsize));
2897                 return -1;
2898         }
2899
2900         return 0;
2901 }
2902
2903 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2904                          struct timeval timeout, uint32_t destnode,
2905                          TALLOC_CTX *mem_ctx,
2906                          struct ctdb_control_get_ifaces **_ifaces)
2907 {
2908         int ret;
2909         TDB_DATA outdata;
2910         int32_t res;
2911         struct ctdb_control_get_ifaces *ifaces;
2912         uint32_t len;
2913         uint32_t i;
2914
2915         ret = ctdb_control(ctdb, destnode, 0,
2916                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2917                            mem_ctx, &outdata, &res, &timeout, NULL);
2918         if (ret != 0 || res != 0) {
2919                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2920                                 "failed ret:%d res:%d\n",
2921                                 ret, res));
2922                 return -1;
2923         }
2924
2925         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2926         if (len > outdata.dsize) {
2927                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2928                                 "returned invalid data with size %u > %u\n",
2929                                 (unsigned int)outdata.dsize,
2930                                 (unsigned int)len));
2931                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2932                 return -1;
2933         }
2934
2935         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2936         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2937
2938         if (len > outdata.dsize) {
2939                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2940                                 "returned invalid data with size %u > %u\n",
2941                                 (unsigned int)outdata.dsize,
2942                                 (unsigned int)len));
2943                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2944                 return -1;
2945         }
2946
2947         /* make sure we null terminate the returned strings */
2948         for (i=0; i < ifaces->num; i++) {
2949                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2950         }
2951
2952         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2953                                                                   outdata.dptr,
2954                                                                   outdata.dsize);
2955         talloc_free(outdata.dptr);
2956         if (*_ifaces == NULL) {
2957                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2958                                 "talloc_memdup size %u failed\n",
2959                                 (unsigned int)outdata.dsize));
2960                 return -1;
2961         }
2962
2963         return 0;
2964 }
2965
2966 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2967                              struct timeval timeout, uint32_t destnode,
2968                              TALLOC_CTX *mem_ctx,
2969                              const struct ctdb_control_iface_info *info)
2970 {
2971         int ret;
2972         TDB_DATA indata;
2973         int32_t res;
2974
2975         indata.dptr = discard_const_p(uint8_t, info);
2976         indata.dsize = sizeof(*info);
2977
2978         ret = ctdb_control(ctdb, destnode, 0,
2979                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2980                            mem_ctx, NULL, &res, &timeout, NULL);
2981         if (ret != 0 || res != 0) {
2982                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2983                                 "failed ret:%d res:%d\n",
2984                                 ret, res));
2985                 return -1;
2986         }
2987
2988         return 0;
2989 }
2990
2991 /*
2992   set/clear the permanent disabled bit on a remote node
2993  */
2994 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2995                        uint32_t set, uint32_t clear)
2996 {
2997         int ret;
2998         TDB_DATA data;
2999         struct ctdb_node_map *nodemap=NULL;
3000         struct ctdb_node_flag_change c;
3001         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3002         uint32_t recmaster;
3003         uint32_t *nodes;
3004
3005
3006         /* find the recovery master */
3007         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
3008         if (ret != 0) {
3009                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
3010                 talloc_free(tmp_ctx);
3011                 return ret;
3012         }
3013
3014
3015         /* read the node flags from the recmaster */
3016         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
3017         if (ret != 0) {
3018                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
3019                 talloc_free(tmp_ctx);
3020                 return -1;
3021         }
3022         if (destnode >= nodemap->num) {
3023                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
3024                 talloc_free(tmp_ctx);
3025                 return -1;
3026         }
3027
3028         c.pnn       = destnode;
3029         c.old_flags = nodemap->nodes[destnode].flags;
3030         c.new_flags = c.old_flags;
3031         c.new_flags |= set;
3032         c.new_flags &= ~clear;
3033
3034         data.dsize = sizeof(c);
3035         data.dptr = (unsigned char *)&c;
3036
3037         /* send the flags update to all connected nodes */
3038         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3039
3040         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3041                                         nodes, 0,
3042                                         timeout, false, data,
3043                                         NULL, NULL,
3044                                         NULL) != 0) {
3045                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3046
3047                 talloc_free(tmp_ctx);
3048                 return -1;
3049         }
3050
3051         talloc_free(tmp_ctx);
3052         return 0;
3053 }
3054
3055
3056 /*
3057   get all tunables
3058  */
3059 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3060                                struct timeval timeout, 
3061                                uint32_t destnode,
3062                                struct ctdb_tunable *tunables)
3063 {
3064         TDB_DATA outdata;
3065         int ret;
3066         int32_t res;
3067
3068         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3069                            &outdata, &res, &timeout, NULL);
3070         if (ret != 0 || res != 0) {
3071                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3072                 return -1;
3073         }
3074
3075         if (outdata.dsize != sizeof(*tunables)) {
3076                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3077                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3078                 return -1;              
3079         }
3080
3081         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3082         talloc_free(outdata.dptr);
3083         return 0;
3084 }
3085
3086 /*
3087   add a public address to a node
3088  */
3089 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3090                       struct timeval timeout, 
3091                       uint32_t destnode,
3092                       struct ctdb_control_ip_iface *pub)
3093 {
3094         TDB_DATA data;
3095         int32_t res;
3096         int ret;
3097
3098         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3099         data.dptr  = (unsigned char *)pub;
3100
3101         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3102                            NULL, &res, &timeout, NULL);
3103         if (ret != 0 || res != 0) {
3104                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3105                 return -1;
3106         }
3107
3108         return 0;
3109 }
3110
3111 /*
3112   delete a public address from a node
3113  */
3114 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3115                       struct timeval timeout, 
3116                       uint32_t destnode,
3117                       struct ctdb_control_ip_iface *pub)
3118 {
3119         TDB_DATA data;
3120         int32_t res;
3121         int ret;
3122
3123         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3124         data.dptr  = (unsigned char *)pub;
3125
3126         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3127                            NULL, &res, &timeout, NULL);
3128         if (ret != 0 || res != 0) {
3129                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3130                 return -1;
3131         }
3132
3133         return 0;
3134 }
3135
3136 /*
3137   kill a tcp connection
3138  */
3139 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3140                       struct timeval timeout, 
3141                       uint32_t destnode,
3142                       struct ctdb_control_killtcp *killtcp)
3143 {
3144         TDB_DATA data;
3145         int32_t res;
3146         int ret;
3147
3148         data.dsize = sizeof(struct ctdb_control_killtcp);
3149         data.dptr  = (unsigned char *)killtcp;
3150
3151         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3152                            NULL, &res, &timeout, NULL);
3153         if (ret != 0 || res != 0) {
3154                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3155                 return -1;
3156         }
3157
3158         return 0;
3159 }
3160
3161 /*
3162   send a gratious arp
3163  */
3164 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3165                       struct timeval timeout, 
3166                       uint32_t destnode,
3167                       ctdb_sock_addr *addr,
3168                       const char *ifname)
3169 {
3170         TDB_DATA data;
3171         int32_t res;
3172         int ret, len;
3173         struct ctdb_control_gratious_arp *gratious_arp;
3174         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3175
3176
3177         len = strlen(ifname)+1;
3178         gratious_arp = talloc_size(tmp_ctx, 
3179                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3180         CTDB_NO_MEMORY(ctdb, gratious_arp);
3181
3182         gratious_arp->addr = *addr;
3183         gratious_arp->len = len;
3184         memcpy(&gratious_arp->iface[0], ifname, len);
3185
3186
3187         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3188         data.dptr  = (unsigned char *)gratious_arp;
3189
3190         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3191                            NULL, &res, &timeout, NULL);
3192         if (ret != 0 || res != 0) {
3193                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3194                 talloc_free(tmp_ctx);
3195                 return -1;
3196         }
3197
3198         talloc_free(tmp_ctx);
3199         return 0;
3200 }
3201
3202 /*
3203   get a list of all tcp tickles that a node knows about for a particular vnn
3204  */
3205 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3206                               struct timeval timeout, uint32_t destnode, 
3207                               TALLOC_CTX *mem_ctx, 
3208                               ctdb_sock_addr *addr,
3209                               struct ctdb_control_tcp_tickle_list **list)
3210 {
3211         int ret;
3212         TDB_DATA data, outdata;
3213         int32_t status;
3214
3215         data.dptr = (uint8_t*)addr;
3216         data.dsize = sizeof(ctdb_sock_addr);
3217
3218         ret = ctdb_control(ctdb, destnode, 0, 
3219                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3220                            mem_ctx, &outdata, &status, NULL, NULL);
3221         if (ret != 0 || status != 0) {
3222                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3223                 return -1;
3224         }
3225
3226         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3227
3228         return status;
3229 }
3230
3231 /*
3232   register a server id
3233  */
3234 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3235                       struct timeval timeout, 
3236                       struct ctdb_server_id *id)
3237 {
3238         TDB_DATA data;
3239         int32_t res;
3240         int ret;
3241
3242         data.dsize = sizeof(struct ctdb_server_id);
3243         data.dptr  = (unsigned char *)id;
3244
3245         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3246                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3247                         0, data, NULL,
3248                         NULL, &res, &timeout, NULL);
3249         if (ret != 0 || res != 0) {
3250                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3251                 return -1;
3252         }
3253
3254         return 0;
3255 }
3256
3257 /*
3258   unregister a server id
3259  */
3260 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3261                       struct timeval timeout, 
3262                       struct ctdb_server_id *id)
3263 {
3264         TDB_DATA data;
3265         int32_t res;
3266         int ret;
3267
3268         data.dsize = sizeof(struct ctdb_server_id);
3269         data.dptr  = (unsigned char *)id;
3270
3271         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3272                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3273                         0, data, NULL,
3274                         NULL, &res, &timeout, NULL);
3275         if (ret != 0 || res != 0) {
3276                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3277                 return -1;
3278         }
3279
3280         return 0;
3281 }
3282
3283
3284 /*
3285   check if a server id exists
3286
3287   if a server id does exist, return *status == 1, otherwise *status == 0
3288  */
3289 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3290                       struct timeval timeout, 
3291                       uint32_t destnode,
3292                       struct ctdb_server_id *id,
3293                       uint32_t *status)
3294 {
3295         TDB_DATA data;
3296         int32_t res;
3297         int ret;
3298
3299         data.dsize = sizeof(struct ctdb_server_id);
3300         data.dptr  = (unsigned char *)id;
3301
3302         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3303                         0, data, NULL,
3304                         NULL, &res, &timeout, NULL);
3305         if (ret != 0) {
3306                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3307                 return -1;
3308         }
3309
3310         if (res) {
3311                 *status = 1;
3312         } else {
3313                 *status = 0;
3314         }
3315
3316         return 0;
3317 }
3318
3319 /*
3320    get the list of server ids that are registered on a node
3321 */
3322 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3323                 TALLOC_CTX *mem_ctx,
3324                 struct timeval timeout, uint32_t destnode, 
3325                 struct ctdb_server_id_list **svid_list)
3326 {
3327         int ret;
3328         TDB_DATA outdata;
3329         int32_t res;
3330
3331         ret = ctdb_control(ctdb, destnode, 0, 
3332                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3333                            mem_ctx, &outdata, &res, &timeout, NULL);
3334         if (ret != 0 || res != 0) {
3335                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3336                 return -1;
3337         }
3338
3339         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3340                     
3341         return 0;
3342 }
3343
3344 /*
3345   initialise the ctdb daemon for client applications
3346
3347   NOTE: In current code the daemon does not fork. This is for testing purposes only
3348   and to simplify the code.
3349 */
3350 struct ctdb_context *ctdb_init(struct event_context *ev)
3351 {
3352         int ret;
3353         struct ctdb_context *ctdb;
3354
3355         ctdb = talloc_zero(ev, struct ctdb_context);
3356         if (ctdb == NULL) {
3357                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3358                 return NULL;
3359         }
3360         ctdb->ev  = ev;
3361         ctdb->idr = idr_init(ctdb);
3362         /* Wrap early to exercise code. */
3363         ctdb->lastid = INT_MAX-200;
3364         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3365
3366         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3367         if (ret != 0) {
3368                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3369                 talloc_free(ctdb);
3370                 return NULL;
3371         }
3372
3373         ctdb->statistics.statistics_start_time = timeval_current();
3374
3375         return ctdb;
3376 }
3377
3378
3379 /*
3380   set some ctdb flags
3381 */
3382 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3383 {
3384         ctdb->flags |= flags;
3385 }
3386
3387 /*
3388   setup the local socket name
3389 */
3390 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3391 {
3392         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3393         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3394
3395         return 0;
3396 }
3397
3398 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3399 {
3400         return ctdb->daemon.name;
3401 }
3402
3403 /*
3404   return the pnn of this node
3405 */
3406 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3407 {
3408         return ctdb->pnn;
3409 }
3410
3411
3412 /*
3413   get the uptime of a remote node
3414  */
3415 struct ctdb_client_control_state *
3416 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3417 {
3418         return ctdb_control_send(ctdb, destnode, 0, 
3419                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3420                            mem_ctx, &timeout, NULL);
3421 }
3422
3423 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3424 {
3425         int ret;
3426         int32_t res;
3427         TDB_DATA outdata;
3428
3429         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3430         if (ret != 0 || res != 0) {
3431                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3432                 return -1;
3433         }
3434
3435         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3436
3437         return 0;
3438 }
3439
3440 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3441 {
3442         struct ctdb_client_control_state *state;
3443
3444         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3445         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3446 }
3447
3448 /*
3449   send a control to execute the "recovered" event script on a node
3450  */
3451 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3452 {
3453         int ret;
3454         int32_t status;
3455
3456         ret = ctdb_control(ctdb, destnode, 0, 
3457                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3458                            NULL, NULL, &status, &timeout, NULL);
3459         if (ret != 0 || status != 0) {
3460                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3461                 return -1;
3462         }
3463
3464         return 0;
3465 }
3466
3467 /* 
3468   callback for the async helpers used when sending the same control
3469   to multiple nodes in parallell.
3470 */
3471 static void async_callback(struct ctdb_client_control_state *state)
3472 {
3473         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3474         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3475         int ret;
3476         TDB_DATA outdata;
3477         int32_t res = -1;
3478         uint32_t destnode = state->c->hdr.destnode;
3479
3480         outdata.dsize = 0;
3481         outdata.dptr = NULL;
3482
3483         /* one more node has responded with recmode data */
3484         data->count--;
3485
3486         /* if we failed to push the db, then return an error and let
3487            the main loop try again.
3488         */
3489         if (state->state != CTDB_CONTROL_DONE) {
3490                 if ( !data->dont_log_errors) {
3491                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3492                 }
3493                 data->fail_count++;
3494                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3495                         res = -ETIME;
3496                 } else {
3497                         res = -1;
3498                 }
3499                 if (data->fail_callback) {
3500                         data->fail_callback(ctdb, destnode, res, outdata,
3501                                         data->callback_data);
3502                 }
3503                 return;
3504         }
3505         
3506         state->async.fn = NULL;
3507
3508         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3509         if ((ret != 0) || (res != 0)) {
3510                 if ( !data->dont_log_errors) {
3511                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3512                 }
3513                 data->fail_count++;
3514                 if (data->fail_callback) {
3515                         data->fail_callback(ctdb, destnode, res, outdata,
3516                                         data->callback_data);
3517                 }
3518         }
3519         if ((ret == 0) && (data->callback != NULL)) {
3520                 data->callback(ctdb, destnode, res, outdata,
3521                                         data->callback_data);
3522         }
3523 }
3524
3525
3526 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3527 {
3528         /* set up the callback functions */
3529         state->async.fn = async_callback;
3530         state->async.private_data = data;
3531         
3532         /* one more control to wait for to complete */
3533         data->count++;
3534 }
3535
3536
3537 /* wait for up to the maximum number of seconds allowed
3538    or until all nodes we expect a response from has replied
3539 */
3540 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3541 {
3542         while (data->count > 0) {
3543                 event_loop_once(ctdb->ev);
3544         }
3545         if (data->fail_count != 0) {
3546                 if (!data->dont_log_errors) {
3547                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3548                                  data->fail_count));
3549                 }
3550                 return -1;
3551         }
3552         return 0;
3553 }
3554
3555
3556 /* 
3557    perform a simple control on the listed nodes
3558    The control cannot return data
3559  */
3560 int ctdb_client_async_control(struct ctdb_context *ctdb,
3561                                 enum ctdb_controls opcode,
3562                                 uint32_t *nodes,
3563                                 uint64_t srvid,
3564                                 struct timeval timeout,
3565                                 bool dont_log_errors,
3566                                 TDB_DATA data,
3567                                 client_async_callback client_callback,
3568                                 client_async_callback fail_callback,
3569                                 void *callback_data)
3570 {
3571         struct client_async_data *async_data;
3572         struct ctdb_client_control_state *state;
3573         int j, num_nodes;
3574
3575         async_data = talloc_zero(ctdb, struct client_async_data);
3576         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3577         async_data->dont_log_errors = dont_log_errors;
3578         async_data->callback = client_callback;
3579         async_data->fail_callback = fail_callback;
3580         async_data->callback_data = callback_data;
3581         async_data->opcode        = opcode;
3582
3583         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3584
3585         /* loop over all nodes and send an async control to each of them */
3586         for (j=0; j<num_nodes; j++) {
3587                 uint32_t pnn = nodes[j];
3588
3589                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3590                                           0, data, async_data, &timeout, NULL);
3591                 if (state == NULL) {
3592                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3593                         talloc_free(async_data);
3594                         return -1;
3595                 }
3596                 
3597                 ctdb_client_async_add(async_data, state);
3598         }
3599
3600         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3601                 talloc_free(async_data);
3602                 return -1;
3603         }
3604
3605         talloc_free(async_data);
3606         return 0;
3607 }
3608
3609 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3610                                 struct ctdb_vnn_map *vnn_map,
3611                                 TALLOC_CTX *mem_ctx,
3612                                 bool include_self)
3613 {
3614         int i, j, num_nodes;
3615         uint32_t *nodes;
3616
3617         for (i=num_nodes=0;i<vnn_map->size;i++) {
3618                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3619                         continue;
3620                 }
3621                 num_nodes++;
3622         } 
3623
3624         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3625         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3626
3627         for (i=j=0;i<vnn_map->size;i++) {
3628                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3629                         continue;
3630                 }
3631                 nodes[j++] = vnn_map->map[i];
3632         } 
3633
3634         return nodes;
3635 }
3636
3637 /* Get list of nodes not including those with flags specified by mask.
3638  * If exclude_pnn is not -1 then exclude that pnn from the list.
3639  */
3640 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3641                         struct ctdb_node_map *node_map,
3642                         TALLOC_CTX *mem_ctx,
3643                         uint32_t mask,
3644                         int exclude_pnn)
3645 {
3646         int i, j, num_nodes;
3647         uint32_t *nodes;
3648
3649         for (i=num_nodes=0;i<node_map->num;i++) {
3650                 if (node_map->nodes[i].flags & mask) {
3651                         continue;
3652                 }
3653                 if (node_map->nodes[i].pnn == exclude_pnn) {
3654                         continue;
3655                 }
3656                 num_nodes++;
3657         } 
3658
3659         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3660         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3661
3662         for (i=j=0;i<node_map->num;i++) {
3663                 if (node_map->nodes[i].flags & mask) {
3664                         continue;
3665                 }
3666                 if (node_map->nodes[i].pnn == exclude_pnn) {
3667                         continue;
3668                 }
3669                 nodes[j++] = node_map->nodes[i].pnn;
3670         } 
3671
3672         return nodes;
3673 }
3674
3675 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3676                                 struct ctdb_node_map *node_map,
3677                                 TALLOC_CTX *mem_ctx,
3678                                 bool include_self)
3679 {
3680         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3681                              include_self ? -1 : ctdb->pnn);
3682 }
3683
3684 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3685                                 struct ctdb_node_map *node_map,
3686                                 TALLOC_CTX *mem_ctx,
3687                                 bool include_self)
3688 {
3689         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3690                              include_self ? -1 : ctdb->pnn);
3691 }
3692
3693 /* 
3694   this is used to test if a pnn lock exists and if it exists will return
3695   the number of connections that pnn has reported or -1 if that recovery
3696   daemon is not running.
3697 */
3698 int
3699 ctdb_read_pnn_lock(int fd, int32_t pnn)
3700 {
3701         struct flock lock;
3702         char c;
3703
3704         lock.l_type = F_WRLCK;
3705         lock.l_whence = SEEK_SET;
3706         lock.l_start = pnn;
3707         lock.l_len = 1;
3708         lock.l_pid = 0;
3709
3710         if (fcntl(fd, F_GETLK, &lock) != 0) {
3711                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3712                 return -1;
3713         }
3714
3715         if (lock.l_type == F_UNLCK) {
3716                 return -1;
3717         }
3718
3719         if (pread(fd, &c, 1, pnn) == -1) {
3720                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3721                 return -1;
3722         }
3723
3724         return c;
3725 }
3726
3727 /*
3728   get capabilities of a remote node
3729  */
3730 struct ctdb_client_control_state *
3731 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3732 {
3733         return ctdb_control_send(ctdb, destnode, 0, 
3734                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3735                            mem_ctx, &timeout, NULL);
3736 }
3737
3738 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3739 {
3740         int ret;
3741         int32_t res;
3742         TDB_DATA outdata;
3743
3744         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3745         if ( (ret != 0) || (res != 0) ) {
3746                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3747                 return -1;
3748         }
3749
3750         if (capabilities) {
3751                 *capabilities = *((uint32_t *)outdata.dptr);
3752         }
3753
3754         return 0;
3755 }
3756
3757 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3758 {
3759         struct ctdb_client_control_state *state;
3760         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3761         int ret;
3762
3763         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3764         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3765         talloc_free(tmp_ctx);
3766         return ret;
3767 }
3768
3769 struct server_id {
3770         uint64_t pid;
3771         uint32_t task_id;
3772         uint32_t vnn;
3773         uint64_t unique_id;
3774 };
3775
3776 static struct server_id server_id_get(struct ctdb_context *ctdb, uint32_t reqid)
3777 {
3778         struct server_id id;
3779
3780         id.pid = getpid();
3781         id.task_id = reqid;
3782         id.vnn = ctdb_get_pnn(ctdb);
3783         id.unique_id = id.vnn;
3784         id.unique_id = (id.unique_id << 32) | reqid;
3785
3786         return id;
3787 }
3788
3789 static bool server_id_equal(struct server_id *id1, struct server_id *id2)
3790 {
3791         if (id1->pid != id2->pid) {
3792                 return false;
3793         }
3794
3795         if (id1->task_id != id2->task_id) {
3796                 return false;
3797         }
3798
3799         if (id1->vnn != id2->vnn) {
3800                 return false;
3801         }
3802
3803         if (id1->unique_id != id2->unique_id) {
3804                 return false;
3805         }
3806
3807         return true;
3808 }
3809
3810 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3811 {
3812         struct ctdb_server_id sid;
3813         int ret;
3814         uint32_t result;
3815
3816         sid.type = SERVER_TYPE_SAMBA;
3817         sid.pnn = id->vnn;
3818         sid.server_id = id->pid;
3819
3820         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3821                                         id->vnn, &sid, &result);
3822         if (ret != 0) {
3823                 /* If control times out, assume server_id exists. */
3824                 return true;
3825         }
3826
3827         if (result) {
3828                 return true;
3829         }
3830
3831         return false;
3832 }
3833
3834
3835 enum g_lock_type {
3836         G_LOCK_READ = 0,
3837         G_LOCK_WRITE = 1,
3838 };
3839
3840 struct g_lock_rec {
3841         enum g_lock_type type;
3842         struct server_id id;
3843 };
3844
3845 struct g_lock_recs {
3846         unsigned int num;
3847         struct g_lock_rec *lock;
3848 };
3849
3850 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3851                          struct g_lock_recs **locks)
3852 {
3853         struct g_lock_recs *recs;
3854
3855         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3856         if (recs == NULL) {
3857                 return false;
3858         }
3859
3860         if (data.dsize == 0) {
3861                 goto done;
3862         }
3863
3864         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3865                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3866                                   (unsigned long)data.dsize));
3867                 talloc_free(recs);
3868                 return false;
3869         }
3870
3871         recs->num = data.dsize / sizeof(struct g_lock_rec);
3872         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3873         if (recs->lock == NULL) {
3874                 talloc_free(recs);
3875                 return false;
3876         }
3877
3878 done:
3879         if (locks != NULL) {
3880                 *locks = recs;
3881         }
3882
3883         return true;
3884 }
3885
3886
3887 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3888                         struct ctdb_db_context *ctdb_db,
3889                         const char *keyname, uint32_t reqid)
3890 {
3891         TDB_DATA key, data;
3892         struct ctdb_record_handle *h;
3893         struct g_lock_recs *locks;
3894         struct server_id id;
3895         struct timeval t_start;
3896         int i;
3897
3898         key.dptr = (uint8_t *)discard_const(keyname);
3899         key.dsize = strlen(keyname) + 1;
3900
3901         t_start = timeval_current();
3902
3903 again:
3904         /* Keep trying for an hour. */
3905         if (timeval_elapsed(&t_start) > 3600) {
3906                 return false;
3907         }
3908
3909         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3910         if (h == NULL) {
3911                 return false;
3912         }
3913
3914         if (!g_lock_parse(h, data, &locks)) {
3915                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3916                 talloc_free(data.dptr);
3917                 talloc_free(h);
3918                 return false;
3919         }
3920
3921         talloc_free(data.dptr);
3922
3923         id = server_id_get(ctdb_db->ctdb, reqid);
3924
3925         i = 0;
3926         while (i < locks->num) {
3927                 if (server_id_equal(&locks->lock[i].id, &id)) {
3928                         /* Internal error */
3929                         talloc_free(h);
3930                         return false;
3931                 }
3932
3933                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3934                         if (i < locks->num-1) {
3935                                 locks->lock[i] = locks->lock[locks->num-1];
3936                         }
3937                         locks->num--;
3938                         continue;
3939                 }
3940
3941                 /* This entry is locked. */
3942                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
3943                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3944                                    (unsigned long long)id.pid,
3945                                    id.task_id, id.vnn,
3946                                    (unsigned long long)id.unique_id));
3947                 talloc_free(h);
3948                 goto again;
3949         }
3950
3951         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
3952                                      locks->num+1);
3953         if (locks->lock == NULL) {
3954                 talloc_free(h);
3955                 return false;
3956         }
3957
3958         locks->lock[locks->num].type = G_LOCK_WRITE;
3959         locks->lock[locks->num].id = id;
3960         locks->num++;
3961
3962         data.dptr = (uint8_t *)locks->lock;
3963         data.dsize = locks->num * sizeof(struct g_lock_rec);
3964
3965         if (ctdb_record_store(h, data) != 0) {
3966                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
3967                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3968                                   (unsigned long long)id.pid,
3969                                   id.task_id, id.vnn,
3970                                   (unsigned long long)id.unique_id));
3971                 talloc_free(h);
3972                 return false;
3973         }
3974
3975         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
3976                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3977                            (unsigned long long)id.pid,
3978                            id.task_id, id.vnn,
3979                            (unsigned long long)id.unique_id));
3980
3981         talloc_free(h);
3982         return true;
3983 }
3984
3985 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
3986                           struct ctdb_db_context *ctdb_db,
3987                           const char *keyname, uint32_t reqid)
3988 {
3989         TDB_DATA key, data;
3990         struct ctdb_record_handle *h;
3991         struct g_lock_recs *locks;
3992         struct server_id id;
3993         int i;
3994         bool found = false;
3995
3996         key.dptr = (uint8_t *)discard_const(keyname);
3997         key.dsize = strlen(keyname) + 1;
3998         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3999         if (h == NULL) {
4000                 return false;
4001         }
4002
4003         if (!g_lock_parse(h, data, &locks)) {
4004                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
4005                 talloc_free(data.dptr);
4006                 talloc_free(h);
4007                 return false;
4008         }
4009
4010         talloc_free(data.dptr);
4011
4012         id = server_id_get(ctdb_db->ctdb, reqid);
4013
4014         for (i=0; i<locks->num; i++) {
4015                 if (server_id_equal(&locks->lock[i].id, &id)) {
4016                         if (i < locks->num-1) {
4017                                 locks->lock[i] = locks->lock[locks->num-1];
4018                         }
4019                         locks->num--;
4020                         found = true;
4021                         break;
4022                 }
4023         }
4024
4025         if (!found) {
4026                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
4027                 talloc_free(h);
4028                 return false;
4029         }
4030
4031         data.dptr = (uint8_t *)locks->lock;
4032         data.dsize = locks->num * sizeof(struct g_lock_rec);
4033
4034         if (ctdb_record_store(h, data) != 0) {
4035                 talloc_free(h);
4036                 return false;
4037         }
4038
4039         talloc_free(h);
4040         return true;
4041 }
4042
4043
4044 struct ctdb_transaction_handle {
4045         struct ctdb_db_context *ctdb_db;
4046         struct ctdb_db_context *g_lock_db;
4047         char *lock_name;
4048         uint32_t reqid;
4049         /*
4050          * we store reads and writes done under a transaction:
4051          * - one list stores both reads and writes (m_all)
4052          * - the other just writes (m_write)
4053          */
4054         struct ctdb_marshall_buffer *m_all;
4055         struct ctdb_marshall_buffer *m_write;
4056 };
4057
4058 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4059 {
4060         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
4061         ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
4062         return 0;
4063 }
4064
4065
4066 /**
4067  * start a transaction on a database
4068  */
4069 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4070                                                        TALLOC_CTX *mem_ctx)
4071 {
4072         struct ctdb_transaction_handle *h;
4073         struct ctdb_server_id id;
4074
4075         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4076         if (h == NULL) {
4077                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
4078                 return NULL;
4079         }
4080
4081         h->ctdb_db = ctdb_db;
4082         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
4083                                        (unsigned int)ctdb_db->db_id);
4084         if (h->lock_name == NULL) {
4085                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
4086                 talloc_free(h);
4087                 return NULL;
4088         }
4089
4090         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
4091                                    "g_lock.tdb", false, 0);
4092         if (!h->g_lock_db) {
4093                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
4094                 talloc_free(h);
4095                 return NULL;
4096         }
4097
4098         id.type = SERVER_TYPE_SAMBA;
4099         id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
4100         id.server_id = getpid();
4101
4102         if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
4103                                          &id) != 0) {
4104                 DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
4105                 talloc_free(h);
4106                 return NULL;
4107         }
4108
4109         h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
4110
4111         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
4112                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
4113                 talloc_free(h);
4114                 return NULL;
4115         }
4116
4117         talloc_set_destructor(h, ctdb_transaction_destructor);
4118         return h;
4119 }
4120
4121 /**
4122  * fetch a record inside a transaction
4123  */
4124 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
4125                            TALLOC_CTX *mem_ctx,
4126                            TDB_DATA key, TDB_DATA *data)
4127 {
4128         struct ctdb_ltdb_header header;
4129         int ret;
4130
4131         ZERO_STRUCT(header);
4132
4133         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4134         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4135                 /* record doesn't exist yet */
4136                 *data = tdb_null;
4137                 ret = 0;
4138         }
4139
4140         if (ret != 0) {
4141                 return ret;
4142         }
4143
4144         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4145         if (h->m_all == NULL) {
4146                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4147                 return -1;
4148         }
4149
4150         return 0;
4151 }
4152
4153 /**
4154  * stores a record inside a transaction
4155  */
4156 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
4157                            TDB_DATA key, TDB_DATA data)
4158 {
4159         TALLOC_CTX *tmp_ctx = talloc_new(h);
4160         struct ctdb_ltdb_header header;
4161         TDB_DATA olddata;
4162         int ret;
4163
4164         /* we need the header so we can update the RSN */
4165         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4166         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4167                 /* the record doesn't exist - create one with us as dmaster.
4168                    This is only safe because we are in a transaction and this
4169                    is a persistent database */
4170                 ZERO_STRUCT(header);
4171         } else if (ret != 0) {
4172                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4173                 talloc_free(tmp_ctx);
4174                 return ret;
4175         }
4176
4177         if (data.dsize == olddata.dsize &&
4178             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
4179             header.rsn != 0) {
4180                 /* save writing the same data */
4181                 talloc_free(tmp_ctx);
4182                 return 0;
4183         }
4184
4185         header.dmaster = h->ctdb_db->ctdb->pnn;
4186         header.rsn++;
4187
4188         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4189         if (h->m_all == NULL) {
4190                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4191                 talloc_free(tmp_ctx);
4192                 return -1;
4193         }
4194
4195         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4196         if (h->m_write == NULL) {
4197                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4198                 talloc_free(tmp_ctx);
4199                 return -1;
4200         }
4201
4202         talloc_free(tmp_ctx);
4203         return 0;
4204 }
4205
4206 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
4207 {
4208         const char *keyname = CTDB_DB_SEQNUM_KEY;
4209         TDB_DATA key, data;
4210         struct ctdb_ltdb_header header;
4211         int ret;
4212
4213         key.dptr = (uint8_t *)discard_const(keyname);
4214         key.dsize = strlen(keyname) + 1;
4215
4216         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
4217         if (ret != 0) {
4218                 *seqnum = 0;
4219                 return 0;
4220         }
4221
4222         if (data.dsize == 0) {
4223                 *seqnum = 0;
4224                 return 0;
4225         }
4226
4227         if (data.dsize != sizeof(*seqnum)) {
4228                 DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
4229                                   data.dsize));
4230                 talloc_free(data.dptr);
4231                 return -1;
4232         }
4233
4234         *seqnum = *(uint64_t *)data.dptr;
4235         talloc_free(data.dptr);
4236
4237         return 0;
4238 }
4239
4240
4241 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
4242                                 uint64_t seqnum)
4243 {
4244         const char *keyname = CTDB_DB_SEQNUM_KEY;
4245         TDB_DATA key, data;
4246
4247         key.dptr = (uint8_t *)discard_const(keyname);
4248         key.dsize = strlen(keyname) + 1;
4249
4250         data.dptr = (uint8_t *)&seqnum;
4251         data.dsize = sizeof(seqnum);
4252
4253         return ctdb_transaction_store(h, key, data);
4254 }
4255
4256
4257 /**
4258  * commit a transaction
4259  */
4260 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4261 {
4262         int ret;
4263         uint64_t old_seqnum, new_seqnum;
4264         int32_t status;
4265         struct timeval timeout;
4266
4267         if (h->m_write == NULL) {
4268                 /* no changes were made */
4269                 talloc_free(h);
4270                 return 0;
4271         }
4272
4273         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
4274         if (ret != 0) {
4275                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4276                 ret = -1;
4277                 goto done;
4278         }
4279
4280         new_seqnum = old_seqnum + 1;
4281         ret = ctdb_store_db_seqnum(h, new_seqnum);
4282         if (ret != 0) {
4283                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4284                 ret = -1;
4285                 goto done;
4286         }
4287
4288 again:
4289         timeout = timeval_current_ofs(3,0);
4290         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4291                            h->ctdb_db->db_id,
4292                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4293                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4294                            &status, &timeout, NULL);
4295         if (ret != 0 || status != 0) {
4296                 /*
4297                  * TRANS3_COMMIT control will only fail if recovery has been
4298                  * triggered.  Check if the database has been updated or not.
4299                  */
4300                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4301                 if (ret != 0) {
4302                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4303                         goto done;
4304                 }
4305
4306                 if (new_seqnum == old_seqnum) {
4307                         /* Database not yet updated, try again */
4308                         goto again;
4309                 }
4310
4311                 if (new_seqnum != (old_seqnum + 1)) {
4312                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4313                                           (long long unsigned)new_seqnum,
4314                                           (long long unsigned)old_seqnum));
4315                         ret = -1;
4316                         goto done;
4317                 }
4318         }
4319
4320         ret = 0;
4321
4322 done:
4323         talloc_free(h);
4324         return ret;
4325 }
4326
4327 /**
4328  * cancel a transaction
4329  */
4330 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4331 {
4332         talloc_free(h);
4333         return 0;
4334 }
4335
4336
4337 /*
4338   recovery daemon ping to main daemon
4339  */
4340 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4341 {
4342         int ret;
4343         int32_t res;
4344
4345         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4346                            ctdb, NULL, &res, NULL, NULL);
4347         if (ret != 0 || res != 0) {
4348                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4349                 return -1;
4350         }
4351
4352         return 0;
4353 }
4354
4355 /* When forking the main daemon and the child process needs to connect
4356  * back to the daemon as a client process, this function can be used
4357  * to change the ctdb context from daemon into client mode.  The child
4358  * process must be created using ctdb_fork() and not fork() -
4359  * ctdb_fork() does some necessary housekeeping.
4360  */
4361 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4362 {
4363         int ret;
4364         va_list ap;
4365
4366         /* Add extra information so we can identify this in the logs */
4367         va_start(ap, fmt);
4368         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4369         va_end(ap);
4370
4371         /* get a new event context */
4372         ctdb->ev = event_context_init(ctdb);
4373         tevent_loop_allow_nesting(ctdb->ev);
4374
4375         /* Connect to main CTDB daemon */
4376         ret = ctdb_socket_connect(ctdb);
4377         if (ret != 0) {
4378                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4379                 return -1;
4380         }
4381
4382         ctdb->can_send_controls = true;
4383
4384         return 0;
4385 }
4386
4387 /*
4388   get the status of running the monitor eventscripts: NULL means never run.
4389  */
4390 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4391                 struct timeval timeout, uint32_t destnode, 
4392                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4393                 struct ctdb_scripts_wire **scripts)
4394 {
4395         int ret;
4396         TDB_DATA outdata, indata;
4397         int32_t res;
4398         uint32_t uinttype = type;
4399
4400         indata.dptr = (uint8_t *)&uinttype;
4401         indata.dsize = sizeof(uinttype);
4402
4403         ret = ctdb_control(ctdb, destnode, 0, 
4404                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4405                            mem_ctx, &outdata, &res, &timeout, NULL);
4406         if (ret != 0 || res != 0) {
4407                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4408                 return -1;
4409         }
4410
4411         if (outdata.dsize == 0) {
4412                 *scripts = NULL;
4413         } else {
4414                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4415                 talloc_free(outdata.dptr);
4416         }
4417                     
4418         return 0;
4419 }
4420
4421 /*
4422   tell the main daemon how long it took to lock the reclock file
4423  */
4424 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4425 {
4426         int ret;
4427         int32_t res;
4428         TDB_DATA data;
4429
4430         data.dptr = (uint8_t *)&latency;
4431         data.dsize = sizeof(latency);
4432
4433         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4434                            ctdb, NULL, &res, NULL, NULL);
4435         if (ret != 0 || res != 0) {
4436                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4437                 return -1;
4438         }
4439
4440         return 0;
4441 }
4442
4443 /*
4444   get the name of the reclock file
4445  */
4446 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4447                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4448                          const char **name)
4449 {
4450         int ret;
4451         int32_t res;
4452         TDB_DATA data;
4453
4454         ret = ctdb_control(ctdb, destnode, 0, 
4455                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4456                            mem_ctx, &data, &res, &timeout, NULL);
4457         if (ret != 0 || res != 0) {
4458                 return -1;
4459         }
4460
4461         if (data.dsize == 0) {
4462                 *name = NULL;
4463         } else {
4464                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4465         }
4466         talloc_free(data.dptr);
4467
4468         return 0;
4469 }
4470
4471 /*
4472   set the reclock filename for a node
4473  */
4474 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4475 {
4476         int ret;
4477         TDB_DATA data;
4478         int32_t res;
4479
4480         if (reclock == NULL) {
4481                 data.dsize = 0;
4482                 data.dptr  = NULL;
4483         } else {
4484                 data.dsize = strlen(reclock) + 1;
4485                 data.dptr  = discard_const(reclock);
4486         }
4487
4488         ret = ctdb_control(ctdb, destnode, 0, 
4489                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4490                            NULL, NULL, &res, &timeout, NULL);
4491         if (ret != 0 || res != 0) {
4492                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4493                 return -1;
4494         }
4495
4496         return 0;
4497 }
4498
4499 /*
4500   stop a node
4501  */
4502 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4503 {
4504         int ret;
4505         int32_t res;
4506
4507         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4508                            ctdb, NULL, &res, &timeout, NULL);
4509         if (ret != 0 || res != 0) {
4510                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4511                 return -1;
4512         }
4513
4514         return 0;
4515 }
4516
4517 /*
4518   continue a node
4519  */
4520 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4521 {
4522         int ret;
4523
4524         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4525                            ctdb, NULL, NULL, &timeout, NULL);
4526         if (ret != 0) {
4527                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4528                 return -1;
4529         }
4530
4531         return 0;
4532 }
4533
4534 /*
4535   set the natgw state for a node
4536  */
4537 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4538 {
4539         int ret;
4540         TDB_DATA data;
4541         int32_t res;
4542
4543         data.dsize = sizeof(natgwstate);
4544         data.dptr  = (uint8_t *)&natgwstate;
4545
4546         ret = ctdb_control(ctdb, destnode, 0, 
4547                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4548                            NULL, NULL, &res, &timeout, NULL);
4549         if (ret != 0 || res != 0) {
4550                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4551                 return -1;
4552         }
4553
4554         return 0;
4555 }
4556
4557 /*
4558   set the lmaster role for a node
4559  */
4560 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4561 {
4562         int ret;
4563         TDB_DATA data;
4564         int32_t res;
4565
4566         data.dsize = sizeof(lmasterrole);
4567         data.dptr  = (uint8_t *)&lmasterrole;
4568
4569         ret = ctdb_control(ctdb, destnode, 0, 
4570                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4571                            NULL, NULL, &res, &timeout, NULL);
4572         if (ret != 0 || res != 0) {
4573                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4574                 return -1;
4575         }
4576
4577         return 0;
4578 }
4579
4580 /*
4581   set the recmaster role for a node
4582  */
4583 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4584 {
4585         int ret;
4586         TDB_DATA data;
4587         int32_t res;
4588
4589         data.dsize = sizeof(recmasterrole);
4590         data.dptr  = (uint8_t *)&recmasterrole;
4591
4592         ret = ctdb_control(ctdb, destnode, 0, 
4593                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4594                            NULL, NULL, &res, &timeout, NULL);
4595         if (ret != 0 || res != 0) {
4596                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4597                 return -1;
4598         }
4599
4600         return 0;
4601 }
4602
4603 /* enable an eventscript
4604  */
4605 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4606 {
4607         int ret;
4608         TDB_DATA data;
4609         int32_t res;
4610
4611         data.dsize = strlen(script) + 1;
4612         data.dptr  = discard_const(script);
4613
4614         ret = ctdb_control(ctdb, destnode, 0, 
4615                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4616                            NULL, NULL, &res, &timeout, NULL);
4617         if (ret != 0 || res != 0) {
4618                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4619                 return -1;
4620         }
4621
4622         return 0;
4623 }
4624
4625 /* disable an eventscript
4626  */
4627 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4628 {
4629         int ret;
4630         TDB_DATA data;
4631         int32_t res;
4632
4633         data.dsize = strlen(script) + 1;
4634         data.dptr  = discard_const(script);
4635
4636         ret = ctdb_control(ctdb, destnode, 0, 
4637                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4638                            NULL, NULL, &res, &timeout, NULL);
4639         if (ret != 0 || res != 0) {
4640                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4641                 return -1;
4642         }
4643
4644         return 0;
4645 }
4646
4647
4648 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4649 {
4650         int ret;
4651         TDB_DATA data;
4652         int32_t res;
4653
4654         data.dsize = sizeof(*bantime);
4655         data.dptr  = (uint8_t *)bantime;
4656
4657         ret = ctdb_control(ctdb, destnode, 0, 
4658                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4659                            NULL, NULL, &res, &timeout, NULL);
4660         if (ret != 0 || res != 0) {
4661                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4662                 return -1;
4663         }
4664
4665         return 0;
4666 }
4667
4668
4669 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4670 {
4671         int ret;
4672         TDB_DATA outdata;
4673         int32_t res;
4674         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4675
4676         ret = ctdb_control(ctdb, destnode, 0, 
4677                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4678                            tmp_ctx, &outdata, &res, &timeout, NULL);
4679         if (ret != 0 || res != 0) {
4680                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4681                 talloc_free(tmp_ctx);
4682                 return -1;
4683         }
4684
4685         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4686         talloc_free(tmp_ctx);
4687
4688         return 0;
4689 }
4690
4691
4692 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4693 {
4694         int ret;
4695         int32_t res;
4696         TDB_DATA data;
4697         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4698
4699         data.dptr = (uint8_t*)db_prio;
4700         data.dsize = sizeof(*db_prio);
4701
4702         ret = ctdb_control(ctdb, destnode, 0, 
4703                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4704                            tmp_ctx, NULL, &res, &timeout, NULL);
4705         if (ret != 0 || res != 0) {
4706                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4707                 talloc_free(tmp_ctx);
4708                 return -1;
4709         }
4710
4711         talloc_free(tmp_ctx);
4712
4713         return 0;
4714 }
4715
4716 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4717 {
4718         int ret;
4719         int32_t res;
4720         TDB_DATA data;
4721         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4722
4723         data.dptr = (uint8_t*)&db_id;
4724         data.dsize = sizeof(db_id);
4725
4726         ret = ctdb_control(ctdb, destnode, 0, 
4727                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4728                            tmp_ctx, NULL, &res, &timeout, NULL);
4729         if (ret != 0 || res < 0) {
4730                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4731                 talloc_free(tmp_ctx);
4732                 return -1;
4733         }
4734
4735         if (priority) {
4736                 *priority = res;
4737         }
4738
4739         talloc_free(tmp_ctx);
4740
4741         return 0;
4742 }
4743
4744 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4745 {
4746         int ret;
4747         TDB_DATA outdata;
4748         int32_t res;
4749
4750         ret = ctdb_control(ctdb, destnode, 0, 
4751                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4752                            mem_ctx, &outdata, &res, &timeout, NULL);
4753         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4754                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4755                 return -1;
4756         }
4757
4758         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4759         talloc_free(outdata.dptr);
4760                     
4761         return 0;
4762 }
4763
4764 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4765 {
4766         if (h == NULL) {
4767                 return NULL;
4768         }
4769
4770         return &h->header;
4771 }
4772
4773
4774 struct ctdb_client_control_state *
4775 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4776 {
4777         struct ctdb_client_control_state *handle;
4778         struct ctdb_marshall_buffer *m;
4779         struct ctdb_rec_data *rec;
4780         TDB_DATA outdata;
4781
4782         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4783         if (m == NULL) {
4784                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4785                 return NULL;
4786         }
4787
4788         m->db_id = ctdb_db->db_id;
4789
4790         rec = ctdb_marshall_record(m, 0, key, header, data);
4791         if (rec == NULL) {
4792                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4793                 talloc_free(m);
4794                 return NULL;
4795         }
4796         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4797         if (m == NULL) {
4798                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4799                 talloc_free(m);
4800                 return NULL;
4801         }
4802         m->count++;
4803         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4804
4805
4806         outdata.dptr = (uint8_t *)m;
4807         outdata.dsize = talloc_get_size(m);
4808
4809         handle = ctdb_control_send(ctdb, destnode, 0, 
4810                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4811                            mem_ctx, &timeout, NULL);
4812         talloc_free(m);
4813         return handle;
4814 }
4815
4816 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4817 {
4818         int ret;
4819         int32_t res;
4820
4821         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4822         if ( (ret != 0) || (res != 0) ){
4823                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4824                 return -1;
4825         }
4826
4827         return 0;
4828 }
4829
4830 int
4831 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4832 {
4833         struct ctdb_client_control_state *state;
4834
4835         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4836         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4837 }
4838
4839
4840
4841
4842
4843
4844 /*
4845   set a database to be readonly
4846  */
4847 struct ctdb_client_control_state *
4848 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4849 {
4850         TDB_DATA data;
4851
4852         data.dptr = (uint8_t *)&dbid;
4853         data.dsize = sizeof(dbid);
4854
4855         return ctdb_control_send(ctdb, destnode, 0, 
4856                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4857                            ctdb, NULL, NULL);
4858 }
4859
4860 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4861 {
4862         int ret;
4863         int32_t res;
4864
4865         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4866         if (ret != 0 || res != 0) {
4867           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4868                 return -1;
4869         }
4870
4871         return 0;
4872 }
4873
4874 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4875 {
4876         struct ctdb_client_control_state *state;
4877
4878         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4879         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4880 }
4881
4882 /*
4883   set a database to be sticky
4884  */
4885 struct ctdb_client_control_state *
4886 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4887 {
4888         TDB_DATA data;
4889
4890         data.dptr = (uint8_t *)&dbid;
4891         data.dsize = sizeof(dbid);
4892
4893         return ctdb_control_send(ctdb, destnode, 0, 
4894                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4895                            ctdb, NULL, NULL);
4896 }
4897
4898 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4899 {
4900         int ret;
4901         int32_t res;
4902
4903         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4904         if (ret != 0 || res != 0) {
4905                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4906                 return -1;
4907         }
4908
4909         return 0;
4910 }
4911
4912 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4913 {
4914         struct ctdb_client_control_state *state;
4915
4916         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4917         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4918 }