f0fbea40bc2293ff3031467318e710de9a8472d5
[vlendec/samba-autobuild/.git] / ctdb / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tdb_wrap/tdb_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31 #include "common/reqid.h"
32 #include "common/system.h"
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_PROTOCOL;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (c->new_data == NULL) {
109                 c->new_data = &c->record_data;
110         }
111
112         if (c->new_data && updatetdb) {
113                 /* XXX check that we always have the lock here? */
114                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
115                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
116                         talloc_free(c);
117                         return -1;
118                 }
119         }
120
121         if (c->reply_data) {
122                 call->reply_data = *c->reply_data;
123
124                 talloc_steal(call, call->reply_data.dptr);
125                 talloc_set_name_const(call->reply_data.dptr, __location__);
126         } else {
127                 call->reply_data.dptr = NULL;
128                 call->reply_data.dsize = 0;
129         }
130         call->status = c->status;
131
132         talloc_free(c);
133
134         return 0;
135 }
136
137
138 /*
139   queue a packet for sending from client to daemon
140 */
141 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
142 {
143         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
144 }
145
146
147 /*
148   called when a CTDB_REPLY_CALL packet comes in in the client
149
150   This packet comes in response to a CTDB_REQ_CALL request packet. It
151   contains any reply data from the call
152 */
153 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
156         struct ctdb_client_call_state *state;
157
158         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
159         if (state == NULL) {
160                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
161                 return;
162         }
163
164         if (hdr->reqid != state->reqid) {
165                 /* we found a record  but it was the wrong one */
166                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
167                 return;
168         }
169
170         state->call->reply_data.dptr = c->data;
171         state->call->reply_data.dsize = c->datalen;
172         state->call->status = c->status;
173
174         talloc_steal(state, c);
175
176         state->state = CTDB_CALL_DONE;
177
178         if (state->async.fn) {
179                 state->async.fn(state);
180         }
181 }
182
183 void ctdb_request_message(struct ctdb_context *ctdb,
184                           struct ctdb_req_header *hdr)
185 {
186         struct ctdb_req_message *c = (struct ctdb_req_message *)hdr;
187         TDB_DATA data;
188
189         data.dsize = c->datalen;
190         data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
191         if (data.dptr == NULL) {
192                 DEBUG(DEBUG_ERR, (__location__ " Memory allocation failure\n"));
193                 return;
194         }
195
196         srvid_dispatch(ctdb->srv, c->srvid, CTDB_SRVID_ALL, data);
197 }
198
199 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
200
201 /*
202   this is called in the client, when data comes in from the daemon
203  */
204 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
205 {
206         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
207         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
208         TALLOC_CTX *tmp_ctx;
209
210         /* place the packet as a child of a tmp_ctx. We then use
211            talloc_free() below to free it. If any of the calls want
212            to keep it, then they will steal it somewhere else, and the
213            talloc_free() will be a no-op */
214         tmp_ctx = talloc_new(ctdb);
215         talloc_steal(tmp_ctx, hdr);
216
217         if (cnt == 0) {
218                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
219                 exit(1);
220         }
221
222         if (cnt < sizeof(*hdr)) {
223                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
224                 goto done;
225         }
226         if (cnt != hdr->length) {
227                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
228                                (unsigned)hdr->length, (unsigned)cnt);
229                 goto done;
230         }
231
232         if (hdr->ctdb_magic != CTDB_MAGIC) {
233                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
234                 goto done;
235         }
236
237         if (hdr->ctdb_version != CTDB_PROTOCOL) {
238                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
239                 goto done;
240         }
241
242         switch (hdr->operation) {
243         case CTDB_REPLY_CALL:
244                 ctdb_client_reply_call(ctdb, hdr);
245                 break;
246
247         case CTDB_REQ_MESSAGE:
248                 ctdb_request_message(ctdb, hdr);
249                 break;
250
251         case CTDB_REPLY_CONTROL:
252                 ctdb_client_reply_control(ctdb, hdr);
253                 break;
254
255         default:
256                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
257         }
258
259 done:
260         talloc_free(tmp_ctx);
261 }
262
263 /*
264   connect to a unix domain socket
265 */
266 int ctdb_socket_connect(struct ctdb_context *ctdb)
267 {
268         struct sockaddr_un addr;
269
270         memset(&addr, 0, sizeof(addr));
271         addr.sun_family = AF_UNIX;
272         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
273
274         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
275         if (ctdb->daemon.sd == -1) {
276                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
277                 return -1;
278         }
279
280         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
281                 close(ctdb->daemon.sd);
282                 ctdb->daemon.sd = -1;
283                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
284                 return -1;
285         }
286
287         set_nonblocking(ctdb->daemon.sd);
288         set_close_on_exec(ctdb->daemon.sd);
289         
290         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
291                                               CTDB_DS_ALIGNMENT, 
292                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
293         return 0;
294 }
295
296
297 struct ctdb_record_handle {
298         struct ctdb_db_context *ctdb_db;
299         TDB_DATA key;
300         TDB_DATA *data;
301         struct ctdb_ltdb_header header;
302 };
303
304
305 /*
306   make a recv call to the local ctdb daemon - called from client context
307
308   This is called when the program wants to wait for a ctdb_call to complete and get the 
309   results. This call will block unless the call has already completed.
310 */
311 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
312 {
313         if (state == NULL) {
314                 return -1;
315         }
316
317         while (state->state < CTDB_CALL_DONE) {
318                 event_loop_once(state->ctdb_db->ctdb->ev);
319         }
320         if (state->state != CTDB_CALL_DONE) {
321                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
322                 talloc_free(state);
323                 return -1;
324         }
325
326         if (state->call->reply_data.dsize) {
327                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
328                                                       state->call->reply_data.dptr,
329                                                       state->call->reply_data.dsize);
330                 call->reply_data.dsize = state->call->reply_data.dsize;
331         } else {
332                 call->reply_data.dptr = NULL;
333                 call->reply_data.dsize = 0;
334         }
335         call->status = state->call->status;
336         talloc_free(state);
337
338         return call->status;
339 }
340
341
342
343
344 /*
345   destroy a ctdb_call in client
346 */
347 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
348 {
349         reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
350         return 0;
351 }
352
353 /*
354   construct an event driven local ctdb_call
355
356   this is used so that locally processed ctdb_call requests are processed
357   in an event driven manner
358 */
359 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
360                                                                   struct ctdb_call *call,
361                                                                   struct ctdb_ltdb_header *header,
362                                                                   TDB_DATA *data)
363 {
364         struct ctdb_client_call_state *state;
365         struct ctdb_context *ctdb = ctdb_db->ctdb;
366         int ret;
367
368         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
369         CTDB_NO_MEMORY_NULL(ctdb, state);
370         state->call = talloc_zero(state, struct ctdb_call);
371         CTDB_NO_MEMORY_NULL(ctdb, state->call);
372
373         talloc_steal(state, data->dptr);
374
375         state->state   = CTDB_CALL_DONE;
376         *(state->call) = *call;
377         state->ctdb_db = ctdb_db;
378
379         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
380         if (ret != 0) {
381                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
382         }
383
384         return state;
385 }
386
387 /*
388   make a ctdb call to the local daemon - async send. Called from client context.
389
390   This constructs a ctdb_call request and queues it for processing. 
391   This call never blocks.
392 */
393 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
394                                               struct ctdb_call *call)
395 {
396         struct ctdb_client_call_state *state;
397         struct ctdb_context *ctdb = ctdb_db->ctdb;
398         struct ctdb_ltdb_header header;
399         TDB_DATA data;
400         int ret;
401         size_t len;
402         struct ctdb_req_call *c;
403
404         /* if the domain socket is not yet open, open it */
405         if (ctdb->daemon.sd==-1) {
406                 ctdb_socket_connect(ctdb);
407         }
408
409         ret = ctdb_ltdb_lock(ctdb_db, call->key);
410         if (ret != 0) {
411                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
412                 return NULL;
413         }
414
415         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
416
417         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
418                 ret = -1;
419         }
420
421         if (ret == 0 && header.dmaster == ctdb->pnn) {
422                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
423                 talloc_free(data.dptr);
424                 ctdb_ltdb_unlock(ctdb_db, call->key);
425                 return state;
426         }
427
428         ctdb_ltdb_unlock(ctdb_db, call->key);
429         talloc_free(data.dptr);
430
431         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
432         if (state == NULL) {
433                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
434                 return NULL;
435         }
436         state->call = talloc_zero(state, struct ctdb_call);
437         if (state->call == NULL) {
438                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
439                 return NULL;
440         }
441
442         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
443         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
444         if (c == NULL) {
445                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
446                 return NULL;
447         }
448
449         state->reqid     = reqid_new(ctdb->idr, state);
450         state->ctdb_db = ctdb_db;
451         talloc_set_destructor(state, ctdb_client_call_destructor);
452
453         c->hdr.reqid     = state->reqid;
454         c->flags         = call->flags;
455         c->db_id         = ctdb_db->db_id;
456         c->callid        = call->call_id;
457         c->hopcount      = 0;
458         c->keylen        = call->key.dsize;
459         c->calldatalen   = call->call_data.dsize;
460         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
461         memcpy(&c->data[call->key.dsize], 
462                call->call_data.dptr, call->call_data.dsize);
463         *(state->call)              = *call;
464         state->call->call_data.dptr = &c->data[call->key.dsize];
465         state->call->key.dptr       = &c->data[0];
466
467         state->state  = CTDB_CALL_WAIT;
468
469
470         ctdb_client_queue_pkt(ctdb, &c->hdr);
471
472         return state;
473 }
474
475
476 /*
477   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
478 */
479 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
480 {
481         struct ctdb_client_call_state *state;
482
483         state = ctdb_call_send(ctdb_db, call);
484         return ctdb_call_recv(state, call);
485 }
486
487
488 /*
489   tell the daemon what messaging srvid we will use, and register the message
490   handler function in the client
491 */
492 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
493                                     srvid_handler_fn handler,
494                                     void *private_data)
495 {
496         int res;
497         int32_t status;
498
499         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
500                            CTDB_CONTROL_REGISTER_SRVID, 0,
501                            tdb_null, NULL, NULL, &status, NULL, NULL);
502         if (res != 0 || status != 0) {
503                 DEBUG(DEBUG_ERR,
504                       ("Failed to register srvid %llu\n",
505                        (unsigned long long)srvid));
506                 return -1;
507         }
508
509         /* also need to register the handler with our own ctdb structure */
510         return srvid_register(ctdb->srv, ctdb, srvid, handler, private_data);
511 }
512
513 /*
514   tell the daemon we no longer want a srvid
515 */
516 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb,
517                                        uint64_t srvid, void *private_data)
518 {
519         int res;
520         int32_t status;
521
522         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
523                            CTDB_CONTROL_DEREGISTER_SRVID, 0,
524                            tdb_null, NULL, NULL, &status, NULL, NULL);
525         if (res != 0 || status != 0) {
526                 DEBUG(DEBUG_ERR,
527                       ("Failed to deregister srvid %llu\n",
528                        (unsigned long long)srvid));
529                 return -1;
530         }
531
532         /* also need to register the handler with our own ctdb structure */
533         srvid_deregister(ctdb->srv, srvid, private_data);
534         return 0;
535 }
536
537 /*
538  * check server ids
539  */
540 int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
541                                        uint8_t *result)
542 {
543         TDB_DATA indata, outdata;
544         int res;
545         int32_t status;
546         int i;
547
548         indata.dptr = (uint8_t *)ids;
549         indata.dsize = num * sizeof(*ids);
550
551         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
552                            indata, ctdb, &outdata, &status, NULL, NULL);
553         if (res != 0 || status != 0) {
554                 DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
555                 return -1;
556         }
557
558         if (outdata.dsize != num*sizeof(uint8_t)) {
559                 DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
560                                   (long unsigned int)num*sizeof(uint8_t),
561                                   outdata.dsize));
562                 talloc_free(outdata.dptr);
563                 return -1;
564         }
565
566         for (i=0; i<num; i++) {
567                 result[i] = outdata.dptr[i];
568         }
569
570         talloc_free(outdata.dptr);
571         return 0;
572 }
573
574 /*
575   send a message - from client context
576  */
577 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
578                       uint64_t srvid, TDB_DATA data)
579 {
580         struct ctdb_req_message *r;
581         int len, res;
582
583         len = offsetof(struct ctdb_req_message, data) + data.dsize;
584         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
585                                len, struct ctdb_req_message);
586         CTDB_NO_MEMORY(ctdb, r);
587
588         r->hdr.destnode  = pnn;
589         r->srvid         = srvid;
590         r->datalen       = data.dsize;
591         memcpy(&r->data[0], data.dptr, data.dsize);
592         
593         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
594         talloc_free(r);
595         return res;
596 }
597
598
599 /*
600   cancel a ctdb_fetch_lock operation, releasing the lock
601  */
602 static int fetch_lock_destructor(struct ctdb_record_handle *h)
603 {
604         ctdb_ltdb_unlock(h->ctdb_db, h->key);
605         return 0;
606 }
607
608 /*
609   force the migration of a record to this node
610  */
611 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
612 {
613         struct ctdb_call call;
614         ZERO_STRUCT(call);
615         call.call_id = CTDB_NULL_FUNC;
616         call.key = key;
617         call.flags = CTDB_IMMEDIATE_MIGRATION;
618         return ctdb_call(ctdb_db, &call);
619 }
620
621 /*
622   try to fetch a readonly copy of a record
623  */
624 static int
625 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
626 {
627         int ret;
628
629         struct ctdb_call call;
630         ZERO_STRUCT(call);
631
632         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
633         call.call_data.dptr = NULL;
634         call.call_data.dsize = 0;
635         call.key = key;
636         call.flags = CTDB_WANT_READONLY;
637         ret = ctdb_call(ctdb_db, &call);
638
639         if (ret != 0) {
640                 return -1;
641         }
642         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
643                 return -1;
644         }
645
646         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
647         if (*hdr == NULL) {
648                 talloc_free(call.reply_data.dptr);
649                 return -1;
650         }
651
652         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
653         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
654         if (data->dptr == NULL) {
655                 talloc_free(call.reply_data.dptr);
656                 talloc_free(hdr);
657                 return -1;
658         }
659
660         return 0;
661 }
662
663 /*
664   get a lock on a record, and return the records data. Blocks until it gets the lock
665  */
666 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
667                                            TDB_DATA key, TDB_DATA *data)
668 {
669         int ret;
670         struct ctdb_record_handle *h;
671
672         /*
673           procedure is as follows:
674
675           1) get the chain lock. 
676           2) check if we are dmaster
677           3) if we are the dmaster then return handle 
678           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
679              reply from ctdbd
680           5) when we get the reply, goto (1)
681          */
682
683         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
684         if (h == NULL) {
685                 return NULL;
686         }
687
688         h->ctdb_db = ctdb_db;
689         h->key     = key;
690         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
691         if (h->key.dptr == NULL) {
692                 talloc_free(h);
693                 return NULL;
694         }
695         h->data    = data;
696
697         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
698                  (const char *)key.dptr));
699
700 again:
701         /* step 1 - get the chain lock */
702         ret = ctdb_ltdb_lock(ctdb_db, key);
703         if (ret != 0) {
704                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
705                 talloc_free(h);
706                 return NULL;
707         }
708
709         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
710
711         talloc_set_destructor(h, fetch_lock_destructor);
712
713         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
714
715         /* when torturing, ensure we test the remote path */
716         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
717             random() % 5 == 0) {
718                 h->header.dmaster = (uint32_t)-1;
719         }
720
721
722         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
723
724         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
725                 ctdb_ltdb_unlock(ctdb_db, key);
726                 ret = ctdb_client_force_migration(ctdb_db, key);
727                 if (ret != 0) {
728                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
729                         talloc_free(h);
730                         return NULL;
731                 }
732                 goto again;
733         }
734
735         /* if this is a request for read/write and we have delegations
736            we have to revoke all delegations first
737         */
738         if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
739             (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
740                 ctdb_ltdb_unlock(ctdb_db, key);
741                 ret = ctdb_client_force_migration(ctdb_db, key);
742                 if (ret != 0) {
743                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
744                         talloc_free(h);
745                         return NULL;
746                 }
747                 goto again;
748         }
749
750         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
751         return h;
752 }
753
754 /*
755   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
756  */
757 struct ctdb_record_handle *
758 ctdb_fetch_readonly_lock(
759         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
760         TDB_DATA key, TDB_DATA *data,
761         int read_only)
762 {
763         int ret;
764         struct ctdb_record_handle *h;
765         struct ctdb_ltdb_header *roheader = NULL;
766
767         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
768         if (h == NULL) {
769                 return NULL;
770         }
771
772         h->ctdb_db = ctdb_db;
773         h->key     = key;
774         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
775         if (h->key.dptr == NULL) {
776                 talloc_free(h);
777                 return NULL;
778         }
779         h->data    = data;
780
781         data->dptr = NULL;
782         data->dsize = 0;
783
784
785 again:
786         talloc_free(roheader);
787         roheader = NULL;
788
789         talloc_free(data->dptr);
790         data->dptr = NULL;
791         data->dsize = 0;
792
793         /* Lock the record/chain */
794         ret = ctdb_ltdb_lock(ctdb_db, key);
795         if (ret != 0) {
796                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
797                 talloc_free(h);
798                 return NULL;
799         }
800
801         talloc_set_destructor(h, fetch_lock_destructor);
802
803         /* Check if record exists yet in the TDB */
804         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
805         if (ret != 0) {
806                 ctdb_ltdb_unlock(ctdb_db, key);
807                 ret = ctdb_client_force_migration(ctdb_db, key);
808                 if (ret != 0) {
809                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
810                         talloc_free(h);
811                         return NULL;
812                 }
813                 goto again;
814         }
815
816         /* if this is a request for read/write and we have delegations
817            we have to revoke all delegations first
818         */
819         if ((read_only == 0) 
820         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
821         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
822                 ctdb_ltdb_unlock(ctdb_db, key);
823                 ret = ctdb_client_force_migration(ctdb_db, key);
824                 if (ret != 0) {
825                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
826                         talloc_free(h);
827                         return NULL;
828                 }
829                 goto again;
830         }
831
832         /* if we are dmaster, just return the handle */
833         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
834                 return h;
835         }
836
837         if (read_only != 0) {
838                 TDB_DATA rodata = {NULL, 0};
839
840                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
841                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
842                         return h;
843                 }
844
845                 ctdb_ltdb_unlock(ctdb_db, key);
846                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
847                 if (ret != 0) {
848                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
849                         ret = ctdb_client_force_migration(ctdb_db, key);
850                         if (ret != 0) {
851                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
852                                 talloc_free(h);
853                                 return NULL;
854                         }
855
856                         goto again;
857                 }
858
859                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
860                         ret = ctdb_client_force_migration(ctdb_db, key);
861                         if (ret != 0) {
862                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
863                                 talloc_free(h);
864                                 return NULL;
865                         }
866
867                         goto again;
868                 }
869
870                 ret = ctdb_ltdb_lock(ctdb_db, key);
871                 if (ret != 0) {
872                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
873                         talloc_free(h);
874                         return NULL;
875                 }
876
877                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
878                 if (ret != 0) {
879                         ctdb_ltdb_unlock(ctdb_db, key);
880
881                         ret = ctdb_client_force_migration(ctdb_db, key);
882                         if (ret != 0) {
883                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
884                                 talloc_free(h);
885                                 return NULL;
886                         }
887
888                         goto again;
889                 }
890
891                 return h;
892         }
893
894         /* we are not dmaster and this was not a request for a readonly lock
895          * so unlock the record, migrate it and try again
896          */
897         ctdb_ltdb_unlock(ctdb_db, key);
898         ret = ctdb_client_force_migration(ctdb_db, key);
899         if (ret != 0) {
900                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
901                 talloc_free(h);
902                 return NULL;
903         }
904         goto again;
905 }
906
907 /*
908   store some data to the record that was locked with ctdb_fetch_lock()
909 */
910 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
911 {
912         if (h->ctdb_db->persistent) {
913                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
914                 return -1;
915         }
916
917         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
918 }
919
920 /*
921   non-locking fetch of a record
922  */
923 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
924                TDB_DATA key, TDB_DATA *data)
925 {
926         struct ctdb_call call;
927         int ret;
928
929         call.call_id = CTDB_FETCH_FUNC;
930         call.call_data.dptr = NULL;
931         call.call_data.dsize = 0;
932         call.key = key;
933
934         ret = ctdb_call(ctdb_db, &call);
935
936         if (ret == 0) {
937                 *data = call.reply_data;
938                 talloc_steal(mem_ctx, data->dptr);
939         }
940
941         return ret;
942 }
943
944
945
946 /*
947    called when a control completes or timesout to invoke the callback
948    function the user provided
949 */
950 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
951         struct timeval t, void *private_data)
952 {
953         struct ctdb_client_control_state *state;
954         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
955         int ret;
956
957         state = talloc_get_type(private_data, struct ctdb_client_control_state);
958         talloc_steal(tmp_ctx, state);
959
960         ret = ctdb_control_recv(state->ctdb, state, state,
961                         NULL, 
962                         NULL, 
963                         NULL);
964         if (ret != 0) {
965                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
966         }
967
968         talloc_free(tmp_ctx);
969 }
970
971 /*
972   called when a CTDB_REPLY_CONTROL packet comes in in the client
973
974   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
975   contains any reply data from the control
976 */
977 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
978                                       struct ctdb_req_header *hdr)
979 {
980         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
981         struct ctdb_client_control_state *state;
982
983         state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_control_state);
984         if (state == NULL) {
985                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
986                 return;
987         }
988
989         if (hdr->reqid != state->reqid) {
990                 /* we found a record  but it was the wrong one */
991                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
992                 return;
993         }
994
995         state->outdata.dptr = c->data;
996         state->outdata.dsize = c->datalen;
997         state->status = c->status;
998         if (c->errorlen) {
999                 state->errormsg = talloc_strndup(state, 
1000                                                  (char *)&c->data[c->datalen], 
1001                                                  c->errorlen);
1002         }
1003
1004         /* state->outdata now uses resources from c so we dont want c
1005            to just dissappear from under us while state is still alive
1006         */
1007         talloc_steal(state, c);
1008
1009         state->state = CTDB_CONTROL_DONE;
1010
1011         /* if we had a callback registered for this control, pull the response
1012            and call the callback.
1013         */
1014         if (state->async.fn) {
1015                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1016         }
1017 }
1018
1019
1020 /*
1021   destroy a ctdb_control in client
1022 */
1023 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
1024 {
1025         reqid_remove(state->ctdb->idr, state->reqid);
1026         return 0;
1027 }
1028
1029
1030 /* time out handler for ctdb_control */
1031 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
1032         struct timeval t, void *private_data)
1033 {
1034         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1035
1036         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1037                          "dstnode:%u\n", state->reqid, state->c->opcode,
1038                          state->c->hdr.destnode));
1039
1040         state->state = CTDB_CONTROL_TIMEOUT;
1041
1042         /* if we had a callback registered for this control, pull the response
1043            and call the callback.
1044         */
1045         if (state->async.fn) {
1046                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
1047         }
1048 }
1049
1050 /* async version of send control request */
1051 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
1052                 uint32_t destnode, uint64_t srvid, 
1053                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
1054                 TALLOC_CTX *mem_ctx,
1055                 struct timeval *timeout,
1056                 char **errormsg)
1057 {
1058         struct ctdb_client_control_state *state;
1059         size_t len;
1060         struct ctdb_req_control *c;
1061         int ret;
1062
1063         if (errormsg) {
1064                 *errormsg = NULL;
1065         }
1066
1067         /* if the domain socket is not yet open, open it */
1068         if (ctdb->daemon.sd==-1) {
1069                 ctdb_socket_connect(ctdb);
1070         }
1071
1072         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1073         CTDB_NO_MEMORY_NULL(ctdb, state);
1074
1075         state->ctdb       = ctdb;
1076         state->reqid      = reqid_new(ctdb->idr, state);
1077         state->state      = CTDB_CONTROL_WAIT;
1078         state->errormsg   = NULL;
1079
1080         talloc_set_destructor(state, ctdb_client_control_destructor);
1081
1082         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1083         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1084                                len, struct ctdb_req_control);
1085         state->c            = c;        
1086         CTDB_NO_MEMORY_NULL(ctdb, c);
1087         c->hdr.reqid        = state->reqid;
1088         c->hdr.destnode     = destnode;
1089         c->opcode           = opcode;
1090         c->client_id        = 0;
1091         c->flags            = flags;
1092         c->srvid            = srvid;
1093         c->datalen          = data.dsize;
1094         if (data.dsize) {
1095                 memcpy(&c->data[0], data.dptr, data.dsize);
1096         }
1097
1098         /* timeout */
1099         if (timeout && !timeval_is_zero(timeout)) {
1100                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1101         }
1102
1103         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1104         if (ret != 0) {
1105                 talloc_free(state);
1106                 return NULL;
1107         }
1108
1109         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1110                 talloc_free(state);
1111                 return NULL;
1112         }
1113
1114         return state;
1115 }
1116
1117
1118 /* async version of receive control reply */
1119 int ctdb_control_recv(struct ctdb_context *ctdb, 
1120                 struct ctdb_client_control_state *state, 
1121                 TALLOC_CTX *mem_ctx,
1122                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1123 {
1124         TALLOC_CTX *tmp_ctx;
1125
1126         if (status != NULL) {
1127                 *status = -1;
1128         }
1129         if (errormsg != NULL) {
1130                 *errormsg = NULL;
1131         }
1132
1133         if (state == NULL) {
1134                 return -1;
1135         }
1136
1137         /* prevent double free of state */
1138         tmp_ctx = talloc_new(ctdb);
1139         talloc_steal(tmp_ctx, state);
1140
1141         /* loop one event at a time until we either timeout or the control
1142            completes.
1143         */
1144         while (state->state == CTDB_CONTROL_WAIT) {
1145                 event_loop_once(ctdb->ev);
1146         }
1147
1148         if (state->state != CTDB_CONTROL_DONE) {
1149                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1150                 if (state->async.fn) {
1151                         state->async.fn(state);
1152                 }
1153                 talloc_free(tmp_ctx);
1154                 return -1;
1155         }
1156
1157         if (state->errormsg) {
1158                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1159                 if (errormsg) {
1160                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1161                 }
1162                 if (state->async.fn) {
1163                         state->async.fn(state);
1164                 }
1165                 talloc_free(tmp_ctx);
1166                 return (status == 0 ? -1 : state->status);
1167         }
1168
1169         if (outdata) {
1170                 *outdata = state->outdata;
1171                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1172         }
1173
1174         if (status) {
1175                 *status = state->status;
1176         }
1177
1178         if (state->async.fn) {
1179                 state->async.fn(state);
1180         }
1181
1182         talloc_free(tmp_ctx);
1183         return 0;
1184 }
1185
1186
1187
1188 /*
1189   send a ctdb control message
1190   timeout specifies how long we should wait for a reply.
1191   if timeout is NULL we wait indefinitely
1192  */
1193 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1194                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1195                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1196                  struct timeval *timeout,
1197                  char **errormsg)
1198 {
1199         struct ctdb_client_control_state *state;
1200
1201         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1202                         flags, data, mem_ctx,
1203                         timeout, errormsg);
1204
1205         /* FIXME: Error conditions in ctdb_control_send return NULL without
1206          * setting errormsg.  So, there is no way to distinguish between sucess
1207          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1208         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1209                 if (status != NULL) {
1210                         *status = 0;
1211                 }
1212                 return 0;
1213         }
1214
1215         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1216                         errormsg);
1217 }
1218
1219
1220
1221
1222 /*
1223   a process exists call. Returns 0 if process exists, -1 otherwise
1224  */
1225 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1226 {
1227         int ret;
1228         TDB_DATA data;
1229         int32_t status;
1230
1231         data.dptr = (uint8_t*)&pid;
1232         data.dsize = sizeof(pid);
1233
1234         ret = ctdb_control(ctdb, destnode, 0, 
1235                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1236                            NULL, NULL, &status, NULL, NULL);
1237         if (ret != 0) {
1238                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1239                 return -1;
1240         }
1241
1242         return status;
1243 }
1244
1245 /*
1246   get remote statistics
1247  */
1248 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1249 {
1250         int ret;
1251         TDB_DATA data;
1252         int32_t res;
1253
1254         ret = ctdb_control(ctdb, destnode, 0, 
1255                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1256                            ctdb, &data, &res, NULL, NULL);
1257         if (ret != 0 || res != 0) {
1258                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1259                 return -1;
1260         }
1261
1262         if (data.dsize != sizeof(struct ctdb_statistics)) {
1263                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1264                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1265                       return -1;
1266         }
1267
1268         *status = *(struct ctdb_statistics *)data.dptr;
1269         talloc_free(data.dptr);
1270                         
1271         return 0;
1272 }
1273
1274 /*
1275  * get db statistics
1276  */
1277 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1278                            TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
1279 {
1280         int ret;
1281         TDB_DATA indata, outdata;
1282         int32_t res;
1283         struct ctdb_db_statistics *wire, *s;
1284         char *ptr;
1285         int i;
1286
1287         indata.dptr = (uint8_t *)&dbid;
1288         indata.dsize = sizeof(dbid);
1289
1290         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1291                            0, indata, ctdb, &outdata, &res, NULL, NULL);
1292         if (ret != 0 || res != 0) {
1293                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1294                 return -1;
1295         }
1296
1297         if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
1298                 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1299                                  outdata.dsize,
1300                                  (long unsigned int)sizeof(struct ctdb_statistics)));
1301                 return -1;
1302         }
1303
1304         s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
1305         if (s == NULL) {
1306                 talloc_free(outdata.dptr);
1307                 CTDB_NO_MEMORY(ctdb, s);
1308         }
1309
1310         wire = (struct ctdb_db_statistics *)outdata.dptr;
1311         memcpy(s, wire, offsetof(struct ctdb_db_statistics, hot_keys_wire));
1312         ptr = &wire->hot_keys_wire[0];
1313         for (i=0; i<wire->num_hot_keys; i++) {
1314                 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1315                 if (s->hot_keys[i].key.dptr == NULL) {
1316                         talloc_free(outdata.dptr);
1317                         CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1318                 }
1319
1320                 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1321                 ptr += wire->hot_keys[i].key.dsize;
1322         }
1323
1324         talloc_free(outdata.dptr);
1325         *dbstat = s;
1326         return 0;
1327 }
1328
1329 /*
1330   shutdown a remote ctdb node
1331  */
1332 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1333 {
1334         struct ctdb_client_control_state *state;
1335
1336         state = ctdb_control_send(ctdb, destnode, 0, 
1337                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1338                            NULL, &timeout, NULL);
1339         if (state == NULL) {
1340                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1341                 return -1;
1342         }
1343
1344         return 0;
1345 }
1346
1347 /*
1348   get vnn map from a remote node
1349  */
1350 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1351 {
1352         int ret;
1353         TDB_DATA outdata;
1354         int32_t res;
1355         struct ctdb_vnn_map_wire *map;
1356
1357         ret = ctdb_control(ctdb, destnode, 0, 
1358                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1359                            mem_ctx, &outdata, &res, &timeout, NULL);
1360         if (ret != 0 || res != 0) {
1361                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1362                 return -1;
1363         }
1364         
1365         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1366         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1367             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1368                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1369                 return -1;
1370         }
1371
1372         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1373         CTDB_NO_MEMORY(ctdb, *vnnmap);
1374         (*vnnmap)->generation = map->generation;
1375         (*vnnmap)->size       = map->size;
1376         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1377
1378         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1379         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1380         talloc_free(outdata.dptr);
1381                     
1382         return 0;
1383 }
1384
1385
1386 /*
1387   get the recovery mode of a remote node
1388  */
1389 struct ctdb_client_control_state *
1390 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1391 {
1392         return ctdb_control_send(ctdb, destnode, 0, 
1393                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1394                            mem_ctx, &timeout, NULL);
1395 }
1396
1397 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1398 {
1399         int ret;
1400         int32_t res;
1401
1402         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1403         if (ret != 0) {
1404                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1405                 return -1;
1406         }
1407
1408         if (recmode) {
1409                 *recmode = (uint32_t)res;
1410         }
1411
1412         return 0;
1413 }
1414
1415 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1416 {
1417         struct ctdb_client_control_state *state;
1418
1419         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1420         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1421 }
1422
1423
1424
1425
1426 /*
1427   set the recovery mode of a remote node
1428  */
1429 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1430 {
1431         int ret;
1432         TDB_DATA data;
1433         int32_t res;
1434
1435         data.dsize = sizeof(uint32_t);
1436         data.dptr = (unsigned char *)&recmode;
1437
1438         ret = ctdb_control(ctdb, destnode, 0, 
1439                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1440                            NULL, NULL, &res, &timeout, NULL);
1441         if (ret != 0 || res != 0) {
1442                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1443                 return -1;
1444         }
1445
1446         return 0;
1447 }
1448
1449
1450
1451 /*
1452   get the recovery master of a remote node
1453  */
1454 struct ctdb_client_control_state *
1455 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1456                         struct timeval timeout, uint32_t destnode)
1457 {
1458         return ctdb_control_send(ctdb, destnode, 0, 
1459                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1460                            mem_ctx, &timeout, NULL);
1461 }
1462
1463 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1464 {
1465         int ret;
1466         int32_t res;
1467
1468         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1469         if (ret != 0) {
1470                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1471                 return -1;
1472         }
1473
1474         if (recmaster) {
1475                 *recmaster = (uint32_t)res;
1476         }
1477
1478         return 0;
1479 }
1480
1481 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1482 {
1483         struct ctdb_client_control_state *state;
1484
1485         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1486         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1487 }
1488
1489
1490 /*
1491   set the recovery master of a remote node
1492  */
1493 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1494 {
1495         int ret;
1496         TDB_DATA data;
1497         int32_t res;
1498
1499         ZERO_STRUCT(data);
1500         data.dsize = sizeof(uint32_t);
1501         data.dptr = (unsigned char *)&recmaster;
1502
1503         ret = ctdb_control(ctdb, destnode, 0, 
1504                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1505                            NULL, NULL, &res, &timeout, NULL);
1506         if (ret != 0 || res != 0) {
1507                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1508                 return -1;
1509         }
1510
1511         return 0;
1512 }
1513
1514
1515 /*
1516   get a list of databases off a remote node
1517  */
1518 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1519                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1520 {
1521         int ret;
1522         TDB_DATA outdata;
1523         int32_t res;
1524
1525         ret = ctdb_control(ctdb, destnode, 0, 
1526                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1527                            mem_ctx, &outdata, &res, &timeout, NULL);
1528         if (ret != 0 || res != 0) {
1529                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1530                 return -1;
1531         }
1532
1533         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1534         talloc_free(outdata.dptr);
1535                     
1536         return 0;
1537 }
1538
1539 /*
1540   get a list of nodes (vnn and flags ) from a remote node
1541  */
1542 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1543                 struct timeval timeout, uint32_t destnode, 
1544                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1545 {
1546         int ret;
1547         TDB_DATA outdata;
1548         int32_t res;
1549
1550         ret = ctdb_control(ctdb, destnode, 0,
1551                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1552                            mem_ctx, &outdata, &res, &timeout, NULL);
1553         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1554                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1555                 return -1;
1556         }
1557
1558         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1559         talloc_free(outdata.dptr);
1560         return 0;
1561 }
1562
1563 /*
1564   load nodes file on a remote node and return as a node map
1565  */
1566 int ctdb_ctrl_getnodesfile(struct ctdb_context *ctdb,
1567                            struct timeval timeout, uint32_t destnode,
1568                            TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1569 {
1570         int ret;
1571         TDB_DATA outdata;
1572         int32_t res;
1573
1574         ret = ctdb_control(ctdb, destnode, 0,
1575                            CTDB_CONTROL_GET_NODES_FILE, 0, tdb_null,
1576                            mem_ctx, &outdata, &res, &timeout, NULL);
1577         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1578                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1579                 return -1;
1580         }
1581
1582         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1583         talloc_free(outdata.dptr);
1584
1585         return 0;
1586 }
1587
1588 /*
1589   drop the transport, reload the nodes file and restart the transport
1590  */
1591 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1592                     struct timeval timeout, uint32_t destnode)
1593 {
1594         int ret;
1595         int32_t res;
1596
1597         ret = ctdb_control(ctdb, destnode, 0, 
1598                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1599                            NULL, NULL, &res, &timeout, NULL);
1600         if (ret != 0 || res != 0) {
1601                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1602                 return -1;
1603         }
1604
1605         return 0;
1606 }
1607
1608
1609 /*
1610   set vnn map on a node
1611  */
1612 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1613                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1614 {
1615         int ret;
1616         TDB_DATA data;
1617         int32_t res;
1618         struct ctdb_vnn_map_wire *map;
1619         size_t len;
1620
1621         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1622         map = talloc_size(mem_ctx, len);
1623         CTDB_NO_MEMORY(ctdb, map);
1624
1625         map->generation = vnnmap->generation;
1626         map->size = vnnmap->size;
1627         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1628         
1629         data.dsize = len;
1630         data.dptr  = (uint8_t *)map;
1631
1632         ret = ctdb_control(ctdb, destnode, 0, 
1633                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1634                            NULL, NULL, &res, &timeout, NULL);
1635         if (ret != 0 || res != 0) {
1636                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1637                 return -1;
1638         }
1639
1640         talloc_free(map);
1641
1642         return 0;
1643 }
1644
1645
1646 /*
1647   async send for pull database
1648  */
1649 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1650         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1651         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1652 {
1653         TDB_DATA indata;
1654         struct ctdb_control_pulldb *pull;
1655         struct ctdb_client_control_state *state;
1656
1657         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1658         CTDB_NO_MEMORY_NULL(ctdb, pull);
1659
1660         pull->db_id   = dbid;
1661         pull->lmaster = lmaster;
1662
1663         indata.dsize = sizeof(struct ctdb_control_pulldb);
1664         indata.dptr  = (unsigned char *)pull;
1665
1666         state = ctdb_control_send(ctdb, destnode, 0, 
1667                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1668                                   mem_ctx, &timeout, NULL);
1669         talloc_free(pull);
1670
1671         return state;
1672 }
1673
1674 /*
1675   async recv for pull database
1676  */
1677 int ctdb_ctrl_pulldb_recv(
1678         struct ctdb_context *ctdb, 
1679         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1680         TDB_DATA *outdata)
1681 {
1682         int ret;
1683         int32_t res;
1684
1685         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1686         if ( (ret != 0) || (res != 0) ){
1687                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1688                 return -1;
1689         }
1690
1691         return 0;
1692 }
1693
1694 /*
1695   pull all keys and records for a specific database on a node
1696  */
1697 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1698                 uint32_t dbid, uint32_t lmaster, 
1699                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1700                 TDB_DATA *outdata)
1701 {
1702         struct ctdb_client_control_state *state;
1703
1704         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1705                                       timeout);
1706         
1707         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1708 }
1709
1710
1711 /*
1712   change dmaster for all keys in the database to the new value
1713  */
1714 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1715                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1716 {
1717         int ret;
1718         TDB_DATA indata;
1719         int32_t res;
1720
1721         indata.dsize = 2*sizeof(uint32_t);
1722         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1723
1724         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1725         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1726
1727         ret = ctdb_control(ctdb, destnode, 0, 
1728                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1729                            NULL, NULL, &res, &timeout, NULL);
1730         if (ret != 0 || res != 0) {
1731                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1732                 return -1;
1733         }
1734
1735         return 0;
1736 }
1737
1738 /*
1739   ping a node, return number of clients connected
1740  */
1741 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1742 {
1743         int ret;
1744         int32_t res;
1745
1746         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1747                            tdb_null, NULL, NULL, &res, NULL, NULL);
1748         if (ret != 0) {
1749                 return -1;
1750         }
1751         return res;
1752 }
1753
1754 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1755                            struct timeval timeout, 
1756                            uint32_t destnode,
1757                            uint32_t *runstate)
1758 {
1759         TDB_DATA outdata;
1760         int32_t res;
1761         int ret;
1762
1763         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1764                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1765         if (ret != 0 || res != 0) {
1766                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1767                 return ret != 0 ? ret : res;
1768         }
1769
1770         if (outdata.dsize != sizeof(uint32_t)) {
1771                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1772                 talloc_free(outdata.dptr);
1773                 return -1;
1774         }
1775
1776         if (runstate != NULL) {
1777                 *runstate = *(uint32_t *)outdata.dptr;
1778         }
1779         talloc_free(outdata.dptr);
1780
1781         return 0;
1782 }
1783
1784 /*
1785   find the real path to a ltdb 
1786  */
1787 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1788                    const char **path)
1789 {
1790         int ret;
1791         int32_t res;
1792         TDB_DATA data;
1793
1794         data.dptr = (uint8_t *)&dbid;
1795         data.dsize = sizeof(dbid);
1796
1797         ret = ctdb_control(ctdb, destnode, 0, 
1798                            CTDB_CONTROL_GETDBPATH, 0, data, 
1799                            mem_ctx, &data, &res, &timeout, NULL);
1800         if (ret != 0 || res != 0) {
1801                 return -1;
1802         }
1803
1804         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1805         if ((*path) == NULL) {
1806                 return -1;
1807         }
1808
1809         talloc_free(data.dptr);
1810
1811         return 0;
1812 }
1813
1814 /*
1815   find the name of a db 
1816  */
1817 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1818                    const char **name)
1819 {
1820         int ret;
1821         int32_t res;
1822         TDB_DATA data;
1823
1824         data.dptr = (uint8_t *)&dbid;
1825         data.dsize = sizeof(dbid);
1826
1827         ret = ctdb_control(ctdb, destnode, 0, 
1828                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1829                            mem_ctx, &data, &res, &timeout, NULL);
1830         if (ret != 0 || res != 0) {
1831                 return -1;
1832         }
1833
1834         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1835         if ((*name) == NULL) {
1836                 return -1;
1837         }
1838
1839         talloc_free(data.dptr);
1840
1841         return 0;
1842 }
1843
1844 /*
1845   get the health status of a db
1846  */
1847 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1848                           struct timeval timeout,
1849                           uint32_t destnode,
1850                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1851                           const char **reason)
1852 {
1853         int ret;
1854         int32_t res;
1855         TDB_DATA data;
1856
1857         data.dptr = (uint8_t *)&dbid;
1858         data.dsize = sizeof(dbid);
1859
1860         ret = ctdb_control(ctdb, destnode, 0,
1861                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1862                            mem_ctx, &data, &res, &timeout, NULL);
1863         if (ret != 0 || res != 0) {
1864                 return -1;
1865         }
1866
1867         if (data.dsize == 0) {
1868                 (*reason) = NULL;
1869                 return 0;
1870         }
1871
1872         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1873         if ((*reason) == NULL) {
1874                 return -1;
1875         }
1876
1877         talloc_free(data.dptr);
1878
1879         return 0;
1880 }
1881
1882 /*
1883  * get db sequence number
1884  */
1885 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1886                           uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1887 {
1888         int ret;
1889         int32_t res;
1890         TDB_DATA data, outdata;
1891
1892         data.dptr = (uint8_t *)&dbid;
1893         data.dsize = sizeof(uint64_t);  /* This is just wrong */
1894
1895         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1896                            0, data, ctdb, &outdata, &res, &timeout, NULL);
1897         if (ret != 0 || res != 0) {
1898                 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1899                 return -1;
1900         }
1901
1902         if (outdata.dsize != sizeof(uint64_t)) {
1903                 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1904                 talloc_free(outdata.dptr);
1905                 return -1;
1906         }
1907
1908         if (seqnum != NULL) {
1909                 *seqnum = *(uint64_t *)outdata.dptr;
1910         }
1911         talloc_free(outdata.dptr);
1912
1913         return 0;
1914 }
1915
1916 /*
1917   create a database
1918  */
1919 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1920                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1921 {
1922         int ret;
1923         int32_t res;
1924         TDB_DATA data;
1925         uint64_t tdb_flags = 0;
1926
1927         data.dptr = discard_const(name);
1928         data.dsize = strlen(name)+1;
1929
1930         /* Make sure that volatile databases use jenkins hash */
1931         if (!persistent) {
1932                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1933         }
1934
1935 #ifdef TDB_MUTEX_LOCKING
1936         if (!persistent && ctdb->tunable.mutex_enabled == 1) {
1937                 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
1938         }
1939 #endif
1940
1941         ret = ctdb_control(ctdb, destnode, tdb_flags,
1942                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1943                            0, data, 
1944                            mem_ctx, &data, &res, &timeout, NULL);
1945
1946         if (ret != 0 || res != 0) {
1947                 return -1;
1948         }
1949
1950         return 0;
1951 }
1952
1953 /*
1954   get debug level on a node
1955  */
1956 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1957 {
1958         int ret;
1959         int32_t res;
1960         TDB_DATA data;
1961
1962         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1963                            ctdb, &data, &res, NULL, NULL);
1964         if (ret != 0 || res != 0) {
1965                 return -1;
1966         }
1967         if (data.dsize != sizeof(int32_t)) {
1968                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1969                          (unsigned)data.dsize));
1970                 return -1;
1971         }
1972         *level = *(int32_t *)data.dptr;
1973         talloc_free(data.dptr);
1974         return 0;
1975 }
1976
1977 /*
1978   set debug level on a node
1979  */
1980 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1981 {
1982         int ret;
1983         int32_t res;
1984         TDB_DATA data;
1985
1986         data.dptr = (uint8_t *)&level;
1987         data.dsize = sizeof(level);
1988
1989         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1990                            NULL, NULL, &res, NULL, NULL);
1991         if (ret != 0 || res != 0) {
1992                 return -1;
1993         }
1994         return 0;
1995 }
1996
1997
1998 /*
1999   get a list of connected nodes
2000  */
2001 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
2002                                 struct timeval timeout,
2003                                 TALLOC_CTX *mem_ctx,
2004                                 uint32_t *num_nodes)
2005 {
2006         struct ctdb_node_map *map=NULL;
2007         int ret, i;
2008         uint32_t *nodes;
2009
2010         *num_nodes = 0;
2011
2012         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
2013         if (ret != 0) {
2014                 return NULL;
2015         }
2016
2017         nodes = talloc_array(mem_ctx, uint32_t, map->num);
2018         if (nodes == NULL) {
2019                 return NULL;
2020         }
2021
2022         for (i=0;i<map->num;i++) {
2023                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2024                         nodes[*num_nodes] = map->nodes[i].pnn;
2025                         (*num_nodes)++;
2026                 }
2027         }
2028
2029         return nodes;
2030 }
2031
2032
2033 /*
2034   reset remote status
2035  */
2036 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2037 {
2038         int ret;
2039         int32_t res;
2040
2041         ret = ctdb_control(ctdb, destnode, 0, 
2042                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
2043                            NULL, NULL, &res, NULL, NULL);
2044         if (ret != 0 || res != 0) {
2045                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2046                 return -1;
2047         }
2048         return 0;
2049 }
2050
2051 /*
2052   attach to a specific database - client call
2053 */
2054 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2055                                     struct timeval timeout,
2056                                     const char *name,
2057                                     bool persistent,
2058                                     uint32_t tdb_flags)
2059 {
2060         struct ctdb_db_context *ctdb_db;
2061         TDB_DATA data;
2062         int ret;
2063         int32_t res;
2064 #ifdef TDB_MUTEX_LOCKING
2065         uint32_t mutex_enabled = 0;
2066 #endif
2067
2068         ctdb_db = ctdb_db_handle(ctdb, name);
2069         if (ctdb_db) {
2070                 return ctdb_db;
2071         }
2072
2073         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2074         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2075
2076         ctdb_db->ctdb = ctdb;
2077         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2078         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2079
2080         data.dptr = discard_const(name);
2081         data.dsize = strlen(name)+1;
2082
2083         /* CTDB has switched to using jenkins hash for volatile databases.
2084          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
2085          * always set it.
2086          */
2087         if (!persistent) {
2088                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
2089         }
2090
2091 #ifdef TDB_MUTEX_LOCKING
2092         if (!persistent) {
2093                 ret = ctdb_ctrl_get_tunable(ctdb, timeval_current_ofs(3,0),
2094                                             CTDB_CURRENT_NODE,
2095                                             "TDBMutexEnabled",
2096                                             &mutex_enabled);
2097                 if (ret != 0) {
2098                         DEBUG(DEBUG_WARNING, ("Assuming no mutex support.\n"));
2099                 }
2100
2101                 if (mutex_enabled == 1) {
2102                         tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
2103                 }
2104         }
2105 #endif
2106
2107         /* tell ctdb daemon to attach */
2108         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
2109                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
2110                            0, data, ctdb_db, &data, &res, NULL, NULL);
2111         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
2112                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2113                 talloc_free(ctdb_db);
2114                 return NULL;
2115         }
2116         
2117         ctdb_db->db_id = *(uint32_t *)data.dptr;
2118         talloc_free(data.dptr);
2119
2120         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2121         if (ret != 0) {
2122                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2123                 talloc_free(ctdb_db);
2124                 return NULL;
2125         }
2126
2127         if (persistent) {
2128                 tdb_flags = TDB_DEFAULT;
2129         } else {
2130                 tdb_flags = TDB_NOSYNC;
2131 #ifdef TDB_MUTEX_LOCKING
2132                 if (mutex_enabled) {
2133                         tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
2134                 }
2135 #endif
2136         }
2137         if (ctdb->valgrinding) {
2138                 tdb_flags |= TDB_NOMMAP;
2139         }
2140         tdb_flags |= TDB_DISALLOW_NESTING;
2141
2142         ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
2143                                       O_RDWR, 0);
2144         if (ctdb_db->ltdb == NULL) {
2145                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2146                 talloc_free(ctdb_db);
2147                 return NULL;
2148         }
2149
2150         ctdb_db->persistent = persistent;
2151
2152         DLIST_ADD(ctdb->db_list, ctdb_db);
2153
2154         /* add well known functions */
2155         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2156         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2157         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2158
2159         return ctdb_db;
2160 }
2161
2162 /*
2163  * detach from a specific database - client call
2164  */
2165 int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
2166 {
2167         int ret;
2168         int32_t status;
2169         TDB_DATA data;
2170
2171         data.dsize = sizeof(db_id);
2172         data.dptr = (uint8_t *)&db_id;
2173
2174         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
2175                            0, data, NULL, NULL, &status, NULL, NULL);
2176         if (ret != 0 || status != 0) {
2177                 return -1;
2178         }
2179         return 0;
2180 }
2181
2182 /*
2183   setup a call for a database
2184  */
2185 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2186 {
2187         struct ctdb_registered_call *call;
2188
2189         /* register locally */
2190         call = talloc(ctdb_db, struct ctdb_registered_call);
2191         call->fn = fn;
2192         call->id = id;
2193
2194         DLIST_ADD(ctdb_db->calls, call);
2195         return 0;
2196 }
2197
2198
2199 struct traverse_state {
2200         bool done;
2201         uint32_t count;
2202         ctdb_traverse_func fn;
2203         void *private_data;
2204         bool listemptyrecords;
2205 };
2206
2207 /*
2208   called on each key during a ctdb_traverse
2209  */
2210 static void traverse_handler(uint64_t srvid, TDB_DATA data, void *p)
2211 {
2212         struct traverse_state *state = (struct traverse_state *)p;
2213         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2214         TDB_DATA key;
2215
2216         if (data.dsize < sizeof(uint32_t) || d->length != data.dsize) {
2217                 DEBUG(DEBUG_ERR, ("Bad data size %u in traverse_handler\n",
2218                                   (unsigned)data.dsize));
2219                 state->done = true;
2220                 return;
2221         }
2222
2223         key.dsize = d->keylen;
2224         key.dptr  = &d->data[0];
2225         data.dsize = d->datalen;
2226         data.dptr = &d->data[d->keylen];
2227
2228         if (key.dsize == 0 && data.dsize == 0) {
2229                 /* end of traverse */
2230                 state->done = true;
2231                 return;
2232         }
2233
2234         if (!state->listemptyrecords &&
2235             data.dsize == sizeof(struct ctdb_ltdb_header))
2236         {
2237                 /* empty records are deleted records in ctdb */
2238                 return;
2239         }
2240
2241         if (state->fn(key, data, state->private_data) != 0) {
2242                 state->done = true;
2243         }
2244
2245         state->count++;
2246 }
2247
2248 /**
2249  * start a cluster wide traverse, calling the supplied fn on each record
2250  * return the number of records traversed, or -1 on error
2251  *
2252  * Extendet variant with a flag to signal whether empty records should
2253  * be listed.
2254  */
2255 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2256                              ctdb_traverse_func fn,
2257                              bool withemptyrecords,
2258                              void *private_data)
2259 {
2260         TDB_DATA data;
2261         struct ctdb_traverse_start_ext t;
2262         int32_t status;
2263         int ret;
2264         uint64_t srvid = (getpid() | 0xFLL<<60);
2265         struct traverse_state state;
2266
2267         state.done = false;
2268         state.count = 0;
2269         state.private_data = private_data;
2270         state.fn = fn;
2271         state.listemptyrecords = withemptyrecords;
2272
2273         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2274         if (ret != 0) {
2275                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2276                 return -1;
2277         }
2278
2279         t.db_id = ctdb_db->db_id;
2280         t.srvid = srvid;
2281         t.reqid = 0;
2282         t.withemptyrecords = withemptyrecords;
2283
2284         data.dptr = (uint8_t *)&t;
2285         data.dsize = sizeof(t);
2286
2287         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2288                            data, NULL, NULL, &status, NULL, NULL);
2289         if (ret != 0 || status != 0) {
2290                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2291                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2292                 return -1;
2293         }
2294
2295         while (!state.done) {
2296                 event_loop_once(ctdb_db->ctdb->ev);
2297         }
2298
2299         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2300         if (ret != 0) {
2301                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2302                 return -1;
2303         }
2304
2305         return state.count;
2306 }
2307
2308 /**
2309  * start a cluster wide traverse, calling the supplied fn on each record
2310  * return the number of records traversed, or -1 on error
2311  *
2312  * Standard version which does not list the empty records:
2313  * These are considered deleted.
2314  */
2315 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2316 {
2317         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2318 }
2319
2320 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2321 /*
2322   called on each key during a catdb
2323  */
2324 int ctdb_dumpdb_record(TDB_DATA key, TDB_DATA data, void *p)
2325 {
2326         int i;
2327         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2328         FILE *f = c->f;
2329         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2330
2331         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2332         for (i=0;i<key.dsize;i++) {
2333                 if (ISASCII(key.dptr[i])) {
2334                         fprintf(f, "%c", key.dptr[i]);
2335                 } else {
2336                         fprintf(f, "\\%02X", key.dptr[i]);
2337                 }
2338         }
2339         fprintf(f, "\"\n");
2340
2341         fprintf(f, "dmaster: %u\n", h->dmaster);
2342         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2343
2344         if (c->printlmaster && c->ctdb->vnn_map != NULL) {
2345                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(c->ctdb, &key));
2346         }
2347
2348         if (c->printhash) {
2349                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2350         }
2351
2352         if (c->printrecordflags) {
2353                 fprintf(f, "flags: 0x%08x", h->flags);
2354                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2355                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2356                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2357                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2358                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2359                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2360                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2361                 fprintf(f, "\n");
2362         }
2363
2364         if (c->printdatasize) {
2365                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2366         } else {
2367                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2368                 for (i=sizeof(*h);i<data.dsize;i++) {
2369                         if (ISASCII(data.dptr[i])) {
2370                                 fprintf(f, "%c", data.dptr[i]);
2371                         } else {
2372                                 fprintf(f, "\\%02X", data.dptr[i]);
2373                         }
2374                 }
2375                 fprintf(f, "\"\n");
2376         }
2377
2378         fprintf(f, "\n");
2379
2380         return 0;
2381 }
2382
2383 /*
2384   convenience function to list all keys to stdout
2385  */
2386 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2387                  struct ctdb_dump_db_context *ctx)
2388 {
2389         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2390                                  ctx->printemptyrecords, ctx);
2391 }
2392
2393 /*
2394   get the pid of a ctdb daemon
2395  */
2396 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2397 {
2398         int ret;
2399         int32_t res;
2400
2401         ret = ctdb_control(ctdb, destnode, 0, 
2402                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2403                            NULL, NULL, &res, &timeout, NULL);
2404         if (ret != 0) {
2405                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2406                 return -1;
2407         }
2408
2409         *pid = res;
2410
2411         return 0;
2412 }
2413
2414
2415 /*
2416   async freeze send control
2417  */
2418 struct ctdb_client_control_state *
2419 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2420 {
2421         return ctdb_control_send(ctdb, destnode, priority, 
2422                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2423                            mem_ctx, &timeout, NULL);
2424 }
2425
2426 /* 
2427    async freeze recv control
2428 */
2429 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2430 {
2431         int ret;
2432         int32_t res;
2433
2434         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2435         if ( (ret != 0) || (res != 0) ){
2436                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2437                 return -1;
2438         }
2439
2440         return 0;
2441 }
2442
2443 /*
2444   freeze databases of a certain priority
2445  */
2446 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2447 {
2448         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2449         struct ctdb_client_control_state *state;
2450         int ret;
2451
2452         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2453         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2454         talloc_free(tmp_ctx);
2455
2456         return ret;
2457 }
2458
2459 /* Freeze all databases */
2460 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2461 {
2462         int i;
2463
2464         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2465                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2466                         return -1;
2467                 }
2468         }
2469         return 0;
2470 }
2471
2472 /*
2473   thaw databases of a certain priority
2474  */
2475 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2476 {
2477         int ret;
2478         int32_t res;
2479
2480         ret = ctdb_control(ctdb, destnode, priority, 
2481                            CTDB_CONTROL_THAW, 0, tdb_null, 
2482                            NULL, NULL, &res, &timeout, NULL);
2483         if (ret != 0 || res != 0) {
2484                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2485                 return -1;
2486         }
2487
2488         return 0;
2489 }
2490
2491 /* thaw all databases */
2492 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2493 {
2494         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2495 }
2496
2497 /*
2498   get pnn of a node, or -1
2499  */
2500 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2501 {
2502         int ret;
2503         int32_t res;
2504
2505         ret = ctdb_control(ctdb, destnode, 0, 
2506                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2507                            NULL, NULL, &res, &timeout, NULL);
2508         if (ret != 0) {
2509                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2510                 return -1;
2511         }
2512
2513         return res;
2514 }
2515
2516 /*
2517   get the monitoring mode of a remote node
2518  */
2519 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2520 {
2521         int ret;
2522         int32_t res;
2523
2524         ret = ctdb_control(ctdb, destnode, 0, 
2525                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2526                            NULL, NULL, &res, &timeout, NULL);
2527         if (ret != 0) {
2528                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2529                 return -1;
2530         }
2531
2532         *monmode = res;
2533
2534         return 0;
2535 }
2536
2537
2538 /*
2539  set the monitoring mode of a remote node to active
2540  */
2541 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2542 {
2543         int ret;
2544         
2545
2546         ret = ctdb_control(ctdb, destnode, 0, 
2547                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2548                            NULL, NULL,NULL, &timeout, NULL);
2549         if (ret != 0) {
2550                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2551                 return -1;
2552         }
2553
2554         
2555
2556         return 0;
2557 }
2558
2559 /*
2560   set the monitoring mode of a remote node to disable
2561  */
2562 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2563 {
2564         int ret;
2565         
2566
2567         ret = ctdb_control(ctdb, destnode, 0, 
2568                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2569                            NULL, NULL, NULL, &timeout, NULL);
2570         if (ret != 0) {
2571                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2572                 return -1;
2573         }
2574
2575         
2576
2577         return 0;
2578 }
2579
2580
2581
2582 /*
2583   sent to a node to make it take over an ip address
2584 */
2585 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2586                           uint32_t destnode, struct ctdb_public_ip *ip)
2587 {
2588         TDB_DATA data;
2589         int ret;
2590         int32_t res;
2591
2592         data.dsize = sizeof(*ip);
2593         data.dptr  = (uint8_t *)ip;
2594
2595         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0,
2596                            data, NULL, NULL, &res, &timeout, NULL);
2597         if (ret != 0 || res != 0) {
2598                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2599                 return -1;
2600         }
2601
2602         return 0;
2603 }
2604
2605
2606 /*
2607   sent to a node to make it release an ip address
2608 */
2609 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2610                          uint32_t destnode, struct ctdb_public_ip *ip)
2611 {
2612         TDB_DATA data;
2613         int ret;
2614         int32_t res;
2615
2616         data.dsize = sizeof(*ip);
2617         data.dptr  = (uint8_t *)ip;
2618
2619         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0,
2620                            data, NULL, NULL, &res, &timeout, NULL);
2621         if (ret != 0 || res != 0) {
2622                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2623                 return -1;
2624         }
2625
2626         return 0;
2627 }
2628
2629
2630 /*
2631   get a tunable
2632  */
2633 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2634                           struct timeval timeout, 
2635                           uint32_t destnode,
2636                           const char *name, uint32_t *value)
2637 {
2638         struct ctdb_control_get_tunable *t;
2639         TDB_DATA data, outdata;
2640         int32_t res;
2641         int ret;
2642
2643         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2644         data.dptr  = talloc_size(ctdb, data.dsize);
2645         CTDB_NO_MEMORY(ctdb, data.dptr);
2646
2647         t = (struct ctdb_control_get_tunable *)data.dptr;
2648         t->length = strlen(name)+1;
2649         memcpy(t->name, name, t->length);
2650
2651         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2652                            &outdata, &res, &timeout, NULL);
2653         talloc_free(data.dptr);
2654         if (ret != 0 || res != 0) {
2655                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2656                 return ret != 0 ? ret : res;
2657         }
2658
2659         if (outdata.dsize != sizeof(uint32_t)) {
2660                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2661                 talloc_free(outdata.dptr);
2662                 return -1;
2663         }
2664         
2665         *value = *(uint32_t *)outdata.dptr;
2666         talloc_free(outdata.dptr);
2667
2668         return 0;
2669 }
2670
2671 /*
2672   set a tunable
2673  */
2674 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2675                           struct timeval timeout, 
2676                           uint32_t destnode,
2677                           const char *name, uint32_t value)
2678 {
2679         struct ctdb_control_set_tunable *t;
2680         TDB_DATA data;
2681         int32_t res;
2682         int ret;
2683
2684         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2685         data.dptr  = talloc_size(ctdb, data.dsize);
2686         CTDB_NO_MEMORY(ctdb, data.dptr);
2687
2688         t = (struct ctdb_control_set_tunable *)data.dptr;
2689         t->length = strlen(name)+1;
2690         memcpy(t->name, name, t->length);
2691         t->value = value;
2692
2693         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2694                            NULL, &res, &timeout, NULL);
2695         talloc_free(data.dptr);
2696         if ((ret != 0) || (res == -1)) {
2697                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2698                 return -1;
2699         }
2700
2701         return res;
2702 }
2703
2704 /*
2705   list tunables
2706  */
2707 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2708                             struct timeval timeout, 
2709                             uint32_t destnode,
2710                             TALLOC_CTX *mem_ctx,
2711                             const char ***list, uint32_t *count)
2712 {
2713         TDB_DATA outdata;
2714         int32_t res;
2715         int ret;
2716         struct ctdb_control_list_tunable *t;
2717         char *p, *s, *ptr;
2718
2719         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2720                            mem_ctx, &outdata, &res, &timeout, NULL);
2721         if (ret != 0 || res != 0) {
2722                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2723                 return -1;
2724         }
2725
2726         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2727         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2728             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2729                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2730                 talloc_free(outdata.dptr);
2731                 return -1;              
2732         }
2733         
2734         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2735         CTDB_NO_MEMORY(ctdb, p);
2736
2737         talloc_free(outdata.dptr);
2738         
2739         (*list) = NULL;
2740         (*count) = 0;
2741
2742         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2743                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2744                 CTDB_NO_MEMORY(ctdb, *list);
2745                 (*list)[*count] = talloc_strdup(*list, s);
2746                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2747                 (*count)++;
2748         }
2749
2750         talloc_free(p);
2751
2752         return 0;
2753 }
2754
2755
2756 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2757                                    struct timeval timeout, uint32_t destnode,
2758                                    TALLOC_CTX *mem_ctx,
2759                                    uint32_t flags,
2760                                    struct ctdb_all_public_ips **ips)
2761 {
2762         int ret;
2763         TDB_DATA outdata;
2764         int32_t res;
2765
2766         ret = ctdb_control(ctdb, destnode, 0,
2767                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2768                            mem_ctx, &outdata, &res, &timeout, NULL);
2769         if (ret != 0 || res != 0) {
2770                 DEBUG(DEBUG_ERR,(__location__
2771                                  " ctdb_control for getpublicips failed ret:%d res:%d\n",
2772                                  ret, res));
2773                 return -1;
2774         }
2775
2776         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2777         talloc_free(outdata.dptr);
2778
2779         return 0;
2780 }
2781
2782 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2783                              struct timeval timeout, uint32_t destnode,
2784                              TALLOC_CTX *mem_ctx,
2785                              struct ctdb_all_public_ips **ips)
2786 {
2787         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2788                                               destnode, mem_ctx,
2789                                               0, ips);
2790 }
2791
2792 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2793                                  struct timeval timeout, uint32_t destnode,
2794                                  TALLOC_CTX *mem_ctx,
2795                                  const ctdb_sock_addr *addr,
2796                                  struct ctdb_control_public_ip_info **_info)
2797 {
2798         int ret;
2799         TDB_DATA indata;
2800         TDB_DATA outdata;
2801         int32_t res;
2802         struct ctdb_control_public_ip_info *info;
2803         uint32_t len;
2804         uint32_t i;
2805
2806         indata.dptr = discard_const_p(uint8_t, addr);
2807         indata.dsize = sizeof(*addr);
2808
2809         ret = ctdb_control(ctdb, destnode, 0,
2810                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2811                            mem_ctx, &outdata, &res, &timeout, NULL);
2812         if (ret != 0 || res != 0) {
2813                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2814                                 "failed ret:%d res:%d\n",
2815                                 ret, res));
2816                 return -1;
2817         }
2818
2819         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2820         if (len > outdata.dsize) {
2821                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2822                                 "returned invalid data with size %u > %u\n",
2823                                 (unsigned int)outdata.dsize,
2824                                 (unsigned int)len));
2825                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2826                 return -1;
2827         }
2828
2829         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2830         len += info->num*sizeof(struct ctdb_control_iface_info);
2831
2832         if (len > outdata.dsize) {
2833                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2834                                 "returned invalid data with size %u > %u\n",
2835                                 (unsigned int)outdata.dsize,
2836                                 (unsigned int)len));
2837                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2838                 return -1;
2839         }
2840
2841         /* make sure we null terminate the returned strings */
2842         for (i=0; i < info->num; i++) {
2843                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2844         }
2845
2846         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2847                                                                 outdata.dptr,
2848                                                                 outdata.dsize);
2849         talloc_free(outdata.dptr);
2850         if (*_info == NULL) {
2851                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2852                                 "talloc_memdup size %u failed\n",
2853                                 (unsigned int)outdata.dsize));
2854                 return -1;
2855         }
2856
2857         return 0;
2858 }
2859
2860 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2861                          struct timeval timeout, uint32_t destnode,
2862                          TALLOC_CTX *mem_ctx,
2863                          struct ctdb_control_get_ifaces **_ifaces)
2864 {
2865         int ret;
2866         TDB_DATA outdata;
2867         int32_t res;
2868         struct ctdb_control_get_ifaces *ifaces;
2869         uint32_t len;
2870         uint32_t i;
2871
2872         ret = ctdb_control(ctdb, destnode, 0,
2873                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2874                            mem_ctx, &outdata, &res, &timeout, NULL);
2875         if (ret != 0 || res != 0) {
2876                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2877                                 "failed ret:%d res:%d\n",
2878                                 ret, res));
2879                 return -1;
2880         }
2881
2882         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2883         if (len > outdata.dsize) {
2884                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2885                                 "returned invalid data with size %u > %u\n",
2886                                 (unsigned int)outdata.dsize,
2887                                 (unsigned int)len));
2888                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2889                 return -1;
2890         }
2891
2892         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2893         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2894
2895         if (len > outdata.dsize) {
2896                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2897                                 "returned invalid data with size %u > %u\n",
2898                                 (unsigned int)outdata.dsize,
2899                                 (unsigned int)len));
2900                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2901                 return -1;
2902         }
2903
2904         /* make sure we null terminate the returned strings */
2905         for (i=0; i < ifaces->num; i++) {
2906                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2907         }
2908
2909         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2910                                                                   outdata.dptr,
2911                                                                   outdata.dsize);
2912         talloc_free(outdata.dptr);
2913         if (*_ifaces == NULL) {
2914                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2915                                 "talloc_memdup size %u failed\n",
2916                                 (unsigned int)outdata.dsize));
2917                 return -1;
2918         }
2919
2920         return 0;
2921 }
2922
2923 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2924                              struct timeval timeout, uint32_t destnode,
2925                              TALLOC_CTX *mem_ctx,
2926                              const struct ctdb_control_iface_info *info)
2927 {
2928         int ret;
2929         TDB_DATA indata;
2930         int32_t res;
2931
2932         indata.dptr = discard_const_p(uint8_t, info);
2933         indata.dsize = sizeof(*info);
2934
2935         ret = ctdb_control(ctdb, destnode, 0,
2936                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2937                            mem_ctx, NULL, &res, &timeout, NULL);
2938         if (ret != 0 || res != 0) {
2939                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2940                                 "failed ret:%d res:%d\n",
2941                                 ret, res));
2942                 return -1;
2943         }
2944
2945         return 0;
2946 }
2947
2948 /*
2949   set/clear the permanent disabled bit on a remote node
2950  */
2951 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2952                        uint32_t set, uint32_t clear)
2953 {
2954         int ret;
2955         TDB_DATA data;
2956         struct ctdb_node_map *nodemap=NULL;
2957         struct ctdb_node_flag_change c;
2958         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2959         uint32_t recmaster;
2960         uint32_t *nodes;
2961
2962
2963         /* find the recovery master */
2964         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2965         if (ret != 0) {
2966                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2967                 talloc_free(tmp_ctx);
2968                 return ret;
2969         }
2970
2971
2972         /* read the node flags from the recmaster */
2973         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2974         if (ret != 0) {
2975                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2976                 talloc_free(tmp_ctx);
2977                 return -1;
2978         }
2979         if (destnode >= nodemap->num) {
2980                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2981                 talloc_free(tmp_ctx);
2982                 return -1;
2983         }
2984
2985         c.pnn       = destnode;
2986         c.old_flags = nodemap->nodes[destnode].flags;
2987         c.new_flags = c.old_flags;
2988         c.new_flags |= set;
2989         c.new_flags &= ~clear;
2990
2991         data.dsize = sizeof(c);
2992         data.dptr = (unsigned char *)&c;
2993
2994         /* send the flags update to all connected nodes */
2995         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2996
2997         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2998                                         nodes, 0,
2999                                         timeout, false, data,
3000                                         NULL, NULL,
3001                                         NULL) != 0) {
3002                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
3003
3004                 talloc_free(tmp_ctx);
3005                 return -1;
3006         }
3007
3008         talloc_free(tmp_ctx);
3009         return 0;
3010 }
3011
3012
3013 /*
3014   get all tunables
3015  */
3016 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
3017                                struct timeval timeout, 
3018                                uint32_t destnode,
3019                                struct ctdb_tunable *tunables)
3020 {
3021         TDB_DATA outdata;
3022         int ret;
3023         int32_t res;
3024
3025         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
3026                            &outdata, &res, &timeout, NULL);
3027         if (ret != 0 || res != 0) {
3028                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
3029                 return -1;
3030         }
3031
3032         if (outdata.dsize != sizeof(*tunables)) {
3033                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
3034                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
3035                 return -1;              
3036         }
3037
3038         *tunables = *(struct ctdb_tunable *)outdata.dptr;
3039         talloc_free(outdata.dptr);
3040         return 0;
3041 }
3042
3043 /*
3044   add a public address to a node
3045  */
3046 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
3047                       struct timeval timeout, 
3048                       uint32_t destnode,
3049                       struct ctdb_control_ip_iface *pub)
3050 {
3051         TDB_DATA data;
3052         int32_t res;
3053         int ret;
3054
3055         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3056         data.dptr  = (unsigned char *)pub;
3057
3058         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
3059                            NULL, &res, &timeout, NULL);
3060         if (ret != 0 || res != 0) {
3061                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
3062                 return -1;
3063         }
3064
3065         return 0;
3066 }
3067
3068 /*
3069   delete a public address from a node
3070  */
3071 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
3072                       struct timeval timeout, 
3073                       uint32_t destnode,
3074                       struct ctdb_control_ip_iface *pub)
3075 {
3076         TDB_DATA data;
3077         int32_t res;
3078         int ret;
3079
3080         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
3081         data.dptr  = (unsigned char *)pub;
3082
3083         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
3084                            NULL, &res, &timeout, NULL);
3085         if (ret != 0 || res != 0) {
3086                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
3087                 return -1;
3088         }
3089
3090         return 0;
3091 }
3092
3093 /*
3094   kill a tcp connection
3095  */
3096 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
3097                       struct timeval timeout, 
3098                       uint32_t destnode,
3099                       struct ctdb_tcp_connection *killtcp)
3100 {
3101         TDB_DATA data;
3102         int32_t res;
3103         int ret;
3104
3105         data.dsize = sizeof(struct ctdb_tcp_connection);
3106         data.dptr  = (unsigned char *)killtcp;
3107
3108         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
3109                            NULL, &res, &timeout, NULL);
3110         if (ret != 0 || res != 0) {
3111                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
3112                 return -1;
3113         }
3114
3115         return 0;
3116 }
3117
3118 /*
3119   send a gratious arp
3120  */
3121 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3122                       struct timeval timeout, 
3123                       uint32_t destnode,
3124                       ctdb_sock_addr *addr,
3125                       const char *ifname)
3126 {
3127         TDB_DATA data;
3128         int32_t res;
3129         int ret, len;
3130         struct ctdb_control_gratious_arp *gratious_arp;
3131         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3132
3133
3134         len = strlen(ifname)+1;
3135         gratious_arp = talloc_size(tmp_ctx, 
3136                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3137         CTDB_NO_MEMORY(ctdb, gratious_arp);
3138
3139         gratious_arp->addr = *addr;
3140         gratious_arp->len = len;
3141         memcpy(&gratious_arp->iface[0], ifname, len);
3142
3143
3144         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3145         data.dptr  = (unsigned char *)gratious_arp;
3146
3147         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3148                            NULL, &res, &timeout, NULL);
3149         if (ret != 0 || res != 0) {
3150                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3151                 talloc_free(tmp_ctx);
3152                 return -1;
3153         }
3154
3155         talloc_free(tmp_ctx);
3156         return 0;
3157 }
3158
3159 /*
3160   get a list of all tcp tickles that a node knows about for a particular vnn
3161  */
3162 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3163                               struct timeval timeout, uint32_t destnode, 
3164                               TALLOC_CTX *mem_ctx, 
3165                               ctdb_sock_addr *addr,
3166                               struct ctdb_control_tcp_tickle_list **list)
3167 {
3168         int ret;
3169         TDB_DATA data, outdata;
3170         int32_t status;
3171
3172         data.dptr = (uint8_t*)addr;
3173         data.dsize = sizeof(ctdb_sock_addr);
3174
3175         ret = ctdb_control(ctdb, destnode, 0, 
3176                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3177                            mem_ctx, &outdata, &status, NULL, NULL);
3178         if (ret != 0 || status != 0) {
3179                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3180                 return -1;
3181         }
3182
3183         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3184
3185         return status;
3186 }
3187
3188 /*
3189   register a server id
3190  */
3191 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3192                       struct timeval timeout, 
3193                       struct ctdb_server_id *id)
3194 {
3195         TDB_DATA data;
3196         int32_t res;
3197         int ret;
3198
3199         data.dsize = sizeof(struct ctdb_server_id);
3200         data.dptr  = (unsigned char *)id;
3201
3202         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3203                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3204                         0, data, NULL,
3205                         NULL, &res, &timeout, NULL);
3206         if (ret != 0 || res != 0) {
3207                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3208                 return -1;
3209         }
3210
3211         return 0;
3212 }
3213
3214 /*
3215   unregister a server id
3216  */
3217 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3218                       struct timeval timeout, 
3219                       struct ctdb_server_id *id)
3220 {
3221         TDB_DATA data;
3222         int32_t res;
3223         int ret;
3224
3225         data.dsize = sizeof(struct ctdb_server_id);
3226         data.dptr  = (unsigned char *)id;
3227
3228         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3229                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3230                         0, data, NULL,
3231                         NULL, &res, &timeout, NULL);
3232         if (ret != 0 || res != 0) {
3233                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3234                 return -1;
3235         }
3236
3237         return 0;
3238 }
3239
3240
3241 /*
3242   check if a server id exists
3243
3244   if a server id does exist, return *status == 1, otherwise *status == 0
3245  */
3246 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3247                       struct timeval timeout, 
3248                       uint32_t destnode,
3249                       struct ctdb_server_id *id,
3250                       uint32_t *status)
3251 {
3252         TDB_DATA data;
3253         int32_t res;
3254         int ret;
3255
3256         data.dsize = sizeof(struct ctdb_server_id);
3257         data.dptr  = (unsigned char *)id;
3258
3259         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3260                         0, data, NULL,
3261                         NULL, &res, &timeout, NULL);
3262         if (ret != 0) {
3263                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3264                 return -1;
3265         }
3266
3267         if (res) {
3268                 *status = 1;
3269         } else {
3270                 *status = 0;
3271         }
3272
3273         return 0;
3274 }
3275
3276 /*
3277    get the list of server ids that are registered on a node
3278 */
3279 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3280                 TALLOC_CTX *mem_ctx,
3281                 struct timeval timeout, uint32_t destnode, 
3282                 struct ctdb_server_id_list **svid_list)
3283 {
3284         int ret;
3285         TDB_DATA outdata;
3286         int32_t res;
3287
3288         ret = ctdb_control(ctdb, destnode, 0, 
3289                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3290                            mem_ctx, &outdata, &res, &timeout, NULL);
3291         if (ret != 0 || res != 0) {
3292                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3293                 return -1;
3294         }
3295
3296         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3297                     
3298         return 0;
3299 }
3300
3301 /*
3302   initialise the ctdb daemon for client applications
3303
3304   NOTE: In current code the daemon does not fork. This is for testing purposes only
3305   and to simplify the code.
3306 */
3307 struct ctdb_context *ctdb_init(struct event_context *ev)
3308 {
3309         int ret;
3310         struct ctdb_context *ctdb;
3311
3312         ctdb = talloc_zero(ev, struct ctdb_context);
3313         if (ctdb == NULL) {
3314                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3315                 return NULL;
3316         }
3317         ctdb->ev  = ev;
3318         /* Wrap early to exercise code. */
3319         ret = reqid_init(ctdb, INT_MAX-200, &ctdb->idr);
3320         if (ret != 0) {
3321                 DEBUG(DEBUG_ERR, ("reqid_init failed (%s)\n", strerror(ret)));
3322                 talloc_free(ctdb);
3323                 return NULL;
3324         }
3325
3326         ret = srvid_init(ctdb, &ctdb->srv);
3327         if (ret != 0) {
3328                 DEBUG(DEBUG_ERR, ("srvid_init failed (%s)\n", strerror(ret)));
3329                 talloc_free(ctdb);
3330                 return NULL;
3331         }
3332
3333         ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
3334         if (ret != 0) {
3335                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3336                 talloc_free(ctdb);
3337                 return NULL;
3338         }
3339
3340         ctdb->statistics.statistics_start_time = timeval_current();
3341
3342         return ctdb;
3343 }
3344
3345
3346 /*
3347   set some ctdb flags
3348 */
3349 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3350 {
3351         ctdb->flags |= flags;
3352 }
3353
3354 /*
3355   setup the local socket name
3356 */
3357 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3358 {
3359         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3360         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3361
3362         return 0;
3363 }
3364
3365 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3366 {
3367         return ctdb->daemon.name;
3368 }
3369
3370 /*
3371   return the pnn of this node
3372 */
3373 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3374 {
3375         return ctdb->pnn;
3376 }
3377
3378
3379 /*
3380   get the uptime of a remote node
3381  */
3382 struct ctdb_client_control_state *
3383 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3384 {
3385         return ctdb_control_send(ctdb, destnode, 0, 
3386                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3387                            mem_ctx, &timeout, NULL);
3388 }
3389
3390 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3391 {
3392         int ret;
3393         int32_t res;
3394         TDB_DATA outdata;
3395
3396         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3397         if (ret != 0 || res != 0) {
3398                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3399                 return -1;
3400         }
3401
3402         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3403
3404         return 0;
3405 }
3406
3407 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3408 {
3409         struct ctdb_client_control_state *state;
3410
3411         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3412         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3413 }
3414
3415 /*
3416   send a control to execute the "recovered" event script on a node
3417  */
3418 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3419 {
3420         int ret;
3421         int32_t status;
3422
3423         ret = ctdb_control(ctdb, destnode, 0, 
3424                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3425                            NULL, NULL, &status, &timeout, NULL);
3426         if (ret != 0 || status != 0) {
3427                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3428                 return -1;
3429         }
3430
3431         return 0;
3432 }
3433
3434 /* 
3435   callback for the async helpers used when sending the same control
3436   to multiple nodes in parallell.
3437 */
3438 static void async_callback(struct ctdb_client_control_state *state)
3439 {
3440         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3441         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3442         int ret;
3443         TDB_DATA outdata;
3444         int32_t res = -1;
3445         uint32_t destnode = state->c->hdr.destnode;
3446
3447         outdata.dsize = 0;
3448         outdata.dptr = NULL;
3449
3450         /* one more node has responded with recmode data */
3451         data->count--;
3452
3453         /* if we failed to push the db, then return an error and let
3454            the main loop try again.
3455         */
3456         if (state->state != CTDB_CONTROL_DONE) {
3457                 if ( !data->dont_log_errors) {
3458                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3459                 }
3460                 data->fail_count++;
3461                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3462                         res = -ETIME;
3463                 } else {
3464                         res = -1;
3465                 }
3466                 if (data->fail_callback) {
3467                         data->fail_callback(ctdb, destnode, res, outdata,
3468                                         data->callback_data);
3469                 }
3470                 return;
3471         }
3472         
3473         state->async.fn = NULL;
3474
3475         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3476         if ((ret != 0) || (res != 0)) {
3477                 if ( !data->dont_log_errors) {
3478                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3479                 }
3480                 data->fail_count++;
3481                 if (data->fail_callback) {
3482                         data->fail_callback(ctdb, destnode, res, outdata,
3483                                         data->callback_data);
3484                 }
3485         }
3486         if ((ret == 0) && (data->callback != NULL)) {
3487                 data->callback(ctdb, destnode, res, outdata,
3488                                         data->callback_data);
3489         }
3490 }
3491
3492
3493 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3494 {
3495         /* set up the callback functions */
3496         state->async.fn = async_callback;
3497         state->async.private_data = data;
3498         
3499         /* one more control to wait for to complete */
3500         data->count++;
3501 }
3502
3503
3504 /* wait for up to the maximum number of seconds allowed
3505    or until all nodes we expect a response from has replied
3506 */
3507 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3508 {
3509         while (data->count > 0) {
3510                 event_loop_once(ctdb->ev);
3511         }
3512         if (data->fail_count != 0) {
3513                 if (!data->dont_log_errors) {
3514                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3515                                  data->fail_count));
3516                 }
3517                 return -1;
3518         }
3519         return 0;
3520 }
3521
3522
3523 /* 
3524    perform a simple control on the listed nodes
3525    The control cannot return data
3526  */
3527 int ctdb_client_async_control(struct ctdb_context *ctdb,
3528                                 enum ctdb_controls opcode,
3529                                 uint32_t *nodes,
3530                                 uint64_t srvid,
3531                                 struct timeval timeout,
3532                                 bool dont_log_errors,
3533                                 TDB_DATA data,
3534                                 client_async_callback client_callback,
3535                                 client_async_callback fail_callback,
3536                                 void *callback_data)
3537 {
3538         struct client_async_data *async_data;
3539         struct ctdb_client_control_state *state;
3540         int j, num_nodes;
3541
3542         async_data = talloc_zero(ctdb, struct client_async_data);
3543         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3544         async_data->dont_log_errors = dont_log_errors;
3545         async_data->callback = client_callback;
3546         async_data->fail_callback = fail_callback;
3547         async_data->callback_data = callback_data;
3548         async_data->opcode        = opcode;
3549
3550         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3551
3552         /* loop over all nodes and send an async control to each of them */
3553         for (j=0; j<num_nodes; j++) {
3554                 uint32_t pnn = nodes[j];
3555
3556                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3557                                           0, data, async_data, &timeout, NULL);
3558                 if (state == NULL) {
3559                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3560                         talloc_free(async_data);
3561                         return -1;
3562                 }
3563                 
3564                 ctdb_client_async_add(async_data, state);
3565         }
3566
3567         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3568                 talloc_free(async_data);
3569                 return -1;
3570         }
3571
3572         talloc_free(async_data);
3573         return 0;
3574 }
3575
3576 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3577                                 struct ctdb_vnn_map *vnn_map,
3578                                 TALLOC_CTX *mem_ctx,
3579                                 bool include_self)
3580 {
3581         int i, j, num_nodes;
3582         uint32_t *nodes;
3583
3584         for (i=num_nodes=0;i<vnn_map->size;i++) {
3585                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3586                         continue;
3587                 }
3588                 num_nodes++;
3589         } 
3590
3591         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3592         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3593
3594         for (i=j=0;i<vnn_map->size;i++) {
3595                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3596                         continue;
3597                 }
3598                 nodes[j++] = vnn_map->map[i];
3599         } 
3600
3601         return nodes;
3602 }
3603
3604 /* Get list of nodes not including those with flags specified by mask.
3605  * If exclude_pnn is not -1 then exclude that pnn from the list.
3606  */
3607 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3608                         struct ctdb_node_map *node_map,
3609                         TALLOC_CTX *mem_ctx,
3610                         uint32_t mask,
3611                         int exclude_pnn)
3612 {
3613         int i, j, num_nodes;
3614         uint32_t *nodes;
3615
3616         for (i=num_nodes=0;i<node_map->num;i++) {
3617                 if (node_map->nodes[i].flags & mask) {
3618                         continue;
3619                 }
3620                 if (node_map->nodes[i].pnn == exclude_pnn) {
3621                         continue;
3622                 }
3623                 num_nodes++;
3624         } 
3625
3626         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3627         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3628
3629         for (i=j=0;i<node_map->num;i++) {
3630                 if (node_map->nodes[i].flags & mask) {
3631                         continue;
3632                 }
3633                 if (node_map->nodes[i].pnn == exclude_pnn) {
3634                         continue;
3635                 }
3636                 nodes[j++] = node_map->nodes[i].pnn;
3637         } 
3638
3639         return nodes;
3640 }
3641
3642 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3643                                 struct ctdb_node_map *node_map,
3644                                 TALLOC_CTX *mem_ctx,
3645                                 bool include_self)
3646 {
3647         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3648                              include_self ? -1 : ctdb->pnn);
3649 }
3650
3651 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3652                                 struct ctdb_node_map *node_map,
3653                                 TALLOC_CTX *mem_ctx,
3654                                 bool include_self)
3655 {
3656         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3657                              include_self ? -1 : ctdb->pnn);
3658 }
3659
3660 /* 
3661   this is used to test if a pnn lock exists and if it exists will return
3662   the number of connections that pnn has reported or -1 if that recovery
3663   daemon is not running.
3664 */
3665 int
3666 ctdb_read_pnn_lock(int fd, int32_t pnn)
3667 {
3668         struct flock lock;
3669         char c;
3670
3671         lock.l_type = F_WRLCK;
3672         lock.l_whence = SEEK_SET;
3673         lock.l_start = pnn;
3674         lock.l_len = 1;
3675         lock.l_pid = 0;
3676
3677         if (fcntl(fd, F_GETLK, &lock) != 0) {
3678                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3679                 return -1;
3680         }
3681
3682         if (lock.l_type == F_UNLCK) {
3683                 return -1;
3684         }
3685
3686         if (pread(fd, &c, 1, pnn) == -1) {
3687                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3688                 return -1;
3689         }
3690
3691         return c;
3692 }
3693
3694 /*
3695   get capabilities of a remote node
3696  */
3697 struct ctdb_client_control_state *
3698 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3699 {
3700         return ctdb_control_send(ctdb, destnode, 0, 
3701                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3702                            mem_ctx, &timeout, NULL);
3703 }
3704
3705 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3706 {
3707         int ret;
3708         int32_t res;
3709         TDB_DATA outdata;
3710
3711         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3712         if ( (ret != 0) || (res != 0) ) {
3713                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3714                 return -1;
3715         }
3716
3717         if (capabilities) {
3718                 *capabilities = *((uint32_t *)outdata.dptr);
3719         }
3720