ctdb-daemon: Rename struct ctdb_control_iface_info to ctdb_iface
[vlendec/samba-autobuild/.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "replace.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/locale.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/time.h"
35 #include "lib/util/debug.h"
36 #include "lib/util/samba_util.h"
37
38 #include "ctdb_logging.h"
39 #include "ctdb_private.h"
40 #include "ctdb_client.h"
41
42 #include "common/reqid.h"
43 #include "common/system.h"
44 #include "common/common.h"
45
46 /*
47   allocate a packet for use in client<->daemon communication
48  */
49 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
50                                             TALLOC_CTX *mem_ctx, 
51                                             enum ctdb_operation operation, 
52                                             size_t length, size_t slength,
53                                             const char *type)
54 {
55         int size;
56         struct ctdb_req_header *hdr;
57
58         length = MAX(length, slength);
59         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
60
61         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
62         if (hdr == NULL) {
63                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
64                          operation, (unsigned)length));
65                 return NULL;
66         }
67         talloc_set_name_const(hdr, type);
68         hdr->length       = length;
69         hdr->operation    = operation;
70         hdr->ctdb_magic   = CTDB_MAGIC;
71         hdr->ctdb_version = CTDB_PROTOCOL;
72         hdr->srcnode      = ctdb->pnn;
73         if (ctdb->vnn_map) {
74                 hdr->generation = ctdb->vnn_map->generation;
75         }
76
77         return hdr;
78 }
79
80 /*
81   local version of ctdb_call
82 */
83 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
84                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
85                     TDB_DATA *data, bool updatetdb)
86 {
87         struct ctdb_call_info *c;
88         struct ctdb_registered_call *fn;
89         struct ctdb_context *ctdb = ctdb_db->ctdb;
90         
91         c = talloc(ctdb, struct ctdb_call_info);
92         CTDB_NO_MEMORY(ctdb, c);
93
94         c->key = call->key;
95         c->call_data = &call->call_data;
96         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
97         c->record_data.dsize = data->dsize;
98         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
99         c->new_data = NULL;
100         c->reply_data = NULL;
101         c->status = 0;
102         c->header = header;
103
104         for (fn=ctdb_db->calls;fn;fn=fn->next) {
105                 if (fn->id == call->call_id) break;
106         }
107         if (fn == NULL) {
108                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
109                 talloc_free(c);
110                 return -1;
111         }
112
113         if (fn->fn(c) != 0) {
114                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
115                 talloc_free(c);
116                 return -1;
117         }
118
119         /* we need to force the record to be written out if this was a remote access */
120         if (c->new_data == NULL) {
121                 c->new_data = &c->record_data;
122         }
123
124         if (c->new_data && updatetdb) {
125                 /* XXX check that we always have the lock here? */
126                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
127                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
128                         talloc_free(c);
129                         return -1;
130                 }
131         }
132
133         if (c->reply_data) {
134                 call->reply_data = *c->reply_data;
135
136                 talloc_steal(call, call->reply_data.dptr);
137                 talloc_set_name_const(call->reply_data.dptr, __location__);
138         } else {
139                 call->reply_data.dptr = NULL;
140                 call->reply_data.dsize = 0;
141         }
142         call->status = c->status;
143
144         talloc_free(c);
145
146         return 0;
147 }
148
149
150 /*
151   queue a packet for sending from client to daemon
152 */
153 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
156 }
157
158
159 /*
160   called when a CTDB_REPLY_CALL packet comes in in the client
161
162   This packet comes in response to a CTDB_REQ_CALL request packet. It
163   contains any reply data from the call
164 */
165 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
166 {
167         struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
168         struct ctdb_client_call_state *state;
169
170         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
171         if (state == NULL) {
172                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
173                 return;
174         }
175
176         if (hdr->reqid != state->reqid) {
177                 /* we found a record  but it was the wrong one */
178                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
179                 return;
180         }
181
182         state->call->reply_data.dptr = c->data;
183         state->call->reply_data.dsize = c->datalen;
184         state->call->status = c->status;
185
186         talloc_steal(state, c);
187
188         state->state = CTDB_CALL_DONE;
189
190         if (state->async.fn) {
191                 state->async.fn(state);
192         }
193 }
194
195 void ctdb_request_message(struct ctdb_context *ctdb,
196                           struct ctdb_req_header *hdr)
197 {
198         struct ctdb_req_message_old *c = (struct ctdb_req_message_old *)hdr;
199         TDB_DATA data;
200
201         data.dsize = c->datalen;
202         data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
203         if (data.dptr == NULL) {
204                 DEBUG(DEBUG_ERR, (__location__ " Memory allocation failure\n"));
205                 return;
206         }
207
208         srvid_dispatch(ctdb->srv, c->srvid, CTDB_SRVID_ALL, data);
209 }
210
211 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
212
213 /*
214   this is called in the client, when data comes in from the daemon
215  */
216 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
217 {
218         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
219         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
220         TALLOC_CTX *tmp_ctx;
221
222         /* place the packet as a child of a tmp_ctx. We then use
223            talloc_free() below to free it. If any of the calls want
224            to keep it, then they will steal it somewhere else, and the
225            talloc_free() will be a no-op */
226         tmp_ctx = talloc_new(ctdb);
227         talloc_steal(tmp_ctx, hdr);
228
229         if (cnt == 0) {
230                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
231                 exit(1);
232         }
233
234         if (cnt < sizeof(*hdr)) {
235                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
236                 goto done;
237         }
238         if (cnt != hdr->length) {
239                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
240                                (unsigned)hdr->length, (unsigned)cnt);
241                 goto done;
242         }
243
244         if (hdr->ctdb_magic != CTDB_MAGIC) {
245                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
246                 goto done;
247         }
248
249         if (hdr->ctdb_version != CTDB_PROTOCOL) {
250                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
251                 goto done;
252         }
253
254         switch (hdr->operation) {
255         case CTDB_REPLY_CALL:
256                 ctdb_client_reply_call(ctdb, hdr);
257                 break;
258
259         case CTDB_REQ_MESSAGE:
260                 ctdb_request_message(ctdb, hdr);
261                 break;
262
263         case CTDB_REPLY_CONTROL:
264                 ctdb_client_reply_control(ctdb, hdr);
265                 break;
266
267         default:
268                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
269         }
270
271 done:
272         talloc_free(tmp_ctx);
273 }
274
275 /*
276   connect to a unix domain socket
277 */
278 int ctdb_socket_connect(struct ctdb_context *ctdb)
279 {
280         struct sockaddr_un addr;
281
282         memset(&addr, 0, sizeof(addr));
283         addr.sun_family = AF_UNIX;
284         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
285
286         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
287         if (ctdb->daemon.sd == -1) {
288                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
289                 return -1;
290         }
291
292         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
293                 close(ctdb->daemon.sd);
294                 ctdb->daemon.sd = -1;
295                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
296                 return -1;
297         }
298
299         set_nonblocking(ctdb->daemon.sd);
300         set_close_on_exec(ctdb->daemon.sd);
301         
302         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
303                                               CTDB_DS_ALIGNMENT, 
304                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
305         return 0;
306 }
307
308
309 struct ctdb_record_handle {
310         struct ctdb_db_context *ctdb_db;
311         TDB_DATA key;
312         TDB_DATA *data;
313         struct ctdb_ltdb_header header;
314 };
315
316
317 /*
318   make a recv call to the local ctdb daemon - called from client context
319
320   This is called when the program wants to wait for a ctdb_call to complete and get the 
321   results. This call will block unless the call has already completed.
322 */
323 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
324 {
325         if (state == NULL) {
326                 return -1;
327         }
328
329         while (state->state < CTDB_CALL_DONE) {
330                 tevent_loop_once(state->ctdb_db->ctdb->ev);
331         }
332         if (state->state != CTDB_CALL_DONE) {
333                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
334                 talloc_free(state);
335                 return -1;
336         }
337
338         if (state->call->reply_data.dsize) {
339                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
340                                                       state->call->reply_data.dptr,
341                                                       state->call->reply_data.dsize);
342                 call->reply_data.dsize = state->call->reply_data.dsize;
343         } else {
344                 call->reply_data.dptr = NULL;
345                 call->reply_data.dsize = 0;
346         }
347         call->status = state->call->status;
348         talloc_free(state);
349
350         return call->status;
351 }
352
353
354
355
356 /*
357   destroy a ctdb_call in client
358 */
359 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
360 {
361         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
362         return 0;
363 }
364
365 /*
366   construct an event driven local ctdb_call
367
368   this is used so that locally processed ctdb_call requests are processed
369   in an event driven manner
370 */
371 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
372                                                                   struct ctdb_call *call,
373                                                                   struct ctdb_ltdb_header *header,
374                                                                   TDB_DATA *data)
375 {
376         struct ctdb_client_call_state *state;
377         struct ctdb_context *ctdb = ctdb_db->ctdb;
378         int ret;
379
380         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
381         CTDB_NO_MEMORY_NULL(ctdb, state);
382         state->call = talloc_zero(state, struct ctdb_call);
383         CTDB_NO_MEMORY_NULL(ctdb, state->call);
384
385         talloc_steal(state, data->dptr);
386
387         state->state   = CTDB_CALL_DONE;
388         *(state->call) = *call;
389         state->ctdb_db = ctdb_db;
390
391         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
392         if (ret != 0) {
393                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
394         }
395
396         return state;
397 }
398
399 /*
400   make a ctdb call to the local daemon - async send. Called from client context.
401
402   This constructs a ctdb_call request and queues it for processing. 
403   This call never blocks.
404 */
405 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
406                                               struct ctdb_call *call)
407 {
408         struct ctdb_client_call_state *state;
409         struct ctdb_context *ctdb = ctdb_db->ctdb;
410         struct ctdb_ltdb_header header;
411         TDB_DATA data;
412         int ret;
413         size_t len;
414         struct ctdb_req_call_old *c;
415
416         /* if the domain socket is not yet open, open it */
417         if (ctdb->daemon.sd==-1) {
418                 ctdb_socket_connect(ctdb);
419         }
420
421         ret = ctdb_ltdb_lock(ctdb_db, call->key);
422         if (ret != 0) {
423                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
424                 return NULL;
425         }
426
427         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
428
429         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
430                 ret = -1;
431         }
432
433         if (ret == 0 && header.dmaster == ctdb->pnn) {
434                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
435                 talloc_free(data.dptr);
436                 ctdb_ltdb_unlock(ctdb_db, call->key);
437                 return state;
438         }
439
440         ctdb_ltdb_unlock(ctdb_db, call->key);
441         talloc_free(data.dptr);
442
443         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
444         if (state == NULL) {
445                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
446                 return NULL;
447         }
448         state->call = talloc_zero(state, struct ctdb_call);
449         if (state->call == NULL) {
450                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
451                 return NULL;
452         }
453
454         len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
455         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call_old);
456         if (c == NULL) {
457                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
458                 return NULL;
459         }
460
461         state->reqid     = reqid_new(ctdb->idr, state);
462         state->ctdb_db = ctdb_db;
463         talloc_set_destructor(state, ctdb_client_call_destructor);
464
465         c->hdr.reqid     = state->reqid;
466         c->flags         = call->flags;
467         c->db_id         = ctdb_db->db_id;
468         c->callid        = call->call_id;
469         c->hopcount      = 0;
470         c->keylen        = call->key.dsize;
471         c->calldatalen   = call->call_data.dsize;
472         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
473         memcpy(&c->data[call->key.dsize], 
474                call->call_data.dptr, call->call_data.dsize);
475         *(state->call)              = *call;
476         state->call->call_data.dptr = &c->data[call->key.dsize];
477         state->call->key.dptr       = &c->data[0];
478
479         state->state  = CTDB_CALL_WAIT;
480
481
482         ctdb_client_queue_pkt(ctdb, &c->hdr);
483
484         return state;
485 }
486
487
488 /*
489   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
490 */
491 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
492 {
493         struct ctdb_client_call_state *state;
494
495         state = ctdb_call_send(ctdb_db, call);
496         return ctdb_call_recv(state, call);
497 }
498
499
500 /*
501   tell the daemon what messaging srvid we will use, and register the message
502   handler function in the client
503 */
504 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
505                                     srvid_handler_fn handler,
506                                     void *private_data)
507 {
508         int res;
509         int32_t status;
510
511         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
512                            CTDB_CONTROL_REGISTER_SRVID, 0,
513                            tdb_null, NULL, NULL, &status, NULL, NULL);
514         if (res != 0 || status != 0) {
515                 DEBUG(DEBUG_ERR,
516                       ("Failed to register srvid %llu\n",
517                        (unsigned long long)srvid));
518                 return -1;
519         }
520
521         /* also need to register the handler with our own ctdb structure */
522         return srvid_register(ctdb->srv, ctdb, srvid, handler, private_data);
523 }
524
525 /*
526   tell the daemon we no longer want a srvid
527 */
528 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb,
529                                        uint64_t srvid, void *private_data)
530 {
531         int res;
532         int32_t status;
533
534         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
535                            CTDB_CONTROL_DEREGISTER_SRVID, 0,
536                            tdb_null, NULL, NULL, &status, NULL, NULL);
537         if (res != 0 || status != 0) {
538                 DEBUG(DEBUG_ERR,
539                       ("Failed to deregister srvid %llu\n",
540                        (unsigned long long)srvid));
541                 return -1;
542         }
543
544         /* also need to register the handler with our own ctdb structure */
545         srvid_deregister(ctdb->srv, srvid, private_data);
546         return 0;
547 }
548
549 /*
550  * check server ids
551  */
552 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
553                                        uint8_t *result)
554 {
555         TDB_DATA indata, outdata;
556         int res;
557         int32_t status;
558         int i;
559
560         indata.dptr = (uint8_t *)ids;
561         indata.dsize = num * sizeof(*ids);
562
563         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
564                            indata, ctdb, &outdata, &status, NULL, NULL);
565         if (res != 0 || status != 0) {
566                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
567                 return -1;
568         }
569
570         if (outdata.dsize != num*sizeof(uint8_t)) {
571                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
572                                   (long unsigned int)num*sizeof(uint8_t),
573                                   outdata.dsize));
574                 talloc_free(outdata.dptr);
575                 return -1;
576         }
577
578         for (i=0; i<num; i++) {
579                 result[i] = outdata.dptr[i];
580         }
581
582         talloc_free(outdata.dptr);
583         return 0;
584 }
585
586 /*
587   send a message - from client context
588  */
589 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
590                       uint64_t srvid, TDB_DATA data)
591 {
592         struct ctdb_req_message_old *r;
593         int len, res;
594
595         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
596         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
597                                len, struct ctdb_req_message_old);
598         CTDB_NO_MEMORY(ctdb, r);
599
600         r->hdr.destnode  = pnn;
601         r->srvid         = srvid;
602         r->datalen       = data.dsize;
603         memcpy(&r->data[0], data.dptr, data.dsize);
604         
605         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
606         talloc_free(r);
607         return res;
608 }
609
610
611 /*
612   cancel a ctdb_fetch_lock operation, releasing the lock
613  */
614 static int fetch_lock_destructor(struct ctdb_record_handle *h)
615 {
616         ctdb_ltdb_unlock(h->ctdb_db, h->key);
617         return 0;
618 }
619
620 /*
621   force the migration of a record to this node
622  */
623 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
624 {
625         struct ctdb_call call;
626         ZERO_STRUCT(call);
627         call.call_id = CTDB_NULL_FUNC;
628         call.key = key;
629         call.flags = CTDB_IMMEDIATE_MIGRATION;
630         return ctdb_call(ctdb_db, &call);
631 }
632
633 /*
634   try to fetch a readonly copy of a record
635  */
636 static int
637 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
638 {
639         int ret;
640
641         struct ctdb_call call;
642         ZERO_STRUCT(call);
643
644         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
645         call.call_data.dptr = NULL;
646         call.call_data.dsize = 0;
647         call.key = key;
648         call.flags = CTDB_WANT_READONLY;
649         ret = ctdb_call(ctdb_db, &call);
650
651         if (ret != 0) {
652                 return -1;
653         }
654         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
655                 return -1;
656         }
657
658         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
659         if (*hdr == NULL) {
660                 talloc_free(call.reply_data.dptr);
661                 return -1;
662         }
663
664         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
665         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
666         if (data->dptr == NULL) {
667                 talloc_free(call.reply_data.dptr);
668                 talloc_free(hdr);
669                 return -1;
670         }
671
672         return 0;
673 }
674
675 /*
676   get a lock on a record, and return the records data. Blocks until it gets the lock
677  */
678 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
679                                            TDB_DATA key, TDB_DATA *data)
680 {
681         int ret;
682         struct ctdb_record_handle *h;
683
684         /*
685           procedure is as follows:
686
687           1) get the chain lock. 
688           2) check if we are dmaster
689           3) if we are the dmaster then return handle 
690           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
691              reply from ctdbd
692           5) when we get the reply, goto (1)
693          */
694
695         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
696         if (h == NULL) {
697                 return NULL;
698         }
699
700         h->ctdb_db = ctdb_db;
701         h->key     = key;
702         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
703         if (h->key.dptr == NULL) {
704                 talloc_free(h);
705                 return NULL;
706         }
707         h->data    = data;
708
709         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
710                  (const char *)key.dptr));
711
712 again:
713         /* step 1 - get the chain lock */
714         ret = ctdb_ltdb_lock(ctdb_db, key);
715         if (ret != 0) {
716                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
717                 talloc_free(h);
718                 return NULL;
719         }
720
721         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
722
723         talloc_set_destructor(h, fetch_lock_destructor);
724
725         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
726
727         /* when torturing, ensure we test the remote path */
728         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
729             random() % 5 == 0) {
730                 h->header.dmaster = (uint32_t)-1;
731         }
732
733
734         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
735
736         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
737                 ctdb_ltdb_unlock(ctdb_db, key);
738                 ret = ctdb_client_force_migration(ctdb_db, key);
739                 if (ret != 0) {
740                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
741                         talloc_free(h);
742                         return NULL;
743                 }
744                 goto again;
745         }
746
747         /* if this is a request for read/write and we have delegations
748            we have to revoke all delegations first
749         */
750         if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
751             (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
752                 ctdb_ltdb_unlock(ctdb_db, key);
753                 ret = ctdb_client_force_migration(ctdb_db, key);
754                 if (ret != 0) {
755                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
756                         talloc_free(h);
757                         return NULL;
758                 }
759                 goto again;
760         }
761
762         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
763         return h;
764 }
765
766 /*
767   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
768  */
769 struct ctdb_record_handle *
770 ctdb_fetch_readonly_lock(
771         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
772         TDB_DATA key, TDB_DATA *data,
773         int read_only)
774 {
775         int ret;
776         struct ctdb_record_handle *h;
777         struct ctdb_ltdb_header *roheader = NULL;
778
779         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
780         if (h == NULL) {
781                 return NULL;
782         }
783
784         h->ctdb_db = ctdb_db;
785         h->key     = key;
786         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
787         if (h->key.dptr == NULL) {
788                 talloc_free(h);
789                 return NULL;
790         }
791         h->data    = data;
792
793         data->dptr = NULL;
794         data->dsize = 0;
795
796
797 again:
798         talloc_free(roheader);
799         roheader = NULL;
800
801         talloc_free(data->dptr);
802         data->dptr = NULL;
803         data->dsize = 0;
804
805         /* Lock the record/chain */
806         ret = ctdb_ltdb_lock(ctdb_db, key);
807         if (ret != 0) {
808                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
809                 talloc_free(h);
810                 return NULL;
811         }
812
813         talloc_set_destructor(h, fetch_lock_destructor);
814
815         /* Check if record exists yet in the TDB */
816         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
817         if (ret != 0) {
818                 ctdb_ltdb_unlock(ctdb_db, key);
819                 ret = ctdb_client_force_migration(ctdb_db, key);
820                 if (ret != 0) {
821                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
822                         talloc_free(h);
823                         return NULL;
824                 }
825                 goto again;
826         }
827
828         /* if this is a request for read/write and we have delegations
829            we have to revoke all delegations first
830         */
831         if ((read_only == 0) 
832         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
833         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
834                 ctdb_ltdb_unlock(ctdb_db, key);
835                 ret = ctdb_client_force_migration(ctdb_db, key);
836                 if (ret != 0) {
837                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
838                         talloc_free(h);
839                         return NULL;
840                 }
841                 goto again;
842         }
843
844         /* if we are dmaster, just return the handle */
845         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
846                 return h;
847         }
848
849         if (read_only != 0) {
850                 TDB_DATA rodata = {NULL, 0};
851
852                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
853                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
854                         return h;
855                 }
856
857                 ctdb_ltdb_unlock(ctdb_db, key);
858                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
859                 if (ret != 0) {
860                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
861                         ret = ctdb_client_force_migration(ctdb_db, key);
862                         if (ret != 0) {
863                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
864                                 talloc_free(h);
865                                 return NULL;
866                         }
867
868                         goto again;
869                 }
870
871                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
872                         ret = ctdb_client_force_migration(ctdb_db, key);
873                         if (ret != 0) {
874                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
875                                 talloc_free(h);
876                                 return NULL;
877                         }
878
879                         goto again;
880                 }
881
882                 ret = ctdb_ltdb_lock(ctdb_db, key);
883                 if (ret != 0) {
884                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
885                         talloc_free(h);
886                         return NULL;
887                 }
888
889                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
890                 if (ret != 0) {
891                         ctdb_ltdb_unlock(ctdb_db, key);
892
893                         ret = ctdb_client_force_migration(ctdb_db, key);
894                         if (ret != 0) {
895                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
896                                 talloc_free(h);
897                                 return NULL;
898                         }
899
900                         goto again;
901                 }
902
903                 return h;
904         }
905
906         /* we are not dmaster and this was not a request for a readonly lock
907          * so unlock the record, migrate it and try again
908          */
909         ctdb_ltdb_unlock(ctdb_db, key);
910         ret = ctdb_client_force_migration(ctdb_db, key);
911         if (ret != 0) {
912                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
913                 talloc_free(h);
914                 return NULL;
915         }
916         goto again;
917 }
918
919 /*
920   store some data to the record that was locked with ctdb_fetch_lock()
921 */
922 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
923 {
924         if (h->ctdb_db->persistent) {
925                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
926                 return -1;
927         }
928
929         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
930 }
931
932 /*
933   non-locking fetch of a record
934  */
935 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
936                TDB_DATA key, TDB_DATA *data)
937 {
938         struct ctdb_call call;
939         int ret;
940
941         call.call_id = CTDB_FETCH_FUNC;
942         call.call_data.dptr = NULL;
943         call.call_data.dsize = 0;
944         call.key = key;
945
946         ret = ctdb_call(ctdb_db, &call);
947
948         if (ret == 0) {
949                 *data = call.reply_data;
950                 talloc_steal(mem_ctx, data->dptr);
951         }
952
953         return ret;
954 }
955
956
957
958 /*
959    called when a control completes or timesout to invoke the callback
960    function the user provided
961 */
962 static void invoke_control_callback(struct tevent_context *ev,
963                                     struct tevent_timer *te,
964                                     struct timeval t, void *private_data)
965 {
966         struct ctdb_client_control_state *state;
967         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
968         int ret;
969
970         state = talloc_get_type(private_data, struct ctdb_client_control_state);
971         talloc_steal(tmp_ctx, state);
972
973         ret = ctdb_control_recv(state->ctdb, state, state,
974                         NULL, 
975                         NULL, 
976                         NULL);
977         if (ret != 0) {
978                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
979         }
980
981         talloc_free(tmp_ctx);
982 }
983
984 /*
985   called when a CTDB_REPLY_CONTROL packet comes in in the client
986
987   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
988   contains any reply data from the control
989 */
990 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
991                                       struct ctdb_req_header *hdr)
992 {
993         struct ctdb_reply_control_old *c = (struct ctdb_reply_control_old *)hdr;
994         struct ctdb_client_control_state *state;
995
996         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_control_state);
997         if (state == NULL) {
998                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
999                 return;
1000         }
1001
1002         if (hdr->reqid != state->reqid) {
1003                 /* we found a record  but it was the wrong one */
1004                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
1005                 return;
1006         }
1007
1008         state->outdata.dptr = c->data;
1009         state->outdata.dsize = c->datalen;
1010         state->status = c->status;
1011         if (c->errorlen) {
1012                 state->errormsg = talloc_strndup(state, 
1013                                                  (char *)&c->data[c->datalen], 
1014                                                  c->errorlen);
1015         }
1016
1017         /* state->outdata now uses resources from c so we dont want c
1018            to just dissappear from under us while state is still alive
1019         */
1020         talloc_steal(state, c);
1021
1022         state->state = CTDB_CONTROL_DONE;
1023
1024         /* if we had a callback registered for this control, pull the response
1025            and call the callback.
1026         */
1027         if (state->async.fn) {
1028                 tevent_add_timer(ctdb->ev, state, timeval_zero(),
1029                                  invoke_control_callback, state);
1030         }
1031 }
1032
1033
1034 /*
1035   destroy a ctdb_control in client
1036 */
1037 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
1038 {
1039         reqid_remove(state->ctdb->idr, state->reqid);
1040         return 0;
1041 }
1042
1043
1044 /* time out handler for ctdb_control */
1045 static void control_timeout_func(struct tevent_context *ev,
1046                                  struct tevent_timer *te,
1047                                  struct timeval t, void *private_data)
1048 {
1049         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1050
1051         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1052                          "dstnode:%u\n", state->reqid, state->c->opcode,
1053                          state->c->hdr.destnode));
1054
1055         state->state = CTDB_CONTROL_TIMEOUT;
1056
1057         /* if we had a callback registered for this control, pull the response
1058            and call the callback.
1059         */
1060         if (state->async.fn) {
1061                 tevent_add_timer(state->ctdb->ev, state, timeval_zero(),
1062                                  invoke_control_callback, state);
1063         }
1064 }
1065
1066 /* async version of send control request */
1067 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1068                 uint32_t destnode, uint64_t srvid, 
1069                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1070                 TALLOC_CTX *mem_ctx,
1071                 struct timeval *timeout,
1072                 char **errormsg)
1073 {
1074         struct ctdb_client_control_state *state;
1075         size_t len;
1076         struct ctdb_req_control_old *c;
1077         int ret;
1078
1079         if (errormsg) {
1080                 *errormsg = NULL;
1081         }
1082
1083         /* if the domain socket is not yet open, open it */
1084         if (ctdb->daemon.sd==-1) {
1085                 ctdb_socket_connect(ctdb);
1086         }
1087
1088         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1089         CTDB_NO_MEMORY_NULL(ctdb, state);
1090
1091         state->ctdb       = ctdb;
1092         state->reqid      = reqid_new(ctdb->idr, state);
1093         state->state      = CTDB_CONTROL_WAIT;
1094         state->errormsg   = NULL;
1095
1096         talloc_set_destructor(state, ctdb_client_control_destructor);
1097
1098         len = offsetof(struct ctdb_req_control_old, data) + data.dsize;
1099         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1100                                len, struct ctdb_req_control_old);
1101         state->c            = c;        
1102         CTDB_NO_MEMORY_NULL(ctdb, c);
1103         c->hdr.reqid        = state->reqid;
1104         c->hdr.destnode     = destnode;
1105         c->opcode           = opcode;
1106         c->client_id        = 0;
1107         c->flags            = flags;
1108         c->srvid            = srvid;
1109         c->datalen          = data.dsize;
1110         if (data.dsize) {
1111                 memcpy(&c->data[0], data.dptr, data.dsize);
1112         }
1113
1114         /* timeout */
1115         if (timeout && !timeval_is_zero(timeout)) {
1116                 tevent_add_timer(ctdb->ev, state, *timeout,
1117                                  control_timeout_func, state);
1118         }
1119
1120         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1121         if (ret != 0) {
1122                 talloc_free(state);
1123                 return NULL;
1124         }
1125
1126         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1127                 talloc_free(state);
1128                 return NULL;
1129         }
1130
1131         return state;
1132 }
1133
1134
1135 /* async version of receive control reply */
1136 int ctdb_control_recv(struct ctdb_context *ctdb, 
1137                 struct ctdb_client_control_state *state, 
1138                 TALLOC_CTX *mem_ctx,
1139                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1140 {
1141         TALLOC_CTX *tmp_ctx;
1142
1143         if (status != NULL) {
1144                 *status = -1;
1145         }
1146         if (errormsg != NULL) {
1147                 *errormsg = NULL;
1148         }
1149
1150         if (state == NULL) {
1151                 return -1;
1152         }
1153
1154         /* prevent double free of state */
1155         tmp_ctx = talloc_new(ctdb);
1156         talloc_steal(tmp_ctx, state);
1157
1158         /* loop one event at a time until we either timeout or the control
1159            completes.
1160         */
1161         while (state->state == CTDB_CONTROL_WAIT) {
1162                 tevent_loop_once(ctdb->ev);
1163         }
1164
1165         if (state->state != CTDB_CONTROL_DONE) {
1166                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1167                 if (state->async.fn) {
1168                         state->async.fn(state);
1169                 }
1170                 talloc_free(tmp_ctx);
1171                 return -1;
1172         }
1173
1174         if (state->errormsg) {
1175                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1176                 if (errormsg) {
1177                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1178                 }
1179                 if (state->async.fn) {
1180                         state->async.fn(state);
1181                 }
1182                 talloc_free(tmp_ctx);
1183                 return (status == 0 ? -1 : state->status);
1184         }
1185
1186         if (outdata) {
1187                 *outdata = state->outdata;
1188                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1189         }
1190
1191         if (status) {
1192                 *status = state->status;
1193         }
1194
1195         if (state->async.fn) {
1196                 state->async.fn(state);
1197         }
1198
1199         talloc_free(tmp_ctx);
1200         return 0;
1201 }
1202
1203
1204
1205 /*
1206   send a ctdb control message
1207   timeout specifies how long we should wait for a reply.
1208   if timeout is NULL we wait indefinitely
1209  */
1210 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1211                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1212                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1213                  struct timeval *timeout,
1214                  char **errormsg)
1215 {
1216         struct ctdb_client_control_state *state;
1217
1218         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1219                         flags, data, mem_ctx,
1220                         timeout, errormsg);
1221
1222         /* FIXME: Error conditions in ctdb_control_send return NULL without
1223          * setting errormsg.  So, there is no way to distinguish between sucess
1224          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1225         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1226                 if (status != NULL) {
1227                         *status = 0;
1228                 }
1229                 return 0;
1230         }
1231
1232         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1233                         errormsg);
1234 }
1235
1236
1237
1238
1239 /*
1240   a process exists call. Returns 0 if process exists, -1 otherwise
1241  */
1242 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1243 {
1244         int ret;
1245         TDB_DATA data;
1246         int32_t status;
1247
1248         data.dptr = (uint8_t*)&pid;
1249         data.dsize = sizeof(pid);
1250
1251         ret = ctdb_control(ctdb, destnode, 0, 
1252                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1253                            NULL, NULL, &status, NULL, NULL);
1254         if (ret != 0) {
1255                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1256                 return -1;
1257         }
1258
1259         return status;
1260 }
1261
1262 /*
1263   get remote statistics
1264  */
1265 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1266 {
1267         int ret;
1268         TDB_DATA data;
1269         int32_t res;
1270
1271         ret = ctdb_control(ctdb, destnode, 0, 
1272                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1273                            ctdb, &data, &res, NULL, NULL);
1274         if (ret != 0 || res != 0) {
1275                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1276                 return -1;
1277         }
1278
1279         if (data.dsize != sizeof(struct ctdb_statistics)) {
1280                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1281                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1282                       return -1;
1283         }
1284
1285         *status = *(struct ctdb_statistics *)data.dptr;
1286         talloc_free(data.dptr);
1287                         
1288         return 0;
1289 }
1290
1291 /*
1292  * get db statistics
1293  */
1294 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1295                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics_old **dbstat)
1296 {
1297         int ret;
1298         TDB_DATA indata, outdata;
1299         int32_t res;
1300         struct ctdb_db_statistics_old *wire, *s;
1301         char *ptr;
1302         int i;
1303
1304         indata.dptr = (uint8_t *)&dbid;
1305         indata.dsize = sizeof(dbid);
1306
1307         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1308                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1309         if (ret != 0 || res != 0) {
1310                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1311                 return -1;
1312         }
1313
1314         if (outdata.dsize < offsetof(struct ctdb_db_statistics_old, hot_keys_wire)) {
1315                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1316                                  outdata.dsize,
1317                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1318                 return -1;
1319         }
1320
1321         s = talloc_zero(mem_ctx, struct ctdb_db_statistics_old);
1322         if (s == NULL) {
1323                 talloc_free(outdata.dptr);
1324                 CTDB_NO_MEMORY(ctdb, s);
1325         }
1326
1327         wire = (struct ctdb_db_statistics_old *)outdata.dptr;
1328         memcpy(s, wire, offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1329         ptr = &wire->hot_keys_wire[0];
1330         for (i=0; i<wire->num_hot_keys; i++) {
1331                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1332                 if (s->hot_keys[i].key.dptr == NULL) {
1333                         talloc_free(outdata.dptr);
1334                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1335                 }
1336
1337                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1338                 ptr += wire->hot_keys[i].key.dsize;
1339         }
1340
1341         talloc_free(outdata.dptr);
1342         *dbstat = s;
1343         return 0;
1344 }
1345
1346 /*
1347   shutdown a remote ctdb node
1348  */
1349 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1350 {
1351         struct ctdb_client_control_state *state;
1352
1353         state = ctdb_control_send(ctdb, destnode, 0, 
1354                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1355                            NULL, &timeout, NULL);
1356         if (state == NULL) {
1357                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1358                 return -1;
1359         }
1360
1361         return 0;
1362 }
1363
1364 /*
1365   get vnn map from a remote node
1366  */
1367 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1368 {
1369         int ret;
1370         TDB_DATA outdata;
1371         int32_t res;
1372         struct ctdb_vnn_map_wire *map;
1373
1374         ret = ctdb_control(ctdb, destnode, 0, 
1375                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1376                            mem_ctx, &outdata, &res, &timeout, NULL);
1377         if (ret != 0 || res != 0) {
1378                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1379                 return -1;
1380         }
1381         
1382         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1383         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1384             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1385                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1386                 return -1;
1387         }
1388
1389         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1390         CTDB_NO_MEMORY(ctdb, *vnnmap);
1391         (*vnnmap)->generation = map->generation;
1392         (*vnnmap)->size       = map->size;
1393         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1394
1395         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1396         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1397         talloc_free(outdata.dptr);
1398                     
1399         return 0;
1400 }
1401
1402
1403 /*
1404   get the recovery mode of a remote node
1405  */
1406 struct ctdb_client_control_state *
1407 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1408 {
1409         return ctdb_control_send(ctdb, destnode, 0, 
1410                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1411                            mem_ctx, &timeout, NULL);
1412 }
1413
1414 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1415 {
1416         int ret;
1417         int32_t res;
1418
1419         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1420         if (ret != 0) {
1421                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1422                 return -1;
1423         }
1424
1425         if (recmode) {
1426                 *recmode = (uint32_t)res;
1427         }
1428
1429         return 0;
1430 }
1431
1432 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1433 {
1434         struct ctdb_client_control_state *state;
1435
1436         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1437         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1438 }
1439
1440
1441
1442
1443 /*
1444   set the recovery mode of a remote node
1445  */
1446 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1447 {
1448         int ret;
1449         TDB_DATA data;
1450         int32_t res;
1451
1452         data.dsize = sizeof(uint32_t);
1453         data.dptr = (unsigned char *)&recmode;
1454
1455         ret = ctdb_control(ctdb, destnode, 0, 
1456                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1457                            NULL, NULL, &res, &timeout, NULL);
1458         if (ret != 0 || res != 0) {
1459                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1460                 return -1;
1461         }
1462
1463         return 0;
1464 }
1465
1466
1467
1468 /*
1469   get the recovery master of a remote node
1470  */
1471 struct ctdb_client_control_state *
1472 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1473                         struct timeval timeout, uint32_t destnode)
1474 {
1475         return ctdb_control_send(ctdb, destnode, 0, 
1476                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1477                            mem_ctx, &timeout, NULL);
1478 }
1479
1480 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1481 {
1482         int ret;
1483         int32_t res;
1484
1485         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1486         if (ret != 0) {
1487                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1488                 return -1;
1489         }
1490
1491         if (recmaster) {
1492                 *recmaster = (uint32_t)res;
1493         }
1494
1495         return 0;
1496 }
1497
1498 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1499 {
1500         struct ctdb_client_control_state *state;
1501
1502         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1503         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1504 }
1505
1506
1507 /*
1508   set the recovery master of a remote node
1509  */
1510 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1511 {
1512         int ret;
1513         TDB_DATA data;
1514         int32_t res;
1515
1516         ZERO_STRUCT(data);
1517         data.dsize = sizeof(uint32_t);
1518         data.dptr = (unsigned char *)&recmaster;
1519
1520         ret = ctdb_control(ctdb, destnode, 0, 
1521                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1522                            NULL, NULL, &res, &timeout, NULL);
1523         if (ret != 0 || res != 0) {
1524                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1525                 return -1;
1526         }
1527
1528         return 0;
1529 }
1530
1531
1532 /*
1533   get a list of databases off a remote node
1534  */
1535 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1536                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map_old **dbmap)
1537 {
1538         int ret;
1539         TDB_DATA outdata;
1540         int32_t res;
1541
1542         ret = ctdb_control(ctdb, destnode, 0, 
1543                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1544                            mem_ctx, &outdata, &res, &timeout, NULL);
1545         if (ret != 0 || res != 0) {
1546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1547                 return -1;
1548         }
1549
1550         *dbmap = (struct ctdb_dbid_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1551         talloc_free(outdata.dptr);
1552                     
1553         return 0;
1554 }
1555
1556 /*
1557   get a list of nodes (vnn and flags ) from a remote node
1558  */
1559 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1560                 struct timeval timeout, uint32_t destnode, 
1561                 TALLOC_CTX *mem_ctx, struct ctdb_node_map_old **nodemap)
1562 {
1563         int ret;
1564         TDB_DATA outdata;
1565         int32_t res;
1566
1567         ret = ctdb_control(ctdb, destnode, 0,
1568                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1569                            mem_ctx, &outdata, &res, &timeout, NULL);
1570         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1571                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1572                 return -1;
1573         }
1574
1575         *nodemap = (struct ctdb_node_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1576         talloc_free(outdata.dptr);
1577         return 0;
1578 }
1579
1580 /*
1581   load nodes file on a remote node and return as a node map
1582  */
1583 int ctdb_ctrl_getnodesfile(struct ctdb_context *ctdb,
1584                            struct timeval timeout, uint32_t destnode,
1585                            TALLOC_CTX *mem_ctx, struct ctdb_node_map_old **nodemap)
1586 {
1587         int ret;
1588         TDB_DATA outdata;
1589         int32_t res;
1590
1591         ret = ctdb_control(ctdb, destnode, 0,
1592                            CTDB_CONTROL_GET_NODES_FILE, 0, tdb_null,
1593                            mem_ctx, &outdata, &res, &timeout, NULL);
1594         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1595                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1596                 return -1;
1597         }
1598
1599         *nodemap = (struct ctdb_node_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1600         talloc_free(outdata.dptr);
1601
1602         return 0;
1603 }
1604
1605 /*
1606   drop the transport, reload the nodes file and restart the transport
1607  */
1608 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1609                     struct timeval timeout, uint32_t destnode)
1610 {
1611         int ret;
1612         int32_t res;
1613
1614         ret = ctdb_control(ctdb, destnode, 0, 
1615                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1616                            NULL, NULL, &res, &timeout, NULL);
1617         if (ret != 0 || res != 0) {
1618                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1619                 return -1;
1620         }
1621
1622         return 0;
1623 }
1624
1625
1626 /*
1627   set vnn map on a node
1628  */
1629 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1630                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1631 {
1632         int ret;
1633         TDB_DATA data;
1634         int32_t res;
1635         struct ctdb_vnn_map_wire *map;
1636         size_t len;
1637
1638         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1639         map = talloc_size(mem_ctx, len);
1640         CTDB_NO_MEMORY(ctdb, map);
1641
1642         map->generation = vnnmap->generation;
1643         map->size = vnnmap->size;
1644         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1645         
1646         data.dsize = len;
1647         data.dptr  = (uint8_t *)map;
1648
1649         ret = ctdb_control(ctdb, destnode, 0, 
1650                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1651                            NULL, NULL, &res, &timeout, NULL);
1652         if (ret != 0 || res != 0) {
1653                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1654                 return -1;
1655         }
1656
1657         talloc_free(map);
1658
1659         return 0;
1660 }
1661
1662
1663 /*
1664   async send for pull database
1665  */
1666 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1667         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1668         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1669 {
1670         TDB_DATA indata;
1671         struct ctdb_pulldb *pull;
1672         struct ctdb_client_control_state *state;
1673
1674         pull = talloc(mem_ctx, struct ctdb_pulldb);
1675         CTDB_NO_MEMORY_NULL(ctdb, pull);
1676
1677         pull->db_id   = dbid;
1678         pull->lmaster = lmaster;
1679
1680         indata.dsize = sizeof(struct ctdb_pulldb);
1681         indata.dptr  = (unsigned char *)pull;
1682
1683         state = ctdb_control_send(ctdb, destnode, 0, 
1684                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1685                                   mem_ctx, &timeout, NULL);
1686         talloc_free(pull);
1687
1688         return state;
1689 }
1690
1691 /*
1692   async recv for pull database
1693  */
1694 int ctdb_ctrl_pulldb_recv(
1695         struct ctdb_context *ctdb, 
1696         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1697         TDB_DATA *outdata)
1698 {
1699         int ret;
1700         int32_t res;
1701
1702         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1703         if ( (ret != 0) || (res != 0) ){
1704                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1705                 return -1;
1706         }
1707
1708         return 0;
1709 }
1710
1711 /*
1712   pull all keys and records for a specific database on a node
1713  */
1714 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1715                 uint32_t dbid, uint32_t lmaster, 
1716                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1717                 TDB_DATA *outdata)
1718 {
1719         struct ctdb_client_control_state *state;
1720
1721         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1722                                       timeout);
1723         
1724         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1725 }
1726
1727
1728 /*
1729   change dmaster for all keys in the database to the new value
1730  */
1731 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1732                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1733 {
1734         int ret;
1735         TDB_DATA indata;
1736         int32_t res;
1737
1738         indata.dsize = 2*sizeof(uint32_t);
1739         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1740
1741         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1742         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1743
1744         ret = ctdb_control(ctdb, destnode, 0, 
1745                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1746                            NULL, NULL, &res, &timeout, NULL);
1747         if (ret != 0 || res != 0) {
1748                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1749                 return -1;
1750         }
1751
1752         return 0;
1753 }
1754
1755 /*
1756   ping a node, return number of clients connected
1757  */
1758 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1759 {
1760         int ret;
1761         int32_t res;
1762
1763         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1764                            tdb_null, NULL, NULL, &res, NULL, NULL);
1765         if (ret != 0) {
1766                 return -1;
1767         }
1768         return res;
1769 }
1770
1771 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1772                            struct timeval timeout, 
1773                            uint32_t destnode,
1774                            uint32_t *runstate)
1775 {
1776         TDB_DATA outdata;
1777         int32_t res;
1778         int ret;
1779
1780         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1781                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1782         if (ret != 0 || res != 0) {
1783                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1784                 return ret != 0 ? ret : res;
1785         }
1786
1787         if (outdata.dsize != sizeof(uint32_t)) {
1788                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1789                 talloc_free(outdata.dptr);
1790                 return -1;
1791         }
1792
1793         if (runstate != NULL) {
1794                 *runstate = *(uint32_t *)outdata.dptr;
1795         }
1796         talloc_free(outdata.dptr);
1797
1798         return 0;
1799 }
1800
1801 /*
1802   find the real path to a ltdb 
1803  */
1804 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1805                    const char **path)
1806 {
1807         int ret;
1808         int32_t res;
1809         TDB_DATA data;
1810
1811         data.dptr = (uint8_t *)&dbid;
1812         data.dsize = sizeof(dbid);
1813
1814         ret = ctdb_control(ctdb, destnode, 0, 
1815                            CTDB_CONTROL_GETDBPATH, 0, data, 
1816                            mem_ctx, &data, &res, &timeout, NULL);
1817         if (ret != 0 || res != 0) {
1818                 return -1;
1819         }
1820
1821         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1822         if ((*path) == NULL) {
1823                 return -1;
1824         }
1825
1826         talloc_free(data.dptr);
1827
1828         return 0;
1829 }
1830
1831 /*
1832   find the name of a db 
1833  */
1834 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1835                    const char **name)
1836 {
1837         int ret;
1838         int32_t res;
1839         TDB_DATA data;
1840
1841         data.dptr = (uint8_t *)&dbid;
1842         data.dsize = sizeof(dbid);
1843
1844         ret = ctdb_control(ctdb, destnode, 0, 
1845                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1846                            mem_ctx, &data, &res, &timeout, NULL);
1847         if (ret != 0 || res != 0) {
1848                 return -1;
1849         }
1850
1851         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1852         if ((*name) == NULL) {
1853                 return -1;
1854         }
1855
1856         talloc_free(data.dptr);
1857
1858         return 0;
1859 }
1860
1861 /*
1862   get the health status of a db
1863  */
1864 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1865                           struct timeval timeout,
1866                           uint32_t destnode,
1867                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1868                           const char **reason)
1869 {
1870         int ret;
1871         int32_t res;
1872         TDB_DATA data;
1873
1874         data.dptr = (uint8_t *)&dbid;
1875         data.dsize = sizeof(dbid);
1876
1877         ret = ctdb_control(ctdb, destnode, 0,
1878                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1879                            mem_ctx, &data, &res, &timeout, NULL);
1880         if (ret != 0 || res != 0) {
1881                 return -1;
1882         }
1883
1884         if (data.dsize == 0) {
1885                 (*reason) = NULL;
1886                 return 0;
1887         }
1888
1889         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1890         if ((*reason) == NULL) {
1891                 return -1;
1892         }
1893
1894         talloc_free(data.dptr);
1895
1896         return 0;
1897 }
1898
1899 /*
1900  * get db sequence number
1901  */
1902 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1903                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1904 {
1905         int ret;
1906         int32_t res;
1907         TDB_DATA data, outdata;
1908
1909         data.dptr = (uint8_t *)&dbid;
1910         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1911
1912         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1913                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1914         if (ret != 0 || res != 0) {
1915                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1916                 return -1;
1917         }
1918
1919         if (outdata.dsize != sizeof(uint64_t)) {
1920                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1921                 talloc_free(outdata.dptr);
1922                 return -1;
1923         }
1924
1925         if (seqnum != NULL) {
1926                 *seqnum = *(uint64_t *)outdata.dptr;
1927         }
1928         talloc_free(outdata.dptr);
1929
1930         return 0;
1931 }
1932
1933 /*
1934   create a database
1935  */
1936 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1937                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1938 {
1939         int ret;
1940         int32_t res;
1941         TDB_DATA data;
1942         uint64_t tdb_flags = 0;
1943
1944         data.dptr = discard_const(name);
1945         data.dsize = strlen(name)+1;
1946
1947         /* Make sure that volatile databases use jenkins hash */
1948         if (!persistent) {
1949                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1950         }
1951
1952 #ifdef TDB_MUTEX_LOCKING
1953         if (!persistent && ctdb->tunable.mutex_enabled == 1) {
1954                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
1955         }
1956 #endif
1957
1958         ret = ctdb_control(ctdb, destnode, tdb_flags,
1959                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1960                            0, data, 
1961                            mem_ctx, &data, &res, &timeout, NULL);
1962
1963         if (ret != 0 || res != 0) {
1964                 return -1;
1965         }
1966
1967         return 0;
1968 }
1969
1970 /*
1971   get debug level on a node
1972  */
1973 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1974 {
1975         int ret;
1976         int32_t res;
1977         TDB_DATA data;
1978
1979         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1980                            ctdb, &data, &res, NULL, NULL);
1981         if (ret != 0 || res != 0) {
1982                 return -1;
1983         }
1984         if (data.dsize != sizeof(int32_t)) {
1985                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1986                          (unsigned)data.dsize));
1987                 return -1;
1988         }
1989         *level = *(int32_t *)data.dptr;
1990         talloc_free(data.dptr);
1991         return 0;
1992 }
1993
1994 /*
1995   set debug level on a node
1996  */
1997 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1998 {
1999         int ret;
2000         int32_t res;
2001         TDB_DATA data;
2002
2003         data.dptr = (uint8_t *)&level;
2004         data.dsize = sizeof(level);
2005
2006         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
2007                            NULL, NULL, &res, NULL, NULL);
2008         if (ret != 0 || res != 0) {
2009                 return -1;
2010         }
2011         return 0;
2012 }
2013
2014
2015 /*
2016   get a list of connected nodes
2017  */
2018 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
2019                                 struct timeval timeout,
2020                                 TALLOC_CTX *mem_ctx,
2021                                 uint32_t *num_nodes)
2022 {
2023         struct ctdb_node_map_old *map=NULL;
2024         int ret, i;
2025         uint32_t *nodes;
2026
2027         *num_nodes = 0;
2028
2029         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
2030         if (ret != 0) {
2031                 return NULL;
2032         }
2033
2034         nodes = talloc_array(mem_ctx, uint32_t, map->num);
2035         if (nodes == NULL) {
2036                 return NULL;
2037         }
2038
2039         for (i=0;i<map->num;i++) {
2040                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2041                         nodes[*num_nodes] = map->nodes[i].pnn;
2042                         (*num_nodes)++;
2043                 }
2044         }
2045
2046         return nodes;
2047 }
2048
2049
2050 /*
2051   reset remote status
2052  */
2053 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2054 {
2055         int ret;
2056         int32_t res;
2057
2058         ret = ctdb_control(ctdb, destnode, 0, 
2059                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2060                            NULL, NULL, &res, NULL, NULL);
2061         if (ret != 0 || res != 0) {
2062                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2063                 return -1;
2064         }
2065         return 0;
2066 }
2067
2068 /*
2069   attach to a specific database - client call
2070 */
2071 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2072                                     struct timeval timeout,
2073                                     const char *name,
2074                                     bool persistent,
2075                                     uint32_t tdb_flags)
2076 {
2077         struct ctdb_db_context *ctdb_db;
2078         TDB_DATA data;
2079         int ret;
2080         int32_t res;
2081 #ifdef TDB_MUTEX_LOCKING
2082         uint32_t mutex_enabled = 0;
2083 #endif
2084
2085         ctdb_db = ctdb_db_handle(ctdb, name);
2086         if (ctdb_db) {
2087                 return ctdb_db;
2088         }
2089
2090         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2091         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2092
2093         ctdb_db->ctdb = ctdb;
2094         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2095         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2096
2097         data.dptr = discard_const(name);
2098         data.dsize = strlen(name)+1;
2099
2100         /* CTDB has switched to using jenkins hash for volatile databases.
2101          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2102          * always set it.
2103          */
2104         if (!persistent) {
2105                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2106         }
2107
2108 #ifdef TDB_MUTEX_LOCKING
2109         if (!persistent) {
2110                 ret = ctdb_ctrl_get_tunable(ctdb, timeval_current_ofs(3,0),
2111                                             CTDB_CURRENT_NODE,
2112                                             "TDBMutexEnabled",
2113                                             &mutex_enabled);
2114                 if (ret != 0) {
2115                         DEBUG(DEBUG_WARNING, ("Assuming no mutex support.\n"));
2116                 }
2117
2118                 if (mutex_enabled == 1) {
2119                         tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
2120                 }
2121         }
2122 #endif
2123
2124         /* tell ctdb daemon to attach */
2125         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2126                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2127                            0, data, ctdb_db, &data, &res, NULL, NULL);
2128         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2129                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2130                 talloc_free(ctdb_db);
2131                 return NULL;
2132         }
2133         
2134         ctdb_db->db_id = *(uint32_t *)data.dptr;
2135         talloc_free(data.dptr);
2136
2137         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2138         if (ret != 0) {
2139                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2140                 talloc_free(ctdb_db);
2141                 return NULL;
2142         }
2143
2144         if (persistent) {
2145                 tdb_flags = TDB_DEFAULT;
2146         } else {
2147                 tdb_flags = TDB_NOSYNC;
2148 #ifdef TDB_MUTEX_LOCKING
2149                 if (mutex_enabled) {
2150                         tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
2151                 }
2152 #endif
2153         }
2154         if (ctdb->valgrinding) {
2155                 tdb_flags |= TDB_NOMMAP;
2156         }
2157         tdb_flags |= TDB_DISALLOW_NESTING;
2158
2159         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
2160                                       O_RDWR, 0);
2161         if (ctdb_db->ltdb == NULL) {
2162                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2163                 talloc_free(ctdb_db);
2164                 return NULL;
2165         }
2166
2167         ctdb_db->persistent = persistent;
2168
2169         DLIST_ADD(ctdb->db_list, ctdb_db);
2170
2171         /* add well known functions */
2172         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2173         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2174         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2175
2176         return ctdb_db;
2177 }
2178
2179 /*
2180  * detach from a specific database - client call
2181  */
2182 int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
2183 {
2184         int ret;
2185         int32_t status;
2186         TDB_DATA data;
2187
2188         data.dsize = sizeof(db_id);
2189         data.dptr = (uint8_t *)&db_id;
2190
2191         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
2192                            0, data, NULL, NULL, &status, NULL, NULL);
2193         if (ret != 0 || status != 0) {
2194                 return -1;
2195         }
2196         return 0;
2197 }
2198
2199 /*
2200   setup a call for a database
2201  */
2202 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2203 {
2204         struct ctdb_registered_call *call;
2205
2206         /* register locally */
2207         call = talloc(ctdb_db, struct ctdb_registered_call);
2208         call->fn = fn;
2209         call->id = id;
2210
2211         DLIST_ADD(ctdb_db->calls, call);
2212         return 0;
2213 }
2214
2215
2216 struct traverse_state {
2217         bool done;
2218         uint32_t count;
2219         ctdb_traverse_func fn;
2220         void *private_data;
2221         bool listemptyrecords;
2222 };
2223
2224 /*
2225   called on each key during a ctdb_traverse
2226  */
2227 static void traverse_handler(uint64_t srvid, TDB_DATA data, void *p)
2228 {
2229         struct traverse_state *state = (struct traverse_state *)p;
2230         struct ctdb_rec_data_old *d = (struct ctdb_rec_data_old *)data.dptr;
2231         TDB_DATA key;
2232
2233         if (data.dsize < sizeof(uint32_t) || d->length != data.dsize) {
2234                 DEBUG(DEBUG_ERR, ("Bad data size %u in traverse_handler\n",
2235                                   (unsigned)data.dsize));
2236                 state->done = true;
2237                 return;
2238         }
2239
2240         key.dsize = d->keylen;
2241         key.dptr  = &d->data[0];
2242         data.dsize = d->datalen;
2243         data.dptr = &d->data[d->keylen];
2244
2245         if (key.dsize == 0 && data.dsize == 0) {
2246                 /* end of traverse */
2247                 state->done = true;
2248                 return;
2249         }
2250
2251         if (!state->listemptyrecords &&
2252             data.dsize == sizeof(struct ctdb_ltdb_header))
2253         {
2254                 /* empty records are deleted records in ctdb */
2255                 return;
2256         }
2257
2258         if (state->fn(key, data, state->private_data) != 0) {
2259                 state->done = true;
2260         }
2261
2262         state->count++;
2263 }
2264
2265 /**
2266  * start a cluster wide traverse, calling the supplied fn on each record
2267  * return the number of records traversed, or -1 on error
2268  *
2269  * Extendet variant with a flag to signal whether empty records should
2270  * be listed.
2271  */
2272 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2273                              ctdb_traverse_func fn,
2274                              bool withemptyrecords,
2275                              void *private_data)
2276 {
2277         TDB_DATA data;
2278         struct ctdb_traverse_start_ext t;
2279         int32_t status;
2280         int ret;
2281         uint64_t srvid = (getpid() | 0xFLL<<60);
2282         struct traverse_state state;
2283
2284         state.done = false;
2285         state.count = 0;
2286         state.private_data = private_data;
2287         state.fn = fn;
2288         state.listemptyrecords = withemptyrecords;
2289
2290         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2291         if (ret != 0) {
2292                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2293                 return -1;
2294         }
2295
2296         t.db_id = ctdb_db->db_id;
2297         t.srvid = srvid;
2298         t.reqid = 0;
2299         t.withemptyrecords = withemptyrecords;
2300
2301         data.dptr = (uint8_t *)&t;
2302         data.dsize = sizeof(t);
2303
2304         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2305                            data, NULL, NULL, &status, NULL, NULL);
2306         if (ret != 0 || status != 0) {
2307                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2308                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2309                 return -1;
2310         }
2311
2312         while (!state.done) {
2313                 tevent_loop_once(ctdb_db->ctdb->ev);
2314         }
2315
2316         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2317         if (ret != 0) {
2318                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2319                 return -1;
2320         }
2321
2322         return state.count;
2323 }
2324
2325 /**
2326  * start a cluster wide traverse, calling the supplied fn on each record
2327  * return the number of records traversed, or -1 on error
2328  *
2329  * Standard version which does not list the empty records:
2330  * These are considered deleted.
2331  */
2332 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2333 {
2334         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2335 }
2336
2337 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2338 /*
2339   called on each key during a catdb
2340  */
2341 int ctdb_dumpdb_record(TDB_DATA key, TDB_DATA data, void *p)
2342 {
2343         int i;
2344         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2345         FILE *f = c->f;
2346         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2347
2348         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2349         for (i=0;i<key.dsize;i++) {
2350                 if (ISASCII(key.dptr[i])) {
2351                         fprintf(f, "%c", key.dptr[i]);
2352                 } else {
2353                         fprintf(f, "\\%02X", key.dptr[i]);
2354                 }
2355         }
2356         fprintf(f, "\"\n");
2357
2358         fprintf(f, "dmaster: %u\n", h->dmaster);
2359         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2360
2361         if (c->printlmaster && c->ctdb->vnn_map != NULL) {
2362                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(c->ctdb, &key));
2363         }
2364
2365         if (c->printhash) {
2366                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2367         }
2368
2369         if (c->printrecordflags) {
2370                 fprintf(f, "flags: 0x%08x", h->flags);
2371                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2372                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2373                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2374                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2375                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2376                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2377                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2378                 fprintf(f, "\n");
2379         }
2380
2381         if (c->printdatasize) {
2382                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2383         } else {
2384                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2385                 for (i=sizeof(*h);i<data.dsize;i++) {
2386                         if (ISASCII(data.dptr[i])) {
2387                                 fprintf(f, "%c", data.dptr[i]);
2388                         } else {
2389                                 fprintf(f, "\\%02X", data.dptr[i]);
2390                         }
2391                 }
2392                 fprintf(f, "\"\n");
2393         }
2394
2395         fprintf(f, "\n");
2396
2397         return 0;
2398 }
2399
2400 /*
2401   convenience function to list all keys to stdout
2402  */
2403 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2404                  struct ctdb_dump_db_context *ctx)
2405 {
2406         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2407                                  ctx->printemptyrecords, ctx);
2408 }
2409
2410 /*
2411   get the pid of a ctdb daemon
2412  */
2413 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2414 {
2415         int ret;
2416         int32_t res;
2417
2418         ret = ctdb_control(ctdb, destnode, 0, 
2419                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2420                            NULL, NULL, &res, &timeout, NULL);
2421         if (ret != 0) {
2422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2423                 return -1;
2424         }
2425
2426         *pid = res;
2427
2428         return 0;
2429 }
2430
2431
2432 /*
2433   async freeze send control
2434  */
2435 struct ctdb_client_control_state *
2436 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2437 {
2438         return ctdb_control_send(ctdb, destnode, priority, 
2439                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2440                            mem_ctx, &timeout, NULL);
2441 }
2442
2443 /* 
2444    async freeze recv control
2445 */
2446 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2447 {
2448         int ret;
2449         int32_t res;
2450
2451         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2452         if ( (ret != 0) || (res != 0) ){
2453                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2454                 return -1;
2455         }
2456
2457         return 0;
2458 }
2459
2460 /*
2461   freeze databases of a certain priority
2462  */
2463 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2464 {
2465         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2466         struct ctdb_client_control_state *state;
2467         int ret;
2468
2469         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2470         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2471         talloc_free(tmp_ctx);
2472
2473         return ret;
2474 }
2475
2476 /* Freeze all databases */
2477 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2478 {
2479         int i;
2480
2481         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2482                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2483                         return -1;
2484                 }
2485         }
2486         return 0;
2487 }
2488
2489 /*
2490   thaw databases of a certain priority
2491  */
2492 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2493 {
2494         int ret;
2495         int32_t res;
2496
2497         ret = ctdb_control(ctdb, destnode, priority, 
2498                            CTDB_CONTROL_THAW, 0, tdb_null, 
2499                            NULL, NULL, &res, &timeout, NULL);
2500         if (ret != 0 || res != 0) {
2501                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2502                 return -1;
2503         }
2504
2505         return 0;
2506 }
2507
2508 /* thaw all databases */
2509 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2510 {
2511         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2512 }
2513
2514 /*
2515   get pnn of a node, or -1
2516  */
2517 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2518 {
2519         int ret;
2520         int32_t res;
2521
2522         ret = ctdb_control(ctdb, destnode, 0, 
2523                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2524                            NULL, NULL, &res, &timeout, NULL);
2525         if (ret != 0) {
2526                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2527                 return -1;
2528         }
2529
2530         return res;
2531 }
2532
2533 /*
2534   get the monitoring mode of a remote node
2535  */
2536 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2537 {
2538         int ret;
2539         int32_t res;
2540
2541         ret = ctdb_control(ctdb, destnode, 0, 
2542                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2543                            NULL, NULL, &res, &timeout, NULL);
2544         if (ret != 0) {
2545                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2546                 return -1;
2547         }
2548
2549         *monmode = res;
2550
2551         return 0;
2552 }
2553
2554
2555 /*
2556  set the monitoring mode of a remote node to active
2557  */
2558 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2559 {
2560         int ret;
2561         
2562
2563         ret = ctdb_control(ctdb, destnode, 0, 
2564                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2565                            NULL, NULL,NULL, &timeout, NULL);
2566         if (ret != 0) {
2567                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2568                 return -1;
2569         }
2570
2571         
2572
2573         return 0;
2574 }
2575
2576 /*
2577   set the monitoring mode of a remote node to disable
2578  */
2579 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2580 {
2581         int ret;
2582         
2583
2584         ret = ctdb_control(ctdb, destnode, 0, 
2585                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2586                            NULL, NULL, NULL, &timeout, NULL);
2587         if (ret != 0) {
2588                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2589                 return -1;
2590         }
2591
2592         
2593
2594         return 0;
2595 }
2596
2597
2598
2599 /*
2600   sent to a node to make it take over an ip address
2601 */
2602 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2603                           uint32_t destnode, struct ctdb_public_ip *ip)
2604 {
2605         TDB_DATA data;
2606         int ret;
2607         int32_t res;
2608
2609         data.dsize = sizeof(*ip);
2610         data.dptr  = (uint8_t *)ip;
2611
2612         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0,
2613                            data, NULL, NULL, &res, &timeout, NULL);
2614         if (ret != 0 || res != 0) {
2615                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2616                 return -1;
2617         }
2618
2619         return 0;
2620 }
2621
2622
2623 /*
2624   sent to a node to make it release an ip address
2625 */
2626 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2627                          uint32_t destnode, struct ctdb_public_ip *ip)
2628 {
2629         TDB_DATA data;
2630         int ret;
2631         int32_t res;
2632
2633         data.dsize = sizeof(*ip);
2634         data.dptr  = (uint8_t *)ip;
2635
2636         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0,
2637                            data, NULL, NULL, &res, &timeout, NULL);
2638         if (ret != 0 || res != 0) {
2639                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2640                 return -1;
2641         }
2642
2643         return 0;
2644 }
2645
2646
2647 /*
2648   get a tunable
2649  */
2650 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2651                           struct timeval timeout, 
2652                           uint32_t destnode,
2653                           const char *name, uint32_t *value)
2654 {
2655         struct ctdb_control_get_tunable *t;
2656         TDB_DATA data, outdata;
2657         int32_t res;
2658         int ret;
2659
2660         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2661         data.dptr  = talloc_size(ctdb, data.dsize);
2662         CTDB_NO_MEMORY(ctdb, data.dptr);
2663
2664         t = (struct ctdb_control_get_tunable *)data.dptr;
2665         t->length = strlen(name)+1;
2666         memcpy(t->name, name, t->length);
2667
2668         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2669                            &outdata, &res, &timeout, NULL);
2670         talloc_free(data.dptr);
2671         if (ret != 0 || res != 0) {
2672                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2673                 return ret != 0 ? ret : res;
2674         }
2675
2676         if (outdata.dsize != sizeof(uint32_t)) {
2677                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2678                 talloc_free(outdata.dptr);
2679                 return -1;
2680         }
2681         
2682         *value = *(uint32_t *)outdata.dptr;
2683         talloc_free(outdata.dptr);
2684
2685         return 0;
2686 }
2687
2688 /*
2689   set a tunable
2690  */
2691 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2692                           struct timeval timeout, 
2693                           uint32_t destnode,
2694                           const char *name, uint32_t value)
2695 {
2696         struct ctdb_tunable_old *t;
2697         TDB_DATA data;
2698         int32_t res;
2699         int ret;
2700
2701         data.dsize = offsetof(struct ctdb_tunable_old, name) + strlen(name) + 1;
2702         data.dptr  = talloc_size(ctdb, data.dsize);
2703         CTDB_NO_MEMORY(ctdb, data.dptr);
2704
2705         t = (struct ctdb_tunable_old *)data.dptr;
2706         t->length = strlen(name)+1;
2707         memcpy(t->name, name, t->length);
2708         t->value = value;
2709
2710         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2711                            NULL, &res, &timeout, NULL);
2712         talloc_free(data.dptr);
2713         if ((ret != 0) || (res == -1)) {
2714                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2715                 return -1;
2716         }
2717
2718         return res;
2719 }
2720
2721 /*
2722   list tunables
2723  */
2724 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2725                             struct timeval timeout, 
2726                             uint32_t destnode,
2727                             TALLOC_CTX *mem_ctx,
2728                             const char ***list, uint32_t *count)
2729 {
2730         TDB_DATA outdata;
2731         int32_t res;
2732         int ret;
2733         struct ctdb_control_list_tunable *t;
2734         char *p, *s, *ptr;
2735
2736         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2737                            mem_ctx, &outdata, &res, &timeout, NULL);
2738         if (ret != 0 || res != 0) {
2739                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2740                 return -1;
2741         }
2742
2743         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2744         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2745             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2746                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2747                 talloc_free(outdata.dptr);
2748                 return -1;              
2749         }
2750         
2751         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2752         CTDB_NO_MEMORY(ctdb, p);
2753
2754         talloc_free(outdata.dptr);
2755         
2756         (*list) = NULL;
2757         (*count) = 0;
2758
2759         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2760                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2761                 CTDB_NO_MEMORY(ctdb, *list);
2762                 (*list)[*count] = talloc_strdup(*list, s);
2763                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2764                 (*count)++;
2765         }
2766
2767         talloc_free(p);
2768
2769         return 0;
2770 }
2771
2772
2773 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2774                                    struct timeval timeout, uint32_t destnode,
2775                                    TALLOC_CTX *mem_ctx,
2776                                    uint32_t flags,
2777                                    struct ctdb_public_ip_list_old **ips)
2778 {
2779         int ret;
2780         TDB_DATA outdata;
2781         int32_t res;
2782
2783         ret = ctdb_control(ctdb, destnode, 0,
2784                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2785                            mem_ctx, &outdata, &res, &timeout, NULL);
2786         if (ret != 0 || res != 0) {
2787                 DEBUG(DEBUG_ERR,(__location__
2788                                  " ctdb_control for getpublicips failed ret:%d res:%d\n",
2789                                  ret, res));
2790                 return -1;
2791         }
2792
2793         *ips = (struct ctdb_public_ip_list_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2794         talloc_free(outdata.dptr);
2795
2796         return 0;
2797 }
2798
2799 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2800                              struct timeval timeout, uint32_t destnode,
2801                              TALLOC_CTX *mem_ctx,
2802                              struct ctdb_public_ip_list_old **ips)
2803 {
2804         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2805                                               destnode, mem_ctx,
2806                                               0, ips);
2807 }
2808
2809 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2810                                  struct timeval timeout, uint32_t destnode,
2811                                  TALLOC_CTX *mem_ctx,
2812                                  const ctdb_sock_addr *addr,
2813                                  struct ctdb_control_public_ip_info **_info)
2814 {
2815         int ret;
2816         TDB_DATA indata;
2817         TDB_DATA outdata;
2818         int32_t res;
2819         struct ctdb_control_public_ip_info *info;
2820         uint32_t len;
2821         uint32_t i;
2822
2823         indata.dptr = discard_const_p(uint8_t, addr);
2824         indata.dsize = sizeof(*addr);
2825
2826         ret = ctdb_control(ctdb, destnode, 0,
2827                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2828                            mem_ctx, &outdata, &res, &timeout, NULL);
2829         if (ret != 0 || res != 0) {
2830                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2831                                 "failed ret:%d res:%d\n",
2832                                 ret, res));
2833                 return -1;
2834         }
2835
2836         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2837         if (len > outdata.dsize) {
2838                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2839                                 "returned invalid data with size %u > %u\n",
2840                                 (unsigned int)outdata.dsize,
2841                                 (unsigned int)len));
2842                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2843                 return -1;
2844         }
2845
2846         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2847         len += info->num*sizeof(struct ctdb_iface);
2848
2849         if (len > outdata.dsize) {
2850                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2851                                 "returned invalid data with size %u > %u\n",
2852                                 (unsigned int)outdata.dsize,
2853                                 (unsigned int)len));
2854                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2855                 return -1;
2856         }
2857
2858         /* make sure we null terminate the returned strings */
2859         for (i=0; i < info->num; i++) {
2860                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2861         }
2862
2863         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2864                                                                 outdata.dptr,
2865                                                                 outdata.dsize);
2866         talloc_free(outdata.dptr);
2867         if (*_info == NULL) {
2868                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2869                                 "talloc_memdup size %u failed\n",
2870                                 (unsigned int)outdata.dsize));
2871                 return -1;
2872         }
2873
2874         return 0;
2875 }
2876
2877 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2878                          struct timeval timeout, uint32_t destnode,
2879                          TALLOC_CTX *mem_ctx,
2880                          struct ctdb_control_get_ifaces **_ifaces)
2881 {
2882         int ret;
2883         TDB_DATA outdata;
2884         int32_t res;
2885         struct ctdb_control_get_ifaces *ifaces;
2886         uint32_t len;
2887         uint32_t i;
2888
2889         ret = ctdb_control(ctdb, destnode, 0,
2890                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2891                            mem_ctx, &outdata, &res, &timeout, NULL);
2892         if (ret != 0 || res != 0) {
2893                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2894                                 "failed ret:%d res:%d\n",
2895                                 ret, res));
2896                 return -1;
2897         }
2898
2899         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2900         if (len > outdata.dsize) {
2901                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2902                                 "returned invalid data with size %u > %u\n",
2903                                 (unsigned int)outdata.dsize,
2904                                 (unsigned int)len));
2905                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2906                 return -1;
2907         }
2908
2909         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2910         len += ifaces->num*sizeof(struct ctdb_iface);
2911
2912         if (len > outdata.dsize) {
2913                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2914                                 "returned invalid data with size %u > %u\n",
2915                                 (unsigned int)outdata.dsize,
2916                                 (unsigned int)len));
2917                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2918                 return -1;
2919         }
2920
2921         /* make sure we null terminate the returned strings */
2922         for (i=0; i < ifaces->num; i++) {
2923                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2924         }
2925
2926         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2927                                                                   outdata.dptr,
2928                                                                   outdata.dsize);
2929         talloc_free(outdata.dptr);
2930         if (*_ifaces == NULL) {
2931                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2932                                 "talloc_memdup size %u failed\n",
2933                                 (unsigned int)outdata.dsize));
2934                 return -1;
2935         }
2936
2937         return 0;
2938 }
2939
2940 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2941                              struct timeval timeout, uint32_t destnode,
2942                              TALLOC_CTX *mem_ctx,
2943                              const struct ctdb_iface *info)
2944 {
2945         int ret;
2946         TDB_DATA indata;
2947         int32_t res;
2948
2949         indata.dptr = discard_const_p(uint8_t, info);
2950         indata.dsize = sizeof(*info);
2951
2952         ret = ctdb_control(ctdb, destnode, 0,
2953                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2954                            mem_ctx, NULL, &res, &timeout, NULL);
2955         if (ret != 0 || res != 0) {
2956                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2957                                 "failed ret:%d res:%d\n",
2958                                 ret, res));
2959                 return -1;
2960         }
2961
2962         return 0;
2963 }
2964
2965 /*
2966   set/clear the permanent disabled bit on a remote node
2967  */
2968 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2969                        uint32_t set, uint32_t clear)
2970 {
2971         int ret;
2972         TDB_DATA data;
2973         struct ctdb_node_map_old *nodemap=NULL;
2974         struct ctdb_node_flag_change c;
2975         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2976         uint32_t recmaster;
2977         uint32_t *nodes;
2978
2979
2980         /* find the recovery master */
2981         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2982         if (ret != 0) {
2983                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2984                 talloc_free(tmp_ctx);
2985                 return ret;
2986         }
2987
2988
2989         /* read the node flags from the recmaster */
2990         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2991         if (ret != 0) {
2992                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2993                 talloc_free(tmp_ctx);
2994                 return -1;
2995         }
2996         if (destnode >= nodemap->num) {
2997                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2998                 talloc_free(tmp_ctx);
2999                 return -1;
3000         }
3001
3002         c.pnn       = destnode;
3003         c.old_flags = nodemap->nodes[destnode].flags;
3004         c.new_flags = c.old_flags;
3005         c.new_flags |= set;
3006         c.new_flags &= ~clear;
3007
3008         data.dsize = sizeof(c);
3009         data.dptr = (unsigned char *)&c;
3010
3011         /* send the flags update to all connected nodes */
3012         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3013
3014         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3015                                         nodes, 0,
3016                                         timeout, false, data,
3017                                         NULL, NULL,
3018                                         NULL) != 0) {
3019                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3020
3021                 talloc_free(tmp_ctx);
3022                 return -1;
3023         }
3024
3025         talloc_free(tmp_ctx);
3026         return 0;
3027 }
3028
3029
3030 /*
3031   get all tunables
3032  */
3033 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
3034                                struct timeval timeout,
3035                                uint32_t destnode,
3036                                struct ctdb_tunable_list *tunables)
3037 {
3038         TDB_DATA outdata;
3039         int ret;
3040         int32_t res;
3041
3042         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3043                            &outdata, &res, &timeout, NULL);
3044         if (ret != 0 || res != 0) {
3045                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3046                 return -1;
3047         }
3048
3049         if (outdata.dsize != sizeof(*tunables)) {
3050                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3051                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3052                 return -1;
3053         }
3054
3055         *tunables = *(struct ctdb_tunable_list *)outdata.dptr;
3056         talloc_free(outdata.dptr);
3057         return 0;
3058 }
3059
3060 /*
3061   add a public address to a node
3062  */
3063 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb,
3064                             struct timeval timeout, uint32_t destnode,
3065                             struct ctdb_addr_info_old *pub)
3066 {
3067         TDB_DATA data;
3068         int32_t res;
3069         int ret;
3070
3071         data.dsize = offsetof(struct ctdb_addr_info_old, iface) + pub->len;
3072         data.dptr  = (unsigned char *)pub;
3073
3074         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3075                            NULL, &res, &timeout, NULL);
3076         if (ret != 0 || res != 0) {
3077                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3078                 return -1;
3079         }
3080
3081         return 0;
3082 }
3083
3084 /*
3085   delete a public address from a node
3086  */
3087 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb,
3088                             struct timeval timeout, uint32_t destnode,
3089                             struct ctdb_addr_info_old *pub)
3090 {
3091         TDB_DATA data;
3092         int32_t res;
3093         int ret;
3094
3095         data.dsize = offsetof(struct ctdb_addr_info_old, iface) + pub->len;
3096         data.dptr  = (unsigned char *)pub;
3097
3098         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3099                            NULL, &res, &timeout, NULL);
3100         if (ret != 0 || res != 0) {
3101                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3102                 return -1;
3103         }
3104
3105         return 0;
3106 }
3107
3108 /*
3109   kill a tcp connection
3110  */
3111 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3112                       struct timeval timeout, 
3113                       uint32_t destnode,
3114                       struct ctdb_connection *killtcp)
3115 {
3116         TDB_DATA data;
3117         int32_t res;
3118         int ret;
3119
3120         data.dsize = sizeof(struct ctdb_connection);
3121         data.dptr  = (unsigned char *)killtcp;
3122
3123         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3124                            NULL, &res, &timeout, NULL);
3125         if (ret != 0 || res != 0) {
3126                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3127                 return -1;
3128         }
3129
3130         return 0;
3131 }
3132
3133 /*
3134   send a gratious arp
3135  */
3136 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb,
3137                            struct timeval timeout, uint32_t destnode,
3138                            ctdb_sock_addr *addr, const char *ifname)
3139 {
3140         TDB_DATA data;
3141         int32_t res;
3142         int ret, len;
3143         struct ctdb_addr_info_old *gratious_arp;
3144         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3145
3146
3147         len = strlen(ifname)+1;
3148         gratious_arp = talloc_size(tmp_ctx,
3149                 offsetof(struct ctdb_addr_info_old, iface) + len);
3150         CTDB_NO_MEMORY(ctdb, gratious_arp);
3151
3152         gratious_arp->addr = *addr;
3153         gratious_arp->len = len;
3154         memcpy(&gratious_arp->iface[0], ifname, len);
3155
3156
3157         data.dsize = offsetof(struct ctdb_addr_info_old, iface) + len;
3158         data.dptr  = (unsigned char *)gratious_arp;
3159
3160         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3161                            NULL, &res, &timeout, NULL);
3162         if (ret != 0 || res != 0) {
3163                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3164                 talloc_free(tmp_ctx);
3165                 return -1;
3166         }
3167
3168         talloc_free(tmp_ctx);
3169         return 0;
3170 }
3171
3172 /*
3173   get a list of all tcp tickles that a node knows about for a particular vnn
3174  */
3175 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3176                               struct timeval timeout, uint32_t destnode, 
3177                               TALLOC_CTX *mem_ctx, 
3178                               ctdb_sock_addr *addr,
3179                               struct ctdb_tickle_list_old **list)
3180 {
3181         int ret;
3182         TDB_DATA data, outdata;
3183         int32_t status;
3184
3185         data.dptr = (uint8_t*)addr;
3186         data.dsize = sizeof(ctdb_sock_addr);
3187
3188         ret = ctdb_control(ctdb, destnode, 0, 
3189                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3190                            mem_ctx, &outdata, &status, NULL, NULL);
3191         if (ret != 0 || status != 0) {
3192                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3193                 return -1;
3194         }
3195
3196         *list = (struct ctdb_tickle_list_old *)outdata.dptr;
3197
3198         return status;
3199 }
3200
3201 /*
3202   register a server id
3203  */
3204 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb,
3205                                  struct timeval timeout,
3206                                  struct ctdb_client_id *id)
3207 {
3208         TDB_DATA data;
3209         int32_t res;
3210         int ret;
3211
3212         data.dsize = sizeof(struct ctdb_client_id);
3213         data.dptr  = (unsigned char *)id;
3214
3215         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3216                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3217                         0, data, NULL,
3218                         NULL, &res, &timeout, NULL);
3219         if (ret != 0 || res != 0) {
3220                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3221                 return -1;
3222         }
3223
3224         return 0;
3225 }
3226
3227 /*
3228   unregister a server id
3229  */
3230 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb,
3231                                    struct timeval timeout,
3232                                    struct ctdb_client_id *id)
3233 {
3234         TDB_DATA data;
3235         int32_t res;
3236         int ret;
3237
3238         data.dsize = sizeof(struct ctdb_client_id);
3239         data.dptr  = (unsigned char *)id;
3240
3241         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3242                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3243                         0, data, NULL,
3244                         NULL, &res, &timeout, NULL);
3245         if (ret != 0 || res != 0) {
3246                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3247                 return -1;
3248         }
3249
3250         return 0;
3251 }
3252
3253
3254 /*
3255   check if a server id exists
3256
3257   if a server id does exist, return *status == 1, otherwise *status == 0
3258  */
3259 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb,
3260                               struct timeval timeout, uint32_t destnode,
3261                               struct ctdb_client_id *id, uint32_t *status)
3262 {
3263         TDB_DATA data;
3264         int32_t res;
3265         int ret;
3266
3267         data.dsize = sizeof(struct ctdb_client_id);
3268         data.dptr  = (unsigned char *)id;
3269
3270         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3271                         0, data, NULL,
3272                         NULL, &res, &timeout, NULL);
3273         if (ret != 0) {
3274                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3275                 return -1;
3276         }
3277
3278         if (res) {
3279                 *status = 1;
3280         } else {
3281                 *status = 0;
3282         }
3283
3284         return 0;
3285 }
3286
3287 /*
3288    get the list of server ids that are registered on a node
3289 */
3290 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3291                                  TALLOC_CTX *mem_ctx,
3292                                  struct timeval timeout, uint32_t destnode,
3293                                  struct ctdb_client_id_list_old **svid_list)
3294 {
3295         int ret;
3296         TDB_DATA outdata;
3297         int32_t res;
3298
3299         ret = ctdb_control(ctdb, destnode, 0, 
3300                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3301                            mem_ctx, &outdata, &res, &timeout, NULL);
3302         if (ret != 0 || res != 0) {
3303                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3304                 return -1;
3305         }
3306
3307         *svid_list = (struct ctdb_client_id_list_old *)talloc_steal(mem_ctx, outdata.dptr);
3308
3309         return 0;
3310 }
3311
3312 /*
3313   initialise the ctdb daemon for client applications
3314
3315   NOTE: In current code the daemon does not fork. This is for testing purposes only
3316   and to simplify the code.
3317 */
3318 struct ctdb_context *ctdb_init(struct tevent_context *ev)
3319 {
3320         int ret;
3321         struct ctdb_context *ctdb;
3322
3323         ctdb = talloc_zero(ev, struct ctdb_context);
3324         if (ctdb == NULL) {
3325                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3326                 return NULL;
3327         }
3328         ctdb->ev  = ev;
3329         /* Wrap early to exercise code. */
3330         ret = reqid_init(ctdb, INT_MAX-200, &ctdb->idr);
3331         if (ret != 0) {
3332                 DEBUG(DEBUG_ERR, ("reqid_init failed (%s)\n", strerror(ret)));
3333                 talloc_free(ctdb);
3334                 return NULL;
3335         }
3336
3337         ret = srvid_init(ctdb, &ctdb->srv);
3338         if (ret != 0) {
3339                 DEBUG(DEBUG_ERR, ("srvid_init failed (%s)\n", strerror(ret)));
3340                 talloc_free(ctdb);
3341                 return NULL;
3342         }
3343
3344         ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
3345         if (ret != 0) {
3346                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3347                 talloc_free(ctdb);
3348                 return NULL;
3349         }
3350
3351         ctdb->statistics.statistics_start_time = timeval_current();
3352
3353         return ctdb;
3354 }
3355
3356
3357 /*
3358   set some ctdb flags
3359 */
3360 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3361 {
3362         ctdb->flags |= flags;
3363 }
3364
3365 /*
3366   setup the local socket name
3367 */
3368 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3369 {
3370         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3371         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3372
3373         return 0;
3374 }
3375
3376 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3377 {
3378         return ctdb->daemon.name;
3379 }
3380
3381 /*
3382   return the pnn of this node
3383 */
3384 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3385 {
3386         return ctdb->pnn;
3387 }
3388
3389
3390 /*
3391   get the uptime of a remote node
3392  */
3393 struct ctdb_client_control_state *
3394 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3395 {
3396         return ctdb_control_send(ctdb, destnode, 0, 
3397                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3398                            mem_ctx, &timeout, NULL);
3399 }
3400
3401 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3402 {
3403         int ret;
3404         int32_t res;
3405         TDB_DATA outdata;
3406
3407         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3408         if (ret != 0 || res != 0) {
3409                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3410                 return -1;
3411         }
3412
3413         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3414
3415         return 0;
3416 }
3417
3418 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3419 {
3420         struct ctdb_client_control_state *state;
3421
3422         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3423         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3424 }
3425
3426 /*
3427   send a control to execute the "recovered" event script on a node
3428  */
3429 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3430 {
3431         int ret;
3432         int32_t status;
3433
3434         ret = ctdb_control(ctdb, destnode, 0, 
3435                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3436                            NULL, NULL, &status, &timeout, NULL);
3437         if (ret != 0 || status != 0) {
3438                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3439                 return -1;
3440         }
3441
3442         return 0;
3443 }
3444
3445 /* 
3446   callback for the async helpers used when sending the same control
3447   to multiple nodes in parallell.
3448 */
3449 static void async_callback(struct ctdb_client_control_state *state)
3450 {
3451         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3452         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3453         int ret;
3454         TDB_DATA outdata;
3455         int32_t res = -1;
3456         uint32_t destnode = state->c->hdr.destnode;
3457
3458         outdata.dsize = 0;
3459         outdata.dptr = NULL;
3460
3461         /* one more node has responded with recmode data */
3462         data->count--;
3463
3464         /* if we failed to push the db, then return an error and let
3465            the main loop try again.
3466         */
3467         if (state->state != CTDB_CONTROL_DONE) {
3468                 if ( !data->dont_log_errors) {
3469                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3470                 }
3471                 data->fail_count++;
3472                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3473                         res = -ETIME;
3474                 } else {
3475                         res = -1;
3476                 }
3477                 if (data->fail_callback) {
3478                         data->fail_callback(ctdb, destnode, res, outdata,
3479                                         data->callback_data);
3480                 }
3481                 return;
3482         }
3483         
3484         state->async.fn = NULL;
3485
3486         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3487         if ((ret != 0) || (res != 0)) {
3488                 if ( !data->dont_log_errors) {
3489                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3490                 }
3491                 data->fail_count++;
3492                 if (data->fail_callback) {
3493                         data->fail_callback(ctdb, destnode, res, outdata,
3494                                         data->callback_data);
3495                 }
3496         }
3497         if ((ret == 0) && (data->callback != NULL)) {
3498                 data->callback(ctdb, destnode, res, outdata,
3499                                         data->callback_data);
3500         }
3501 }
3502
3503
3504 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3505 {
3506         /* set up the callback functions */
3507         state->async.fn = async_callback;
3508         state->async.private_data = data;
3509         
3510         /* one more control to wait for to complete */
3511         data->count++;
3512 }
3513
3514
3515 /* wait for up to the maximum number of seconds allowed
3516    or until all nodes we expect a response from has replied
3517 */
3518 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3519 {
3520         while (data->count > 0) {
3521                 tevent_loop_once(ctdb->ev);
3522         }
3523         if (data->fail_count != 0) {
3524                 if (!data->dont_log_errors) {
3525                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3526                                  data->fail_count));
3527                 }
3528                 return -1;
3529         }
3530         return 0;
3531 }
3532
3533
3534 /* 
3535    perform a simple control on the listed nodes
3536    The control cannot return data
3537  */
3538 int ctdb_client_async_control(struct ctdb_context *ctdb,
3539                                 enum ctdb_controls opcode,
3540                                 uint32_t *nodes,
3541                                 uint64_t srvid,
3542                                 struct timeval timeout,
3543                                 bool dont_log_errors,
3544                                 TDB_DATA data,
3545                                 client_async_callback client_callback,
3546                                 client_async_callback fail_callback,
3547                                 void *callback_data)
3548 {
3549         struct client_async_data *async_data;
3550         struct ctdb_client_control_state *state;
3551         int j, num_nodes;
3552
3553         async_data = talloc_zero(ctdb, struct client_async_data);
3554         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3555         async_data->dont_log_errors = dont_log_errors;
3556         async_data->callback = client_callback;
3557         async_data->fail_callback = fail_callback;
3558         async_data->callback_data = callback_data;
3559         async_data->opcode        = opcode;
3560
3561         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3562
3563         /* loop over all nodes and send an async control to each of them */
3564         for (j=0; j<num_nodes; j++) {
3565                 uint32_t pnn = nodes[j];
3566
3567                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3568                                           0, data, async_data, &timeout, NULL);
3569                 if (state == NULL) {
3570                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3571                         talloc_free(async_data);
3572                         return -1;
3573                 }
3574                 
3575                 ctdb_client_async_add(async_data, state);
3576         }
3577
3578         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3579                 talloc_free(async_data);
3580                 return -1;
3581         }
3582
3583         talloc_free(async_data);
3584         return 0;
3585 }
3586
3587 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3588                                 struct ctdb_vnn_map *vnn_map,
3589                                 TALLOC_CTX *mem_ctx,
3590                                 bool include_self)
3591 {
3592         int i, j, num_nodes;
3593         uint32_t *nodes;
3594
3595         for (i=num_nodes=0;i<vnn_map->size;i++) {
3596                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3597                         continue;
3598                 }
3599                 num_nodes++;
3600         } 
3601
3602         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3603         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3604
3605         for (i=j=0;i<vnn_map->size;i++) {
3606                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3607                         continue;
3608                 }
3609                 nodes[j++] = vnn_map->map[i];
3610         } 
3611
3612         return nodes;
3613 }
3614
3615 /* Get list of nodes not including those with flags specified by mask.
3616  * If exclude_pnn is not -1 then exclude that pnn from the list.
3617  */
3618 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3619                         struct ctdb_node_map_old *node_map,
3620                         TALLOC_CTX *mem_ctx,
3621                         uint32_t mask,
3622                         int exclude_pnn)
3623 {
3624         int i, j, num_nodes;
3625         uint32_t *nodes;
3626
3627         for (i=num_nodes=0;i<node_map->num;i++) {
3628                 if (node_map->nodes[i].flags & mask) {
3629                         continue;
3630                 }
3631                 if (node_map->nodes[i].pnn == exclude_pnn) {
3632                         continue;
3633                 }
3634                 num_nodes++;
3635         } 
3636
3637         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3638         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3639
3640         for (i=j=0;i<node_map->num;i++) {
3641                 if (node_map->nodes[i].flags & mask) {
3642                         continue;
3643                 }
3644                 if (node_map->nodes[i].pnn == exclude_pnn) {
3645                         continue;
3646                 }
3647                 nodes[j++] = node_map->nodes[i].pnn;
3648         } 
3649
3650         return nodes;
3651 }
3652
3653 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3654                                 struct ctdb_node_map_old *node_map,
3655                                 TALLOC_CTX *mem_ctx,
3656                                 bool include_self)
3657 {
3658         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3659                              include_self ? -1 : ctdb->pnn);
3660 }
3661
3662 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3663                                 struct ctdb_node_map_old *node_map,
3664                                 TALLOC_CTX *mem_ctx,
3665                                 bool include_self)
3666 {
3667         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3668                              include_self ? -1 : ctdb->pnn);
3669 }
3670
3671 /* 
3672   this is used to test if a pnn lock exists and if it exists will return
3673   the number of connections that pnn has reported or -1 if that recovery
3674   daemon is not running.
3675 */
3676 int
3677 ctdb_read_pnn_lock(int fd, int32_t pnn)
3678 {
3679         struct flock lock;
3680         char c;
3681
3682         lock.l_type = F_WRLCK;
3683         lock.l_whence = SEEK_SET;
3684         lock.l_start = pnn;
3685         lock.l_len = 1;
3686         lock.l_pid = 0;
3687
3688         if (fcntl(fd, F_GETLK, &lock) != 0) {
3689                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3690                 return -1;
3691         }
3692
3693         if (lock.l_type == F_UNLCK) {
3694                 return -1;
3695         }
3696
3697         if (pread(fd, &c, 1, pnn) == -1) {
3698                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3699                 return -1;
3700         }
3701
3702         return c;
3703 }
3704
3705 /*
3706   get capabilities of a remote node
3707  */
3708 struct ctdb_client_control_state *
3709 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3710 {
3711         return ctdb_control_send(ctdb, destnode, 0, 
3712                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3713                            mem_ctx, &timeout, NULL);
3714 }
3715
3716 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3717 {
3718         int ret;
3719         int32_t res;
3720         TDB_DATA outdata;
3721
3722         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3723         if ( (ret != 0) || (res != 0) ) {
3724                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3725                 return -1;
3726         }
3727
3728         if (capabilities) {
3729                 *capabilities = *((uint32_t *)outdata.dptr);
3730         }
3731
3732         return 0;
3733 }
3734
3735 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3736 {
3737         struct ctdb_client_control_state *state;
3738         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3739         int ret;
3740
3741         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3742         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3743         talloc_free(tmp_ctx);
3744         return ret;
3745 }
3746
3747 static void get_capabilities_callback(struct ctdb_context *ctdb,
3748                                       uint32_t node_pnn, int32_t res,
3749                                       TDB_DATA outdata, void *callback_data)
3750 {
3751         struct ctdb_node_capabilities *caps =
3752                 talloc_get_type(callback_data,
3753                                 struct ctdb_node_capabilities);
3754
3755         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
3756                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
3757                 return;
3758         }
3759
3760         if (node_pnn >= talloc_array_length(caps)) {
3761                 DEBUG(DEBUG_ERR,
3762                       (__location__ " unexpected PNN %u\n", node_pnn));
3763                 return;
3764         }
3765
3766         caps[node_pnn].retrieved = true;
3767         caps[node_pnn].capabilities = *((uint32_t *)outdata.dptr);
3768 }
3769
3770 struct ctdb_node_capabilities *
3771 ctdb_get_capabilities(struct ctdb_context *ctdb,
3772                       TALLOC_CTX *mem_ctx,
3773                       struct timeval timeout,
3774                       struct ctdb_node_map_old *nodemap)
3775 {
3776         uint32_t *nodes;
3777         uint32_t i, res;
3778         struct ctdb_node_capabilities *ret;
3779
3780         nodes = list_of_connected_nodes(ctdb, nodemap, mem_ctx, true);
3781
3782         ret = talloc_array(mem_ctx, struct ctdb_node_capabilities,
3783                            nodemap->num);
3784         CTDB_NO_MEMORY_NULL(ctdb, ret);
3785         /* Prepopulate the expected PNNs */
3786         for (i = 0; i < talloc_array_length(ret); i++) {
3787                 ret[i].retrieved = false;
3788         }
3789
3790         res = ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
3791                                         nodes, 0, timeout,
3792                                         false, tdb_null,
3793                                         get_capabilities_callback, NULL,
3794                                         ret);
3795         if (res != 0) {
3796                 DEBUG(DEBUG_ERR,
3797                       (__location__ " Failed to read node capabilities.\n"));
3798                 TALLOC_FREE(ret);
3799         }
3800
3801         return ret;
3802 }
3803
3804 uint32_t *
3805 ctdb_get_node_capabilities(struct ctdb_node_capabilities *caps,
3806                            uint32_t pnn)
3807 {
3808         if (pnn < talloc_array_length(caps) && caps[pnn].retrieved) {
3809                 return &caps[pnn].capabilities;
3810         }
3811
3812         return NULL;
3813 }
3814
3815 bool ctdb_node_has_capabilities(struct ctdb_node_capabilities *caps,
3816                                 uint32_t pnn,
3817                                 uint32_t capabilities_required)
3818 {
3819         uint32_t *capp = ctdb_get_node_capabilities(caps, pnn);
3820         return (capp != NULL) &&
3821                 ((*capp & capabilities_required) == capabilities_required);
3822 }
3823
3824
3825 struct server_id {
3826         uint64_t pid;
3827         uint32_t task_id;
3828         uint32_t vnn;
3829         uint64_t unique_id;
3830 };
3831
3832 static struct server_id server_id_fetch(struct ctdb_context *ctdb, uint32_t reqid)
3833 {
3834         struct server_id id;
3835
3836         id.pid = getpid();
3837         id.task_id = reqid;
3838         id.vnn = ctdb_get_pnn(ctdb);
3839         id.unique_id = id.vnn;
3840         id.unique_id = (id.unique_id << 32) | reqid;
3841
3842         return id;
3843 }
3844
3845 /* This is basically a copy from Samba's server_id.*.  However, a
3846  * dependency chain stops us from using Samba's version, so use a
3847  * renamed copy until a better solution is found. */
3848 static bool ctdb_server_id_equal(struct server_id *id1, struct server_id *id2)
3849 {
3850         if (id1->pid != id2->pid) {
3851                 return false;
3852         }
3853
3854         if (id1->task_id != id2->task_id) {
3855                 return false;
3856         }
3857
3858         if (id1->vnn != id2->vnn) {
3859                 return false;
3860         }
3861
3862         if (id1->unique_id != id2->unique_id) {
3863                 return false;
3864         }
3865
3866         return true;
3867 }
3868
3869 static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
3870 {
3871         struct ctdb_client_id sid;
3872         int ret;
3873         uint32_t result = 0;
3874
3875         sid.type = SERVER_TYPE_SAMBA;
3876         sid.pnn = id->vnn;
3877         sid.server_id = id->pid;
3878
3879         ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
3880                                         id->vnn, &sid, &result);
3881         if (ret != 0) {
3882                 /* If control times out, assume server_id exists. */
3883                 return true;
3884         }
3885
3886         if (result) {
3887                 return true;
3888         }
3889
3890         return false;
3891 }
3892
3893
3894 enum g_lock_type {
3895         G_LOCK_READ = 0,
3896         G_LOCK_WRITE = 1,
3897 };
3898
3899 struct g_lock_rec {
3900         enum g_lock_type type;
3901         struct server_id id;
3902 };
3903
3904 struct g_lock_recs {
3905         unsigned int num;
3906         struct g_lock_rec *lock;
3907 };
3908
3909 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3910                          struct g_lock_recs **locks)
3911 {
3912         struct g_lock_recs *recs;
3913
3914         recs = talloc_zero(mem_ctx, struct g_lock_recs);
3915         if (recs == NULL) {
3916                 return false;
3917         }
3918
3919         if (data.dsize == 0) {
3920                 goto done;
3921         }
3922
3923         if (data.dsize % sizeof(struct g_lock_rec) != 0) {
3924                 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3925                                   (unsigned long)data.dsize));
3926                 talloc_free(recs);
3927                 return false;
3928         }
3929
3930         recs->num = data.dsize / sizeof(struct g_lock_rec);
3931         recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3932         if (recs->lock == NULL) {
3933                 talloc_free(recs);
3934                 return false;
3935         }
3936
3937 done:
3938         if (locks != NULL) {
3939                 *locks = recs;
3940         }
3941
3942         return true;
3943 }
3944
3945
3946 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3947                         struct ctdb_db_context *ctdb_db,
3948                         const char *keyname, uint32_t reqid)
3949 {
3950         TDB_DATA key, data;
3951         struct ctdb_record_handle *h;
3952         struct g_lock_recs *locks;
3953         struct server_id id;
3954         struct timeval t_start;
3955         int i;
3956
3957         key.dptr = (uint8_t *)discard_const(keyname);
3958         key.dsize = strlen(keyname) + 1;
3959
3960         t_start = timeval_current();
3961
3962 again:
3963         /* Keep trying for an hour. */
3964         if (timeval_elapsed(&t_start) > 3600) {
3965                 return false;
3966         }
3967
3968         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3969         if (h == NULL) {
3970                 return false;
3971         }
3972
3973         if (!g_lock_parse(h, data, &locks)) {
3974                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3975                 talloc_free(data.dptr);
3976                 talloc_free(h);
3977                 return false;
3978         }
3979
3980         talloc_free(data.dptr);
3981
3982         id = server_id_fetch(ctdb_db->ctdb, reqid);
3983
3984         i = 0;
3985         while (i < locks->num) {
3986                 if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
3987                         /* Internal error */
3988                         talloc_free(h);
3989                         return false;
3990                 }
3991
3992                 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
3993                         if (i < locks->num-1) {
3994                                 locks->lock[i] = locks->lock[locks->num-1];
3995                         }
3996                         locks->num--;
3997                         continue;
3998                 }
3999
4000                 /* This entry is locked. */
4001                 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
4002                                    "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
4003                                    (unsigned long long)id.pid,
4004                                    id.task_id, id.vnn,
4005                                    (unsigned long long)id.unique_id));
4006                 talloc_free(h);
4007                 goto again;
4008         }
4009
4010         locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
4011                                      locks->num+1);
4012         if (locks->lock == NULL) {
4013                 talloc_free(h);
4014                 return false;
4015         }
4016
4017         locks->lock[locks->num].type = G_LOCK_WRITE;
4018         locks->lock[locks->num].id = id;
4019         locks->num++;
4020
4021         data.dptr = (uint8_t *)locks->lock;
4022         data.dsize = locks->num * sizeof(struct g_lock_rec);
4023
4024         if (ctdb_record_store(h, data) != 0) {
4025                 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
4026                                   "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
4027                                   (unsigned long long)id.pid,
4028                                   id.task_id, id.vnn,
4029                                   (unsigned long long)id.unique_id));
4030                 talloc_free(h);
4031                 return false;
4032         }
4033
4034         DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
4035                            "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
4036                            (unsigned long long)id.pid,
4037                            id.task_id, id.vnn,
4038                            (unsigned long long)id.unique_id));
4039
4040         talloc_free(h);
4041         return true;
4042 }
4043
4044 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
4045                           struct ctdb_db_context *ctdb_db,
4046                           const char *keyname, uint32_t reqid)
4047 {
4048         TDB_DATA key, data;
4049         struct ctdb_record_handle *h;
4050         struct g_lock_recs *locks;
4051         struct server_id id;
4052         int i;
4053         bool found = false;
4054
4055         key.dptr = (uint8_t *)discard_const(keyname);
4056         key.dsize = strlen(keyname) + 1;
4057         h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
4058         if (h == NULL) {
4059                 return false;
4060         }
4061
4062         if (!g_lock_parse(h, data, &locks)) {
4063                 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
4064                 talloc_free(data.dptr);
4065                 talloc_free(h);
4066                 return false;
4067         }
4068
4069         talloc_free(data.dptr);
4070
4071         id = server_id_fetch(ctdb_db->ctdb, reqid);
4072
4073         for (i=0; i<locks->num; i++) {
4074                 if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
4075                         if (i < locks->num-1) {
4076                                 locks->lock[i] = locks->lock[locks->num-1];
4077                         }
4078                         locks->num--;
4079                         found = true;
4080                         break;
4081                 }
4082         }
4083
4084         if (!found) {
4085                 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
4086                 talloc_free(h);
4087                 return false;
4088         }
4089
4090         data.dptr = (uint8_t *)locks->lock;
4091         data.dsize = locks->num * sizeof(struct g_lock_rec);
4092
4093         if (ctdb_record_store(h, data) != 0) {
4094                 talloc_free(h);
4095                 return false;
4096         }
4097
4098         talloc_free(h);
4099         return true;
4100 }
4101
4102
4103 struct ctdb_transaction_handle {
4104         struct ctdb_db_context *ctdb_db;
4105         struct ctdb_db_context *g_lock_db;
4106         char *lock_name;
4107         uint32_t reqid;
4108         /*
4109          * we store reads and writes done under a transaction:
4110          * - one list stores both reads and writes (m_all)
4111          * - the other just writes (m_write)
4112          */
4113         struct ctdb_marshall_buffer *m_all;
4114         struct ctdb_marshall_buffer *m_write;
4115 };
4116
4117 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
4118 {
4119         g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
4120         reqid_remove(h->ctdb_db->ctdb->idr, h->reqid);
4121         return 0;
4122 }
4123
4124
4125 /**
4126  * start a transaction on a database
4127  */
4128 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
4129                                                        TALLOC_CTX *mem_ctx)
4130 {
4131         struct ctdb_transaction_handle *h;
4132         struct ctdb_client_id id;
4133
4134         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
4135         if (h == NULL) {
4136                 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
4137                 return NULL;
4138         }
4139
4140         h->ctdb_db = ctdb_db;
4141         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
4142                                        (unsigned int)ctdb_db->db_id);
4143         if (h->lock_name == NULL) {
4144                 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
4145                 talloc_free(h);
4146                 return NULL;
4147         }
4148
4149         h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
4150                                    "g_lock.tdb", false, 0);
4151         if (!h->g_lock_db) {
4152                 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
4153                 talloc_free(h);
4154                 return NULL;
4155         }
4156
4157         id.type = SERVER_TYPE_SAMBA;
4158         id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
4159         id.server_id = getpid();
4160
4161         if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
4162                                          &id) != 0) {
4163                 DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
4164                 talloc_free(h);
4165                 return NULL;
4166         }
4167
4168         h->reqid = reqid_new(h->ctdb_db->ctdb->idr, h);
4169
4170         if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
4171                 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
4172                 talloc_free(h);
4173                 return NULL;
4174         }
4175
4176         talloc_set_destructor(h, ctdb_transaction_destructor);
4177         return h;
4178 }
4179
4180 /**
4181  * fetch a record inside a transaction
4182  */
4183 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
4184                            TALLOC_CTX *mem_ctx,
4185                            TDB_DATA key, TDB_DATA *data)
4186 {
4187         struct ctdb_ltdb_header header;
4188         int ret;
4189
4190         ZERO_STRUCT(header);
4191
4192         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
4193         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4194                 /* record doesn't exist yet */
4195                 *data = tdb_null;
4196                 ret = 0;
4197         }
4198
4199         if (ret != 0) {
4200                 return ret;
4201         }
4202
4203         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
4204         if (h->m_all == NULL) {
4205                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4206                 return -1;
4207         }
4208
4209         return 0;
4210 }
4211
4212 /**
4213  * stores a record inside a transaction
4214  */
4215 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
4216                            TDB_DATA key, TDB_DATA data)
4217 {
4218         TALLOC_CTX *tmp_ctx = talloc_new(h);
4219         struct ctdb_ltdb_header header;
4220         TDB_DATA olddata;
4221         int ret;
4222
4223         /* we need the header so we can update the RSN */
4224         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
4225         if (ret == -1 && header.dmaster == (uint32_t)-1) {
4226                 /* the record doesn't exist - create one with us as dmaster.
4227                    This is only safe because we are in a transaction and this
4228                    is a persistent database */
4229                 ZERO_STRUCT(header);
4230         } else if (ret != 0) {
4231                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
4232                 talloc_free(tmp_ctx);
4233                 return ret;
4234         }
4235
4236         if (data.dsize == olddata.dsize &&
4237             memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
4238             header.rsn != 0) {
4239                 /* save writing the same data */
4240                 talloc_free(tmp_ctx);
4241                 return 0;
4242         }
4243
4244         header.dmaster = h->ctdb_db->ctdb->pnn;
4245         header.rsn++;
4246
4247         h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
4248         if (h->m_all == NULL) {
4249                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4250                 talloc_free(tmp_ctx);
4251                 return -1;
4252         }
4253
4254         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
4255         if (h->m_write == NULL) {
4256                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
4257                 talloc_free(tmp_ctx);
4258                 return -1;
4259         }
4260
4261         talloc_free(tmp_ctx);
4262         return 0;
4263 }
4264
4265 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
4266 {
4267         const char *keyname = CTDB_DB_SEQNUM_KEY;
4268         TDB_DATA key, data;
4269         struct ctdb_ltdb_header header;
4270         int ret;
4271
4272         key.dptr = (uint8_t *)discard_const(keyname);
4273         key.dsize = strlen(keyname) + 1;
4274
4275         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
4276         if (ret != 0) {
4277                 *seqnum = 0;
4278                 return 0;
4279         }
4280
4281         if (data.dsize == 0) {
4282                 *seqnum = 0;
4283                 return 0;
4284         }
4285
4286         if (data.dsize != sizeof(*seqnum)) {
4287                 DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
4288                                   data.dsize));
4289                 talloc_free(data.dptr);
4290                 return -1;
4291         }
4292
4293         *seqnum = *(uint64_t *)data.dptr;
4294         talloc_free(data.dptr);
4295
4296         return 0;
4297 }
4298
4299
4300 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
4301                                 uint64_t seqnum)
4302 {
4303         const char *keyname = CTDB_DB_SEQNUM_KEY;
4304         TDB_DATA key, data;
4305
4306         key.dptr = (uint8_t *)discard_const(keyname);
4307         key.dsize = strlen(keyname) + 1;
4308
4309         data.dptr = (uint8_t *)&seqnum;
4310         data.dsize = sizeof(seqnum);
4311
4312         return ctdb_transaction_store(h, key, data);
4313 }
4314
4315
4316 /**
4317  * commit a transaction
4318  */
4319 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
4320 {
4321         int ret;
4322         uint64_t old_seqnum, new_seqnum;
4323         int32_t status;
4324         struct timeval timeout;
4325
4326         if (h->m_write == NULL) {
4327                 /* no changes were made */
4328                 talloc_free(h);
4329                 return 0;
4330         }
4331
4332         ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
4333         if (ret != 0) {
4334                 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4335                 ret = -1;
4336                 goto done;
4337         }
4338
4339         new_seqnum = old_seqnum + 1;
4340         ret = ctdb_store_db_seqnum(h, new_seqnum);
4341         if (ret != 0) {
4342                 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4343                 ret = -1;
4344                 goto done;
4345         }
4346
4347 again:
4348         timeout = timeval_current_ofs(3,0);
4349         ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4350                            h->ctdb_db->db_id,
4351                            CTDB_CONTROL_TRANS3_COMMIT, 0,
4352                            ctdb_marshall_finish(h->m_write), NULL, NULL,
4353                            &status, &timeout, NULL);
4354         if (ret != 0 || status != 0) {
4355                 /*
4356                  * TRANS3_COMMIT control will only fail if recovery has been
4357                  * triggered.  Check if the database has been updated or not.
4358                  */
4359                 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4360                 if (ret != 0) {
4361                         DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4362                         goto done;
4363                 }
4364
4365                 if (new_seqnum == old_seqnum) {
4366                         /* Database not yet updated, try again */
4367                         goto again;
4368                 }
4369
4370                 if (new_seqnum != (old_seqnum + 1)) {
4371                         DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4372                                           (long long unsigned)new_seqnum,
4373                                           (long long unsigned)old_seqnum));
4374                         ret = -1;
4375                         goto done;
4376                 }
4377         }
4378
4379         ret = 0;
4380
4381 done:
4382         talloc_free(h);
4383         return ret;
4384 }
4385
4386 /**
4387  * cancel a transaction
4388  */
4389 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
4390 {
4391         talloc_free(h);
4392         return 0;
4393 }
4394
4395
4396 /*
4397   recovery daemon ping to main daemon
4398  */
4399 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4400 {
4401         int ret;
4402         int32_t res;
4403
4404         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4405                            ctdb, NULL, &res, NULL, NULL);
4406         if (ret != 0 || res != 0) {
4407                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4408                 return -1;
4409         }
4410
4411         return 0;
4412 }
4413
4414 /* When forking the main daemon and the child process needs to connect
4415  * back to the daemon as a client process, this function can be used
4416  * to change the ctdb context from daemon into client mode.  The child
4417  * process must be created using ctdb_fork() and not fork() -
4418  * ctdb_fork() does some necessary housekeeping.
4419  */
4420 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4421 {
4422         int ret;
4423         va_list ap;
4424
4425         /* Add extra information so we can identify this in the logs */
4426         va_start(ap, fmt);
4427         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4428         va_end(ap);
4429
4430         /* get a new event context */
4431         ctdb->ev = tevent_context_init(ctdb);
4432         tevent_loop_allow_nesting(ctdb->ev);
4433
4434         /* Connect to main CTDB daemon */
4435         ret = ctdb_socket_connect(ctdb);
4436         if (ret != 0) {
4437                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4438                 return -1;
4439         }
4440
4441         ctdb->can_send_controls = true;
4442
4443         return 0;
4444 }
4445
4446 /*
4447   get the status of running the monitor eventscripts: NULL means never run.
4448  */
4449 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb,
4450                               struct timeval timeout, uint32_t destnode,
4451                               TALLOC_CTX *mem_ctx,
4452                               enum ctdb_event type,
4453                               struct ctdb_script_list_old **scripts)
4454 {
4455         int ret;
4456         TDB_DATA outdata, indata;
4457         int32_t res;
4458         uint32_t uinttype = type;
4459
4460         indata.dptr = (uint8_t *)&uinttype;
4461         indata.dsize = sizeof(uinttype);
4462
4463         ret = ctdb_control(ctdb, destnode, 0, 
4464                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4465                            mem_ctx, &outdata, &res, &timeout, NULL);
4466         if (ret != 0 || res != 0) {
4467                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4468                 return -1;
4469         }
4470
4471         if (outdata.dsize == 0) {
4472                 *scripts = NULL;
4473         } else {
4474                 *scripts = (struct ctdb_script_list_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4475                 talloc_free(outdata.dptr);
4476         }
4477
4478         return 0;
4479 }
4480
4481 /*
4482   tell the main daemon how long it took to lock the reclock file
4483  */
4484 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4485 {
4486         int ret;
4487         int32_t res;
4488         TDB_DATA data;
4489
4490         data.dptr = (uint8_t *)&latency;
4491         data.dsize = sizeof(latency);
4492
4493         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4494                            ctdb, NULL, &res, NULL, NULL);
4495         if (ret != 0 || res != 0) {
4496                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4497                 return -1;
4498         }
4499
4500         return 0;
4501 }
4502
4503 /*
4504   get the name of the reclock file
4505  */
4506 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4507                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4508                          const char **name)
4509 {
4510         int ret;
4511         int32_t res;
4512         TDB_DATA data;
4513
4514         ret = ctdb_control(ctdb, destnode, 0, 
4515                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4516                            mem_ctx, &data, &res, &timeout, NULL);
4517         if (ret != 0 || res != 0) {
4518                 return -1;
4519         }
4520
4521         if (data.dsize == 0) {
4522                 *name = NULL;
4523         } else {
4524                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4525         }
4526         talloc_free(data.dptr);
4527
4528         return 0;
4529 }
4530
4531 /*
4532   set the reclock filename for a node
4533  */
4534 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4535 {
4536         int ret;
4537         TDB_DATA data;
4538         int32_t res;
4539
4540         if (reclock == NULL) {
4541                 data.dsize = 0;
4542                 data.dptr  = NULL;
4543         } else {
4544                 data.dsize = strlen(reclock) + 1;
4545                 data.dptr  = discard_const(reclock);
4546         }
4547
4548         ret = ctdb_control(ctdb, destnode, 0, 
4549                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4550                            NULL, NULL, &res, &timeout, NULL);
4551         if (ret != 0 || res != 0) {
4552                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4553                 return -1;
4554         }
4555
4556         return 0;
4557 }
4558
4559 /*
4560   stop a node
4561  */
4562 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4563 {
4564         int ret;
4565         int32_t res;
4566
4567         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4568                            ctdb, NULL, &res, &timeout, NULL);
4569         if (ret != 0 || res != 0) {
4570                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4571                 return -1;
4572         }
4573
4574         return 0;
4575 }
4576
4577 /*
4578   continue a node
4579  */
4580 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4581 {
4582         int ret;
4583
4584         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4585                            ctdb, NULL, NULL, &timeout, NULL);
4586         if (ret != 0) {
4587                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4588                 return -1;
4589         }
4590
4591         return 0;
4592 }
4593
4594 /*
4595   set the natgw state for a node
4596  */
4597 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4598 {
4599         int ret;
4600         TDB_DATA data;
4601         int32_t res;
4602
4603         data.dsize = sizeof(natgwstate);
4604         data.dptr  = (uint8_t *)&natgwstate;
4605
4606         ret = ctdb_control(ctdb, destnode, 0, 
4607                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4608                            NULL, NULL, &res, &timeout, NULL);
4609         if (ret != 0 || res != 0) {
4610                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4611                 return -1;
4612         }
4613
4614         return 0;
4615 }
4616
4617 /*
4618   set the lmaster role for a node
4619  */
4620 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4621 {
4622         int ret;
4623         TDB_DATA data;
4624         int32_t res;
4625
4626         data.dsize = sizeof(lmasterrole);
4627         data.dptr  = (uint8_t *)&lmasterrole;
4628
4629         ret = ctdb_control(ctdb, destnode, 0, 
4630                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4631                            NULL, NULL, &res, &timeout, NULL);
4632         if (ret != 0 || res != 0) {
4633                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4634                 return -1;
4635         }
4636
4637         return 0;
4638 }
4639
4640 /*
4641   set the recmaster role for a node
4642  */
4643 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4644 {
4645         int ret;
4646         TDB_DATA data;
4647         int32_t res;
4648
4649         data.dsize = sizeof(recmasterrole);
4650         data.dptr  = (uint8_t *)&recmasterrole;
4651
4652         ret = ctdb_control(ctdb, destnode, 0, 
4653                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4654                            NULL, NULL, &res, &timeout, NULL);
4655         if (ret != 0 || res != 0) {
4656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4657                 return -1;
4658         }
4659
4660         return 0;
4661 }
4662
4663 /* enable an eventscript
4664  */
4665 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4666 {
4667         int ret;
4668         TDB_DATA data;
4669         int32_t res;
4670
4671         data.dsize = strlen(script) + 1;
4672         data.dptr  = discard_const(script);
4673
4674         ret = ctdb_control(ctdb, destnode, 0, 
4675                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4676                            NULL, NULL, &res, &timeout, NULL);
4677         if (ret != 0 || res != 0) {
4678                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4679                 return -1;
4680         }
4681
4682         return 0;
4683 }
4684
4685 /* disable an eventscript
4686  */
4687 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4688 {
4689         int ret;
4690         TDB_DATA data;
4691         int32_t res;
4692
4693         data.dsize = strlen(script) + 1;
4694         data.dptr  = discard_const(script);
4695
4696         ret = ctdb_control(ctdb, destnode, 0, 
4697                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4698                            NULL, NULL, &res, &timeout, NULL);
4699         if (ret != 0 || res != 0) {
4700                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4701                 return -1;
4702         }
4703
4704         return 0;
4705 }
4706
4707
4708 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout,
4709                       uint32_t destnode, struct ctdb_ban_state *bantime)
4710 {
4711         int ret;
4712         TDB_DATA data;
4713         int32_t res;
4714
4715         data.dsize = sizeof(*bantime);
4716         data.dptr  = (uint8_t *)bantime;
4717
4718         ret = ctdb_control(ctdb, destnode, 0, 
4719                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4720                            NULL, NULL, &res, &timeout, NULL);
4721         if (ret != 0 || res != 0) {
4722                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4723                 return -1;
4724         }
4725
4726         return 0;
4727 }
4728
4729
4730 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout,
4731                       uint32_t destnode, TALLOC_CTX *mem_ctx,
4732                       struct ctdb_ban_state **bantime)
4733 {
4734         int ret;
4735         TDB_DATA outdata;
4736         int32_t res;
4737         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4738
4739         ret = ctdb_control(ctdb, destnode, 0, 
4740                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4741                            tmp_ctx, &outdata, &res, &timeout, NULL);
4742         if (ret != 0 || res != 0) {
4743                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4744                 talloc_free(tmp_ctx);
4745                 return -1;
4746         }
4747
4748         *bantime = (struct ctdb_ban_state *)talloc_steal(mem_ctx, outdata.dptr);
4749         talloc_free(tmp_ctx);
4750
4751         return 0;
4752 }
4753
4754
4755 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4756 {
4757         int ret;
4758         int32_t res;
4759         TDB_DATA data;
4760         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4761
4762         data.dptr = (uint8_t*)db_prio;
4763         data.dsize = sizeof(*db_prio);
4764
4765         ret = ctdb_control(ctdb, destnode, 0, 
4766                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4767                            tmp_ctx, NULL, &res, &timeout, NULL);
4768         if (ret != 0 || res != 0) {
4769                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4770                 talloc_free(tmp_ctx);
4771                 return -1;
4772         }
4773
4774         talloc_free(tmp_ctx);
4775
4776         return 0;
4777 }
4778
4779 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4780 {
4781         int ret;
4782         int32_t res;
4783         TDB_DATA data;
4784         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4785
4786         data.dptr = (uint8_t*)&db_id;
4787         data.dsize = sizeof(db_id);
4788
4789         ret = ctdb_control(ctdb, destnode, 0, 
4790                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4791                            tmp_ctx, NULL, &res, &timeout, NULL);
4792         if (ret != 0 || res < 0) {
4793                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4794                 talloc_free(tmp_ctx);
4795                 return -1;
4796         }
4797
4798         if (priority) {
4799                 *priority = res;
4800         }
4801
4802         talloc_free(tmp_ctx);
4803
4804         return 0;
4805 }
4806
4807 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb,
4808                              struct timeval timeout, uint32_t destnode,
4809                              TALLOC_CTX *mem_ctx,
4810                              struct ctdb_statistics_list_old **stats)
4811 {
4812         int ret;
4813         TDB_DATA outdata;
4814         int32_t res;
4815
4816         ret = ctdb_control(ctdb, destnode, 0, 
4817                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4818                            mem_ctx, &outdata, &res, &timeout, NULL);
4819         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4820                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4821                 return -1;
4822         }
4823
4824         *stats = (struct ctdb_statistics_list_old *)talloc_memdup(mem_ctx,
4825                                                                   outdata.dptr,
4826                                                                   outdata.dsize);
4827         talloc_free(outdata.dptr);
4828
4829         return 0;
4830 }
4831
4832 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4833 {
4834         if (h == NULL) {
4835                 return NULL;
4836         }
4837
4838         return &h->header;
4839 }
4840
4841
4842 struct ctdb_client_control_state *
4843 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4844 {
4845         struct ctdb_client_control_state *handle;
4846         struct ctdb_marshall_buffer *m;
4847         struct ctdb_rec_data_old *rec;
4848         TDB_DATA outdata;
4849
4850         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4851         if (m == NULL) {
4852                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4853                 return NULL;
4854         }
4855
4856         m->db_id = ctdb_db->db_id;
4857
4858         rec = ctdb_marshall_record(m, 0, key, header, data);
4859         if (rec == NULL) {
4860                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4861                 talloc_free(m);
4862                 return NULL;
4863         }
4864         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4865         if (m == NULL) {
4866                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4867                 talloc_free(m);
4868                 return NULL;
4869         }
4870         m->count++;
4871         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4872
4873
4874         outdata.dptr = (uint8_t *)m;
4875         outdata.dsize = talloc_get_size(m);
4876
4877         handle = ctdb_control_send(ctdb, destnode, 0, 
4878                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4879                            mem_ctx, &timeout, NULL);
4880         talloc_free(m);
4881         return handle;
4882 }
4883
4884 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4885 {
4886         int ret;
4887         int32_t res;
4888
4889         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4890         if ( (ret != 0) || (res != 0) ){
4891                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4892                 return -1;
4893         }
4894
4895         return 0;
4896 }
4897
4898 int
4899 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4900 {
4901         struct ctdb_client_control_state *state;
4902
4903         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4904         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4905 }
4906
4907
4908
4909
4910
4911
4912 /*
4913   set a database to be readonly
4914  */
4915 struct ctdb_client_control_state *
4916 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4917 {
4918         TDB_DATA data;
4919
4920         data.dptr = (uint8_t *)&dbid;
4921         data.dsize = sizeof(dbid);
4922
4923         return ctdb_control_send(ctdb, destnode, 0, 
4924                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4925                            ctdb, NULL, NULL);
4926 }
4927
4928 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4929 {
4930         int ret;
4931         int32_t res;
4932
4933         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4934         if (ret != 0 || res != 0) {
4935           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4936                 return -1;
4937         }
4938
4939         return 0;
4940 }
4941
4942 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4943 {
4944         struct ctdb_client_control_state *state;
4945
4946         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4947         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4948 }
4949
4950 /*
4951   set a database to be sticky
4952  */
4953 struct ctdb_client_control_state *
4954 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4955 {
4956         TDB_DATA data;
4957
4958         data.dptr = (uint8_t *)&dbid;
4959         data.dsize = sizeof(dbid);
4960
4961         return ctdb_control_send(ctdb, destnode, 0, 
4962                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4963                            ctdb, NULL, NULL);
4964 }
4965
4966 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4967 {
4968         int ret;
4969         int32_t res;
4970
4971         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4972         if (ret != 0 || res != 0) {
4973                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4974                 return -1;
4975         }
4976
4977         return 0;
4978 }
4979
4980 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4981 {
4982         struct ctdb_client_control_state *state;
4983
4984         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4985         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4986 }