client:ctdb_control_send: remove duplicate setting of the reqid header.
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
32
33 /*
34   allocate a packet for use in client<->daemon communication
35  */
36 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
37                                             TALLOC_CTX *mem_ctx, 
38                                             enum ctdb_operation operation, 
39                                             size_t length, size_t slength,
40                                             const char *type)
41 {
42         int size;
43         struct ctdb_req_header *hdr;
44
45         length = MAX(length, slength);
46         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
47
48         hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
49         if (hdr == NULL) {
50                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
51                          operation, (unsigned)length));
52                 return NULL;
53         }
54         talloc_set_name_const(hdr, type);
55         memset(hdr, 0, slength);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, uint32_t caller)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90
91         for (fn=ctdb_db->calls;fn;fn=fn->next) {
92                 if (fn->id == call->call_id) break;
93         }
94         if (fn == NULL) {
95                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
96                 talloc_free(c);
97                 return -1;
98         }
99
100         if (fn->fn(c) != 0) {
101                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
102                 talloc_free(c);
103                 return -1;
104         }
105
106         if (header->laccessor != caller) {
107                 header->lacount = 0;
108         }
109         header->laccessor = caller;
110         header->lacount++;
111
112         /* we need to force the record to be written out if this was a remote access,
113            so that the lacount is updated */
114         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
115                 c->new_data = &c->record_data;
116         }
117
118         if (c->new_data) {
119                 /* XXX check that we always have the lock here? */
120                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
121                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
122                         talloc_free(c);
123                         return -1;
124                 }
125         }
126
127         if (c->reply_data) {
128                 call->reply_data = *c->reply_data;
129
130                 talloc_steal(call, call->reply_data.dptr);
131                 talloc_set_name_const(call->reply_data.dptr, __location__);
132         } else {
133                 call->reply_data.dptr = NULL;
134                 call->reply_data.dsize = 0;
135         }
136         call->status = c->status;
137
138         talloc_free(c);
139
140         return 0;
141 }
142
143
144 /*
145   queue a packet for sending from client to daemon
146 */
147 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
148 {
149         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
150 }
151
152
153 /*
154   called when a CTDB_REPLY_CALL packet comes in in the client
155
156   This packet comes in response to a CTDB_REQ_CALL request packet. It
157   contains any reply data from the call
158 */
159 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
160 {
161         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
162         struct ctdb_client_call_state *state;
163
164         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
165         if (state == NULL) {
166                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
167                 return;
168         }
169
170         if (hdr->reqid != state->reqid) {
171                 /* we found a record  but it was the wrong one */
172                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
173                 return;
174         }
175
176         state->call->reply_data.dptr = c->data;
177         state->call->reply_data.dsize = c->datalen;
178         state->call->status = c->status;
179
180         talloc_steal(state, c);
181
182         state->state = CTDB_CALL_DONE;
183
184         if (state->async.fn) {
185                 state->async.fn(state);
186         }
187 }
188
189 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
190
191 /*
192   this is called in the client, when data comes in from the daemon
193  */
194 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
195 {
196         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
197         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
198         TALLOC_CTX *tmp_ctx;
199
200         /* place the packet as a child of a tmp_ctx. We then use
201            talloc_free() below to free it. If any of the calls want
202            to keep it, then they will steal it somewhere else, and the
203            talloc_free() will be a no-op */
204         tmp_ctx = talloc_new(ctdb);
205         talloc_steal(tmp_ctx, hdr);
206
207         if (cnt == 0) {
208                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
209                 exit(0);
210         }
211
212         if (cnt < sizeof(*hdr)) {
213                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
214                 goto done;
215         }
216         if (cnt != hdr->length) {
217                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
218                                (unsigned)hdr->length, (unsigned)cnt);
219                 goto done;
220         }
221
222         if (hdr->ctdb_magic != CTDB_MAGIC) {
223                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
224                 goto done;
225         }
226
227         if (hdr->ctdb_version != CTDB_VERSION) {
228                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
229                 goto done;
230         }
231
232         switch (hdr->operation) {
233         case CTDB_REPLY_CALL:
234                 ctdb_client_reply_call(ctdb, hdr);
235                 break;
236
237         case CTDB_REQ_MESSAGE:
238                 ctdb_request_message(ctdb, hdr);
239                 break;
240
241         case CTDB_REPLY_CONTROL:
242                 ctdb_client_reply_control(ctdb, hdr);
243                 break;
244
245         default:
246                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
247         }
248
249 done:
250         talloc_free(tmp_ctx);
251 }
252
253 /*
254   connect to a unix domain socket
255 */
256 int ctdb_socket_connect(struct ctdb_context *ctdb)
257 {
258         struct sockaddr_un addr;
259
260         memset(&addr, 0, sizeof(addr));
261         addr.sun_family = AF_UNIX;
262         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
263
264         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
265         if (ctdb->daemon.sd == -1) {
266                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
267                 return -1;
268         }
269
270         set_nonblocking(ctdb->daemon.sd);
271         set_close_on_exec(ctdb->daemon.sd);
272         
273         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
274                 close(ctdb->daemon.sd);
275                 ctdb->daemon.sd = -1;
276                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
277                 return -1;
278         }
279
280         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
281                                               CTDB_DS_ALIGNMENT, 
282                                               ctdb_client_read_cb, ctdb);
283         return 0;
284 }
285
286
287 struct ctdb_record_handle {
288         struct ctdb_db_context *ctdb_db;
289         TDB_DATA key;
290         TDB_DATA *data;
291         struct ctdb_ltdb_header header;
292 };
293
294
295 /*
296   make a recv call to the local ctdb daemon - called from client context
297
298   This is called when the program wants to wait for a ctdb_call to complete and get the 
299   results. This call will block unless the call has already completed.
300 */
301 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
302 {
303         if (state == NULL) {
304                 return -1;
305         }
306
307         while (state->state < CTDB_CALL_DONE) {
308                 event_loop_once(state->ctdb_db->ctdb->ev);
309         }
310         if (state->state != CTDB_CALL_DONE) {
311                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
312                 talloc_free(state);
313                 return -1;
314         }
315
316         if (state->call->reply_data.dsize) {
317                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
318                                                       state->call->reply_data.dptr,
319                                                       state->call->reply_data.dsize);
320                 call->reply_data.dsize = state->call->reply_data.dsize;
321         } else {
322                 call->reply_data.dptr = NULL;
323                 call->reply_data.dsize = 0;
324         }
325         call->status = state->call->status;
326         talloc_free(state);
327
328         return 0;
329 }
330
331
332
333
334 /*
335   destroy a ctdb_call in client
336 */
337 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
338 {
339         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
340         return 0;
341 }
342
343 /*
344   construct an event driven local ctdb_call
345
346   this is used so that locally processed ctdb_call requests are processed
347   in an event driven manner
348 */
349 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
350                                                                   struct ctdb_call *call,
351                                                                   struct ctdb_ltdb_header *header,
352                                                                   TDB_DATA *data)
353 {
354         struct ctdb_client_call_state *state;
355         struct ctdb_context *ctdb = ctdb_db->ctdb;
356         int ret;
357
358         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
359         CTDB_NO_MEMORY_NULL(ctdb, state);
360         state->call = talloc_zero(state, struct ctdb_call);
361         CTDB_NO_MEMORY_NULL(ctdb, state->call);
362
363         talloc_steal(state, data->dptr);
364
365         state->state   = CTDB_CALL_DONE;
366         *(state->call) = *call;
367         state->ctdb_db = ctdb_db;
368
369         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
370
371         return state;
372 }
373
374 /*
375   make a ctdb call to the local daemon - async send. Called from client context.
376
377   This constructs a ctdb_call request and queues it for processing. 
378   This call never blocks.
379 */
380 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
381                                               struct ctdb_call *call)
382 {
383         struct ctdb_client_call_state *state;
384         struct ctdb_context *ctdb = ctdb_db->ctdb;
385         struct ctdb_ltdb_header header;
386         TDB_DATA data;
387         int ret;
388         size_t len;
389         struct ctdb_req_call *c;
390
391         /* if the domain socket is not yet open, open it */
392         if (ctdb->daemon.sd==-1) {
393                 ctdb_socket_connect(ctdb);
394         }
395
396         ret = ctdb_ltdb_lock(ctdb_db, call->key);
397         if (ret != 0) {
398                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
399                 return NULL;
400         }
401
402         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
403
404         if (ret == 0 && header.dmaster == ctdb->pnn) {
405                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
406                 talloc_free(data.dptr);
407                 ctdb_ltdb_unlock(ctdb_db, call->key);
408                 return state;
409         }
410
411         ctdb_ltdb_unlock(ctdb_db, call->key);
412         talloc_free(data.dptr);
413
414         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
415         if (state == NULL) {
416                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
417                 return NULL;
418         }
419         state->call = talloc_zero(state, struct ctdb_call);
420         if (state->call == NULL) {
421                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
422                 return NULL;
423         }
424
425         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
426         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
427         if (c == NULL) {
428                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
429                 return NULL;
430         }
431
432         state->reqid     = ctdb_reqid_new(ctdb, state);
433         state->ctdb_db = ctdb_db;
434         talloc_set_destructor(state, ctdb_client_call_destructor);
435
436         c->hdr.reqid     = state->reqid;
437         c->flags         = call->flags;
438         c->db_id         = ctdb_db->db_id;
439         c->callid        = call->call_id;
440         c->hopcount      = 0;
441         c->keylen        = call->key.dsize;
442         c->calldatalen   = call->call_data.dsize;
443         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
444         memcpy(&c->data[call->key.dsize], 
445                call->call_data.dptr, call->call_data.dsize);
446         *(state->call)              = *call;
447         state->call->call_data.dptr = &c->data[call->key.dsize];
448         state->call->key.dptr       = &c->data[0];
449
450         state->state  = CTDB_CALL_WAIT;
451
452
453         ctdb_client_queue_pkt(ctdb, &c->hdr);
454
455         return state;
456 }
457
458
459 /*
460   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
461 */
462 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
463 {
464         struct ctdb_client_call_state *state;
465
466         state = ctdb_call_send(ctdb_db, call);
467         return ctdb_call_recv(state, call);
468 }
469
470
471 /*
472   tell the daemon what messaging srvid we will use, and register the message
473   handler function in the client
474 */
475 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
476                              ctdb_message_fn_t handler,
477                              void *private_data)
478                                     
479 {
480         int res;
481         int32_t status;
482         
483         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
484                            tdb_null, NULL, NULL, &status, NULL, NULL);
485         if (res != 0 || status != 0) {
486                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
487                 return -1;
488         }
489
490         /* also need to register the handler with our own ctdb structure */
491         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
492 }
493
494 /*
495   tell the daemon we no longer want a srvid
496 */
497 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
498 {
499         int res;
500         int32_t status;
501         
502         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
503                            tdb_null, NULL, NULL, &status, NULL, NULL);
504         if (res != 0 || status != 0) {
505                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
506                 return -1;
507         }
508
509         /* also need to register the handler with our own ctdb structure */
510         ctdb_deregister_message_handler(ctdb, srvid, private_data);
511         return 0;
512 }
513
514
515 /*
516   send a message - from client context
517  */
518 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
519                       uint64_t srvid, TDB_DATA data)
520 {
521         struct ctdb_req_message *r;
522         int len, res;
523
524         len = offsetof(struct ctdb_req_message, data) + data.dsize;
525         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
526                                len, struct ctdb_req_message);
527         CTDB_NO_MEMORY(ctdb, r);
528
529         r->hdr.destnode  = pnn;
530         r->srvid         = srvid;
531         r->datalen       = data.dsize;
532         memcpy(&r->data[0], data.dptr, data.dsize);
533         
534         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
535         if (res != 0) {
536                 return res;
537         }
538
539         talloc_free(r);
540         return 0;
541 }
542
543
544 /*
545   cancel a ctdb_fetch_lock operation, releasing the lock
546  */
547 static int fetch_lock_destructor(struct ctdb_record_handle *h)
548 {
549         ctdb_ltdb_unlock(h->ctdb_db, h->key);
550         return 0;
551 }
552
553 /*
554   force the migration of a record to this node
555  */
556 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
557 {
558         struct ctdb_call call;
559         ZERO_STRUCT(call);
560         call.call_id = CTDB_NULL_FUNC;
561         call.key = key;
562         call.flags = CTDB_IMMEDIATE_MIGRATION;
563         return ctdb_call(ctdb_db, &call);
564 }
565
566 /*
567   get a lock on a record, and return the records data. Blocks until it gets the lock
568  */
569 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
570                                            TDB_DATA key, TDB_DATA *data)
571 {
572         int ret;
573         struct ctdb_record_handle *h;
574
575         /*
576           procedure is as follows:
577
578           1) get the chain lock. 
579           2) check if we are dmaster
580           3) if we are the dmaster then return handle 
581           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
582              reply from ctdbd
583           5) when we get the reply, goto (1)
584          */
585
586         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
587         if (h == NULL) {
588                 return NULL;
589         }
590
591         h->ctdb_db = ctdb_db;
592         h->key     = key;
593         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
594         if (h->key.dptr == NULL) {
595                 talloc_free(h);
596                 return NULL;
597         }
598         h->data    = data;
599
600         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
601                  (const char *)key.dptr));
602
603 again:
604         /* step 1 - get the chain lock */
605         ret = ctdb_ltdb_lock(ctdb_db, key);
606         if (ret != 0) {
607                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
608                 talloc_free(h);
609                 return NULL;
610         }
611
612         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
613
614         talloc_set_destructor(h, fetch_lock_destructor);
615
616         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
617
618         /* when torturing, ensure we test the remote path */
619         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
620             random() % 5 == 0) {
621                 h->header.dmaster = (uint32_t)-1;
622         }
623
624
625         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
626
627         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
628                 ctdb_ltdb_unlock(ctdb_db, key);
629                 ret = ctdb_client_force_migration(ctdb_db, key);
630                 if (ret != 0) {
631                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
632                         talloc_free(h);
633                         return NULL;
634                 }
635                 goto again;
636         }
637
638         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
639         return h;
640 }
641
642 /*
643   store some data to the record that was locked with ctdb_fetch_lock()
644 */
645 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
646 {
647         if (h->ctdb_db->persistent) {
648                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
649                 return -1;
650         }
651
652         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
653 }
654
655 /*
656   non-locking fetch of a record
657  */
658 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
659                TDB_DATA key, TDB_DATA *data)
660 {
661         struct ctdb_call call;
662         int ret;
663
664         call.call_id = CTDB_FETCH_FUNC;
665         call.call_data.dptr = NULL;
666         call.call_data.dsize = 0;
667
668         ret = ctdb_call(ctdb_db, &call);
669
670         if (ret == 0) {
671                 *data = call.reply_data;
672                 talloc_steal(mem_ctx, data->dptr);
673         }
674
675         return ret;
676 }
677
678
679
680 /*
681    called when a control completes or timesout to invoke the callback
682    function the user provided
683 */
684 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
685         struct timeval t, void *private_data)
686 {
687         struct ctdb_client_control_state *state;
688         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
689         int ret;
690
691         state = talloc_get_type(private_data, struct ctdb_client_control_state);
692         talloc_steal(tmp_ctx, state);
693
694         ret = ctdb_control_recv(state->ctdb, state, state,
695                         NULL, 
696                         NULL, 
697                         NULL);
698
699         talloc_free(tmp_ctx);
700 }
701
702 /*
703   called when a CTDB_REPLY_CONTROL packet comes in in the client
704
705   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
706   contains any reply data from the control
707 */
708 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
709                                       struct ctdb_req_header *hdr)
710 {
711         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
712         struct ctdb_client_control_state *state;
713
714         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
715         if (state == NULL) {
716                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
717                 return;
718         }
719
720         if (hdr->reqid != state->reqid) {
721                 /* we found a record  but it was the wrong one */
722                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
723                 return;
724         }
725
726         state->outdata.dptr = c->data;
727         state->outdata.dsize = c->datalen;
728         state->status = c->status;
729         if (c->errorlen) {
730                 state->errormsg = talloc_strndup(state, 
731                                                  (char *)&c->data[c->datalen], 
732                                                  c->errorlen);
733         }
734
735         /* state->outdata now uses resources from c so we dont want c
736            to just dissappear from under us while state is still alive
737         */
738         talloc_steal(state, c);
739
740         state->state = CTDB_CONTROL_DONE;
741
742         /* if we had a callback registered for this control, pull the response
743            and call the callback.
744         */
745         if (state->async.fn) {
746                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
747         }
748 }
749
750
751 /*
752   destroy a ctdb_control in client
753 */
754 static int ctdb_control_destructor(struct ctdb_client_control_state *state)     
755 {
756         ctdb_reqid_remove(state->ctdb, state->reqid);
757         return 0;
758 }
759
760
761 /* time out handler for ctdb_control */
762 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
763         struct timeval t, void *private_data)
764 {
765         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
766
767         DEBUG(DEBUG_ERR,("control timed out. reqid:%d opcode:%d dstnode:%d\n", state->reqid, state->c->opcode, state->c->hdr.destnode));
768
769         state->state = CTDB_CONTROL_TIMEOUT;
770
771         /* if we had a callback registered for this control, pull the response
772            and call the callback.
773         */
774         if (state->async.fn) {
775                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
776         }
777 }
778
779 /* async version of send control request */
780 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
781                 uint32_t destnode, uint64_t srvid, 
782                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
783                 TALLOC_CTX *mem_ctx,
784                 struct timeval *timeout,
785                 char **errormsg)
786 {
787         struct ctdb_client_control_state *state;
788         size_t len;
789         struct ctdb_req_control *c;
790         int ret;
791
792         if (errormsg) {
793                 *errormsg = NULL;
794         }
795
796         /* if the domain socket is not yet open, open it */
797         if (ctdb->daemon.sd==-1) {
798                 ctdb_socket_connect(ctdb);
799         }
800
801         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
802         CTDB_NO_MEMORY_NULL(ctdb, state);
803
804         state->ctdb       = ctdb;
805         state->reqid      = ctdb_reqid_new(ctdb, state);
806         state->state      = CTDB_CONTROL_WAIT;
807         state->errormsg   = NULL;
808
809         talloc_set_destructor(state, ctdb_control_destructor);
810
811         len = offsetof(struct ctdb_req_control, data) + data.dsize;
812         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
813                                len, struct ctdb_req_control);
814         state->c            = c;        
815         CTDB_NO_MEMORY_NULL(ctdb, c);
816         c->hdr.reqid        = state->reqid;
817         c->hdr.destnode     = destnode;
818         c->opcode           = opcode;
819         c->client_id        = 0;
820         c->flags            = flags;
821         c->srvid            = srvid;
822         c->datalen          = data.dsize;
823         if (data.dsize) {
824                 memcpy(&c->data[0], data.dptr, data.dsize);
825         }
826
827         /* timeout */
828         if (timeout && !timeval_is_zero(timeout)) {
829                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
830         }
831
832         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
833         if (ret != 0) {
834                 talloc_free(state);
835                 return NULL;
836         }
837
838         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
839                 talloc_free(state);
840                 return NULL;
841         }
842
843         return state;
844 }
845
846
847 /* async version of receive control reply */
848 int ctdb_control_recv(struct ctdb_context *ctdb, 
849                 struct ctdb_client_control_state *state, 
850                 TALLOC_CTX *mem_ctx,
851                 TDB_DATA *outdata, int32_t *status, char **errormsg)
852 {
853         TALLOC_CTX *tmp_ctx;
854
855         if (status != NULL) {
856                 *status = -1;
857         }
858         if (errormsg != NULL) {
859                 *errormsg = NULL;
860         }
861
862         if (state == NULL) {
863                 return -1;
864         }
865
866         /* prevent double free of state */
867         tmp_ctx = talloc_new(ctdb);
868         talloc_steal(tmp_ctx, state);
869
870         /* loop one event at a time until we either timeout or the control
871            completes.
872         */
873         while (state->state == CTDB_CONTROL_WAIT) {
874                 event_loop_once(ctdb->ev);
875         }
876
877         if (state->state != CTDB_CONTROL_DONE) {
878                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
879                 if (state->async.fn) {
880                         state->async.fn(state);
881                 }
882                 talloc_free(tmp_ctx);
883                 return -1;
884         }
885
886         if (state->errormsg) {
887                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
888                 if (errormsg) {
889                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
890                 }
891                 if (state->async.fn) {
892                         state->async.fn(state);
893                 }
894                 talloc_free(tmp_ctx);
895                 return -1;
896         }
897
898         if (outdata) {
899                 *outdata = state->outdata;
900                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
901         }
902
903         if (status) {
904                 *status = state->status;
905         }
906
907         if (state->async.fn) {
908                 state->async.fn(state);
909         }
910
911         talloc_free(tmp_ctx);
912         return 0;
913 }
914
915
916
917 /*
918   send a ctdb control message
919   timeout specifies how long we should wait for a reply.
920   if timeout is NULL we wait indefinitely
921  */
922 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
923                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
924                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
925                  struct timeval *timeout,
926                  char **errormsg)
927 {
928         struct ctdb_client_control_state *state;
929
930         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
931                         flags, data, mem_ctx,
932                         timeout, errormsg);
933         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
934                         errormsg);
935 }
936
937
938
939
940 /*
941   a process exists call. Returns 0 if process exists, -1 otherwise
942  */
943 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
944 {
945         int ret;
946         TDB_DATA data;
947         int32_t status;
948
949         data.dptr = (uint8_t*)&pid;
950         data.dsize = sizeof(pid);
951
952         ret = ctdb_control(ctdb, destnode, 0, 
953                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
954                            NULL, NULL, &status, NULL, NULL);
955         if (ret != 0) {
956                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
957                 return -1;
958         }
959
960         return status;
961 }
962
963 /*
964   get remote statistics
965  */
966 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
967 {
968         int ret;
969         TDB_DATA data;
970         int32_t res;
971
972         ret = ctdb_control(ctdb, destnode, 0, 
973                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
974                            ctdb, &data, &res, NULL, NULL);
975         if (ret != 0 || res != 0) {
976                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
977                 return -1;
978         }
979
980         if (data.dsize != sizeof(struct ctdb_statistics)) {
981                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
982                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
983                       return -1;
984         }
985
986         *status = *(struct ctdb_statistics *)data.dptr;
987         talloc_free(data.dptr);
988                         
989         return 0;
990 }
991
992 /*
993   shutdown a remote ctdb node
994  */
995 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
996 {
997         struct ctdb_client_control_state *state;
998
999         state = ctdb_control_send(ctdb, destnode, 0, 
1000                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1001                            NULL, &timeout, NULL);
1002         if (state == NULL) {
1003                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1004                 return -1;
1005         }
1006
1007         return 0;
1008 }
1009
1010 /*
1011   get vnn map from a remote node
1012  */
1013 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1014 {
1015         int ret;
1016         TDB_DATA outdata;
1017         int32_t res;
1018         struct ctdb_vnn_map_wire *map;
1019
1020         ret = ctdb_control(ctdb, destnode, 0, 
1021                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1022                            mem_ctx, &outdata, &res, &timeout, NULL);
1023         if (ret != 0 || res != 0) {
1024                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1025                 return -1;
1026         }
1027         
1028         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1029         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1030             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1031                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1032                 return -1;
1033         }
1034
1035         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1036         CTDB_NO_MEMORY(ctdb, *vnnmap);
1037         (*vnnmap)->generation = map->generation;
1038         (*vnnmap)->size       = map->size;
1039         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1040
1041         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1042         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1043         talloc_free(outdata.dptr);
1044                     
1045         return 0;
1046 }
1047
1048
1049 /*
1050   get the recovery mode of a remote node
1051  */
1052 struct ctdb_client_control_state *
1053 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1054 {
1055         return ctdb_control_send(ctdb, destnode, 0, 
1056                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1057                            mem_ctx, &timeout, NULL);
1058 }
1059
1060 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1061 {
1062         int ret;
1063         int32_t res;
1064
1065         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1066         if (ret != 0) {
1067                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1068                 return -1;
1069         }
1070
1071         if (recmode) {
1072                 *recmode = (uint32_t)res;
1073         }
1074
1075         return 0;
1076 }
1077
1078 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1079 {
1080         struct ctdb_client_control_state *state;
1081
1082         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1083         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1084 }
1085
1086
1087
1088
1089 /*
1090   set the recovery mode of a remote node
1091  */
1092 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1093 {
1094         int ret;
1095         TDB_DATA data;
1096         int32_t res;
1097
1098         data.dsize = sizeof(uint32_t);
1099         data.dptr = (unsigned char *)&recmode;
1100
1101         ret = ctdb_control(ctdb, destnode, 0, 
1102                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1103                            NULL, NULL, &res, &timeout, NULL);
1104         if (ret != 0 || res != 0) {
1105                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1106                 return -1;
1107         }
1108
1109         return 0;
1110 }
1111
1112
1113
1114 /*
1115   get the recovery master of a remote node
1116  */
1117 struct ctdb_client_control_state *
1118 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1119                         struct timeval timeout, uint32_t destnode)
1120 {
1121         return ctdb_control_send(ctdb, destnode, 0, 
1122                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1123                            mem_ctx, &timeout, NULL);
1124 }
1125
1126 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1127 {
1128         int ret;
1129         int32_t res;
1130
1131         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1132         if (ret != 0) {
1133                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1134                 return -1;
1135         }
1136
1137         if (recmaster) {
1138                 *recmaster = (uint32_t)res;
1139         }
1140
1141         return 0;
1142 }
1143
1144 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1145 {
1146         struct ctdb_client_control_state *state;
1147
1148         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1149         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1150 }
1151
1152
1153 /*
1154   set the recovery master of a remote node
1155  */
1156 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1157 {
1158         int ret;
1159         TDB_DATA data;
1160         int32_t res;
1161
1162         ZERO_STRUCT(data);
1163         data.dsize = sizeof(uint32_t);
1164         data.dptr = (unsigned char *)&recmaster;
1165
1166         ret = ctdb_control(ctdb, destnode, 0, 
1167                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1168                            NULL, NULL, &res, &timeout, NULL);
1169         if (ret != 0 || res != 0) {
1170                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1171                 return -1;
1172         }
1173
1174         return 0;
1175 }
1176
1177
1178 /*
1179   get a list of databases off a remote node
1180  */
1181 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1182                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1183 {
1184         int ret;
1185         TDB_DATA outdata;
1186         int32_t res;
1187
1188         ret = ctdb_control(ctdb, destnode, 0, 
1189                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1190                            mem_ctx, &outdata, &res, &timeout, NULL);
1191         if (ret != 0 || res != 0) {
1192                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1193                 return -1;
1194         }
1195
1196         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1197         talloc_free(outdata.dptr);
1198                     
1199         return 0;
1200 }
1201
1202 /*
1203   get a list of nodes (vnn and flags ) from a remote node
1204  */
1205 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1206                 struct timeval timeout, uint32_t destnode, 
1207                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1208 {
1209         int ret;
1210         TDB_DATA outdata;
1211         int32_t res;
1212
1213         ret = ctdb_control(ctdb, destnode, 0, 
1214                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1215                            mem_ctx, &outdata, &res, &timeout, NULL);
1216         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1217                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1218                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1219         }
1220         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1221                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1222                 return -1;
1223         }
1224
1225         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1226         talloc_free(outdata.dptr);
1227                     
1228         return 0;
1229 }
1230
1231 /*
1232   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1233  */
1234 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1235                 struct timeval timeout, uint32_t destnode, 
1236                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1237 {
1238         int ret, i, len;
1239         TDB_DATA outdata;
1240         struct ctdb_node_mapv4 *nodemapv4;
1241         int32_t res;
1242
1243         ret = ctdb_control(ctdb, destnode, 0, 
1244                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1245                            mem_ctx, &outdata, &res, &timeout, NULL);
1246         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1247                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1248                 return -1;
1249         }
1250
1251         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1252
1253         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1254         (*nodemap) = talloc_zero_size(mem_ctx, len);
1255         CTDB_NO_MEMORY(ctdb, (*nodemap));
1256
1257         (*nodemap)->num = nodemapv4->num;
1258         for (i=0; i<nodemapv4->num; i++) {
1259                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1260                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1261                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1262                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1263         }
1264                 
1265         talloc_free(outdata.dptr);
1266                     
1267         return 0;
1268 }
1269
1270 /*
1271   drop the transport, reload the nodes file and restart the transport
1272  */
1273 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1274                     struct timeval timeout, uint32_t destnode)
1275 {
1276         int ret;
1277         int32_t res;
1278
1279         ret = ctdb_control(ctdb, destnode, 0, 
1280                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1281                            NULL, NULL, &res, &timeout, NULL);
1282         if (ret != 0 || res != 0) {
1283                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1284                 return -1;
1285         }
1286
1287         return 0;
1288 }
1289
1290
1291 /*
1292   set vnn map on a node
1293  */
1294 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1295                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1296 {
1297         int ret;
1298         TDB_DATA data;
1299         int32_t res;
1300         struct ctdb_vnn_map_wire *map;
1301         size_t len;
1302
1303         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1304         map = talloc_size(mem_ctx, len);
1305         CTDB_NO_MEMORY(ctdb, map);
1306
1307         map->generation = vnnmap->generation;
1308         map->size = vnnmap->size;
1309         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1310         
1311         data.dsize = len;
1312         data.dptr  = (uint8_t *)map;
1313
1314         ret = ctdb_control(ctdb, destnode, 0, 
1315                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1316                            NULL, NULL, &res, &timeout, NULL);
1317         if (ret != 0 || res != 0) {
1318                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1319                 return -1;
1320         }
1321
1322         talloc_free(map);
1323
1324         return 0;
1325 }
1326
1327
1328 /*
1329   async send for pull database
1330  */
1331 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1332         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1333         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1334 {
1335         TDB_DATA indata;
1336         struct ctdb_control_pulldb *pull;
1337         struct ctdb_client_control_state *state;
1338
1339         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1340         CTDB_NO_MEMORY_NULL(ctdb, pull);
1341
1342         pull->db_id   = dbid;
1343         pull->lmaster = lmaster;
1344
1345         indata.dsize = sizeof(struct ctdb_control_pulldb);
1346         indata.dptr  = (unsigned char *)pull;
1347
1348         state = ctdb_control_send(ctdb, destnode, 0, 
1349                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1350                                   mem_ctx, &timeout, NULL);
1351         talloc_free(pull);
1352
1353         return state;
1354 }
1355
1356 /*
1357   async recv for pull database
1358  */
1359 int ctdb_ctrl_pulldb_recv(
1360         struct ctdb_context *ctdb, 
1361         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1362         TDB_DATA *outdata)
1363 {
1364         int ret;
1365         int32_t res;
1366
1367         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1368         if ( (ret != 0) || (res != 0) ){
1369                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1370                 return -1;
1371         }
1372
1373         return 0;
1374 }
1375
1376 /*
1377   pull all keys and records for a specific database on a node
1378  */
1379 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1380                 uint32_t dbid, uint32_t lmaster, 
1381                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1382                 TDB_DATA *outdata)
1383 {
1384         struct ctdb_client_control_state *state;
1385
1386         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1387                                       timeout);
1388         
1389         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1390 }
1391
1392
1393 /*
1394   change dmaster for all keys in the database to the new value
1395  */
1396 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1397                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1398 {
1399         int ret;
1400         TDB_DATA indata;
1401         int32_t res;
1402
1403         indata.dsize = 2*sizeof(uint32_t);
1404         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1405
1406         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1407         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1408
1409         ret = ctdb_control(ctdb, destnode, 0, 
1410                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1411                            NULL, NULL, &res, &timeout, NULL);
1412         if (ret != 0 || res != 0) {
1413                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1414                 return -1;
1415         }
1416
1417         return 0;
1418 }
1419
1420 /*
1421   ping a node, return number of clients connected
1422  */
1423 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1424 {
1425         int ret;
1426         int32_t res;
1427
1428         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1429                            tdb_null, NULL, NULL, &res, NULL, NULL);
1430         if (ret != 0) {
1431                 return -1;
1432         }
1433         return res;
1434 }
1435
1436 /*
1437   find the real path to a ltdb 
1438  */
1439 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1440                    const char **path)
1441 {
1442         int ret;
1443         int32_t res;
1444         TDB_DATA data;
1445
1446         data.dptr = (uint8_t *)&dbid;
1447         data.dsize = sizeof(dbid);
1448
1449         ret = ctdb_control(ctdb, destnode, 0, 
1450                            CTDB_CONTROL_GETDBPATH, 0, data, 
1451                            mem_ctx, &data, &res, &timeout, NULL);
1452         if (ret != 0 || res != 0) {
1453                 return -1;
1454         }
1455
1456         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1457         if ((*path) == NULL) {
1458                 return -1;
1459         }
1460
1461         talloc_free(data.dptr);
1462
1463         return 0;
1464 }
1465
1466 /*
1467   find the name of a db 
1468  */
1469 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1470                    const char **name)
1471 {
1472         int ret;
1473         int32_t res;
1474         TDB_DATA data;
1475
1476         data.dptr = (uint8_t *)&dbid;
1477         data.dsize = sizeof(dbid);
1478
1479         ret = ctdb_control(ctdb, destnode, 0, 
1480                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1481                            mem_ctx, &data, &res, &timeout, NULL);
1482         if (ret != 0 || res != 0) {
1483                 return -1;
1484         }
1485
1486         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1487         if ((*name) == NULL) {
1488                 return -1;
1489         }
1490
1491         talloc_free(data.dptr);
1492
1493         return 0;
1494 }
1495
1496 /*
1497   create a database
1498  */
1499 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1500                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1501 {
1502         int ret;
1503         int32_t res;
1504         TDB_DATA data;
1505
1506         data.dptr = discard_const(name);
1507         data.dsize = strlen(name)+1;
1508
1509         ret = ctdb_control(ctdb, destnode, 0, 
1510                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1511                            0, data, 
1512                            mem_ctx, &data, &res, &timeout, NULL);
1513
1514         if (ret != 0 || res != 0) {
1515                 return -1;
1516         }
1517
1518         return 0;
1519 }
1520
1521 /*
1522   get debug level on a node
1523  */
1524 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1525 {
1526         int ret;
1527         int32_t res;
1528         TDB_DATA data;
1529
1530         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1531                            ctdb, &data, &res, NULL, NULL);
1532         if (ret != 0 || res != 0) {
1533                 return -1;
1534         }
1535         if (data.dsize != sizeof(int32_t)) {
1536                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1537                          (unsigned)data.dsize));
1538                 return -1;
1539         }
1540         *level = *(int32_t *)data.dptr;
1541         talloc_free(data.dptr);
1542         return 0;
1543 }
1544
1545 /*
1546   set debug level on a node
1547  */
1548 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1549 {
1550         int ret;
1551         int32_t res;
1552         TDB_DATA data;
1553
1554         data.dptr = (uint8_t *)&level;
1555         data.dsize = sizeof(level);
1556
1557         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1558                            NULL, NULL, &res, NULL, NULL);
1559         if (ret != 0 || res != 0) {
1560                 return -1;
1561         }
1562         return 0;
1563 }
1564
1565
1566 /*
1567   get a list of connected nodes
1568  */
1569 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1570                                 struct timeval timeout,
1571                                 TALLOC_CTX *mem_ctx,
1572                                 uint32_t *num_nodes)
1573 {
1574         struct ctdb_node_map *map=NULL;
1575         int ret, i;
1576         uint32_t *nodes;
1577
1578         *num_nodes = 0;
1579
1580         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1581         if (ret != 0) {
1582                 return NULL;
1583         }
1584
1585         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1586         if (nodes == NULL) {
1587                 return NULL;
1588         }
1589
1590         for (i=0;i<map->num;i++) {
1591                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1592                         nodes[*num_nodes] = map->nodes[i].pnn;
1593                         (*num_nodes)++;
1594                 }
1595         }
1596
1597         return nodes;
1598 }
1599
1600
1601 /*
1602   reset remote status
1603  */
1604 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1605 {
1606         int ret;
1607         int32_t res;
1608
1609         ret = ctdb_control(ctdb, destnode, 0, 
1610                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1611                            NULL, NULL, &res, NULL, NULL);
1612         if (ret != 0 || res != 0) {
1613                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1614                 return -1;
1615         }
1616         return 0;
1617 }
1618
1619 /*
1620   this is the dummy null procedure that all databases support
1621 */
1622 static int ctdb_null_func(struct ctdb_call_info *call)
1623 {
1624         return 0;
1625 }
1626
1627 /*
1628   this is a plain fetch procedure that all databases support
1629 */
1630 static int ctdb_fetch_func(struct ctdb_call_info *call)
1631 {
1632         call->reply_data = &call->record_data;
1633         return 0;
1634 }
1635
1636 /*
1637   attach to a specific database - client call
1638 */
1639 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1640 {
1641         struct ctdb_db_context *ctdb_db;
1642         TDB_DATA data;
1643         int ret;
1644         int32_t res;
1645
1646         ctdb_db = ctdb_db_handle(ctdb, name);
1647         if (ctdb_db) {
1648                 return ctdb_db;
1649         }
1650
1651         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1652         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1653
1654         ctdb_db->ctdb = ctdb;
1655         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1656         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1657
1658         data.dptr = discard_const(name);
1659         data.dsize = strlen(name)+1;
1660
1661         /* tell ctdb daemon to attach */
1662         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1663                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1664                            0, data, ctdb_db, &data, &res, NULL, NULL);
1665         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1666                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1667                 talloc_free(ctdb_db);
1668                 return NULL;
1669         }
1670         
1671         ctdb_db->db_id = *(uint32_t *)data.dptr;
1672         talloc_free(data.dptr);
1673
1674         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1675         if (ret != 0) {
1676                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1677                 talloc_free(ctdb_db);
1678                 return NULL;
1679         }
1680
1681         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1682         if (!ctdb->do_setsched) {
1683                 tdb_flags |= TDB_NOMMAP;
1684         }
1685
1686         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1687         if (ctdb_db->ltdb == NULL) {
1688                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1689                 talloc_free(ctdb_db);
1690                 return NULL;
1691         }
1692
1693         ctdb_db->persistent = persistent;
1694
1695         DLIST_ADD(ctdb->db_list, ctdb_db);
1696
1697         /* add well known functions */
1698         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1699         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1700
1701         return ctdb_db;
1702 }
1703
1704
1705 /*
1706   setup a call for a database
1707  */
1708 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1709 {
1710         struct ctdb_registered_call *call;
1711
1712 #if 0
1713         TDB_DATA data;
1714         int32_t status;
1715         struct ctdb_control_set_call c;
1716         int ret;
1717
1718         /* this is no longer valid with the separate daemon architecture */
1719         c.db_id = ctdb_db->db_id;
1720         c.fn    = fn;
1721         c.id    = id;
1722
1723         data.dptr = (uint8_t *)&c;
1724         data.dsize = sizeof(c);
1725
1726         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1727                            data, NULL, NULL, &status, NULL, NULL);
1728         if (ret != 0 || status != 0) {
1729                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1730                 return -1;
1731         }
1732 #endif
1733
1734         /* also register locally */
1735         call = talloc(ctdb_db, struct ctdb_registered_call);
1736         call->fn = fn;
1737         call->id = id;
1738
1739         DLIST_ADD(ctdb_db->calls, call);        
1740         return 0;
1741 }
1742
1743
1744 struct traverse_state {
1745         bool done;
1746         uint32_t count;
1747         ctdb_traverse_func fn;
1748         void *private_data;
1749 };
1750
1751 /*
1752   called on each key during a ctdb_traverse
1753  */
1754 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1755 {
1756         struct traverse_state *state = (struct traverse_state *)p;
1757         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1758         TDB_DATA key;
1759
1760         if (data.dsize < sizeof(uint32_t) ||
1761             d->length != data.dsize) {
1762                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1763                 state->done = True;
1764                 return;
1765         }
1766
1767         key.dsize = d->keylen;
1768         key.dptr  = &d->data[0];
1769         data.dsize = d->datalen;
1770         data.dptr = &d->data[d->keylen];
1771
1772         if (key.dsize == 0 && data.dsize == 0) {
1773                 /* end of traverse */
1774                 state->done = True;
1775                 return;
1776         }
1777
1778         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1779                 /* empty records are deleted records in ctdb */
1780                 return;
1781         }
1782
1783         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1784                 state->done = True;
1785         }
1786
1787         state->count++;
1788 }
1789
1790
1791 /*
1792   start a cluster wide traverse, calling the supplied fn on each record
1793   return the number of records traversed, or -1 on error
1794  */
1795 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1796 {
1797         TDB_DATA data;
1798         struct ctdb_traverse_start t;
1799         int32_t status;
1800         int ret;
1801         uint64_t srvid = (getpid() | 0xFLL<<60);
1802         struct traverse_state state;
1803
1804         state.done = False;
1805         state.count = 0;
1806         state.private_data = private_data;
1807         state.fn = fn;
1808
1809         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1810         if (ret != 0) {
1811                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1812                 return -1;
1813         }
1814
1815         t.db_id = ctdb_db->db_id;
1816         t.srvid = srvid;
1817         t.reqid = 0;
1818
1819         data.dptr = (uint8_t *)&t;
1820         data.dsize = sizeof(t);
1821
1822         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1823                            data, NULL, NULL, &status, NULL, NULL);
1824         if (ret != 0 || status != 0) {
1825                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1826                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1827                 return -1;
1828         }
1829
1830         while (!state.done) {
1831                 event_loop_once(ctdb_db->ctdb->ev);
1832         }
1833
1834         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1835         if (ret != 0) {
1836                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1837                 return -1;
1838         }
1839
1840         return state.count;
1841 }
1842
1843 #define ISASCII(x) ((x>31)&&(x<128))
1844 /*
1845   called on each key during a catdb
1846  */
1847 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1848 {
1849         int i;
1850         FILE *f = (FILE *)p;
1851         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1852
1853         fprintf(f, "dmaster: %u\n", h->dmaster);
1854         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1855
1856         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1857         for (i=0;i<key.dsize;i++) {
1858                 if (ISASCII(key.dptr[i])) {
1859                         fprintf(f, "%c", key.dptr[i]);
1860                 } else {
1861                         fprintf(f, "\\%02X", key.dptr[i]);
1862                 }
1863         }
1864         fprintf(f, "\"\n");
1865
1866         fprintf(f, "data(%u) = \"", (unsigned)data.dsize);
1867         for (i=sizeof(*h);i<data.dsize;i++) {
1868                 if (ISASCII(data.dptr[i])) {
1869                         fprintf(f, "%c", data.dptr[i]);
1870                 } else {
1871                         fprintf(f, "\\%02X", data.dptr[i]);
1872                 }
1873         }
1874         fprintf(f, "\"\n");
1875
1876         return 0;
1877 }
1878
1879 /*
1880   convenience function to list all keys to stdout
1881  */
1882 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1883 {
1884         return ctdb_traverse(ctdb_db, dumpdb_fn, f);
1885 }
1886
1887 /*
1888   get the pid of a ctdb daemon
1889  */
1890 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1891 {
1892         int ret;
1893         int32_t res;
1894
1895         ret = ctdb_control(ctdb, destnode, 0, 
1896                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1897                            NULL, NULL, &res, &timeout, NULL);
1898         if (ret != 0) {
1899                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1900                 return -1;
1901         }
1902
1903         *pid = res;
1904
1905         return 0;
1906 }
1907
1908
1909 /*
1910   async freeze send control
1911  */
1912 struct ctdb_client_control_state *
1913 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1914 {
1915         return ctdb_control_send(ctdb, destnode, 0, 
1916                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1917                            mem_ctx, &timeout, NULL);
1918 }
1919
1920 /* 
1921    async freeze recv control
1922 */
1923 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1924 {
1925         int ret;
1926         int32_t res;
1927
1928         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1929         if ( (ret != 0) || (res != 0) ){
1930                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1931                 return -1;
1932         }
1933
1934         return 0;
1935 }
1936
1937 /*
1938   freeze a node
1939  */
1940 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1941 {
1942         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1943         struct ctdb_client_control_state *state;
1944         int ret;
1945
1946         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode);
1947         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1948         talloc_free(tmp_ctx);
1949
1950         return ret;
1951 }
1952
1953 /*
1954   thaw a node
1955  */
1956 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1957 {
1958         int ret;
1959         int32_t res;
1960
1961         ret = ctdb_control(ctdb, destnode, 0, 
1962                            CTDB_CONTROL_THAW, 0, tdb_null, 
1963                            NULL, NULL, &res, &timeout, NULL);
1964         if (ret != 0 || res != 0) {
1965                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1966                 return -1;
1967         }
1968
1969         return 0;
1970 }
1971
1972 /*
1973   get pnn of a node, or -1
1974  */
1975 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1976 {
1977         int ret;
1978         int32_t res;
1979
1980         ret = ctdb_control(ctdb, destnode, 0, 
1981                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
1982                            NULL, NULL, &res, &timeout, NULL);
1983         if (ret != 0) {
1984                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
1985                 return -1;
1986         }
1987
1988         return res;
1989 }
1990
1991 /*
1992   get the monitoring mode of a remote node
1993  */
1994 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1995 {
1996         int ret;
1997         int32_t res;
1998
1999         ret = ctdb_control(ctdb, destnode, 0, 
2000                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2001                            NULL, NULL, &res, &timeout, NULL);
2002         if (ret != 0) {
2003                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2004                 return -1;
2005         }
2006
2007         *monmode = res;
2008
2009         return 0;
2010 }
2011
2012
2013 /*
2014  set the monitoring mode of a remote node to active
2015  */
2016 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2017 {
2018         int ret;
2019         
2020
2021         ret = ctdb_control(ctdb, destnode, 0, 
2022                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2023                            NULL, NULL,NULL, &timeout, NULL);
2024         if (ret != 0) {
2025                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2026                 return -1;
2027         }
2028
2029         
2030
2031         return 0;
2032 }
2033
2034 /*
2035   set the monitoring mode of a remote node to disable
2036  */
2037 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2038 {
2039         int ret;
2040         
2041
2042         ret = ctdb_control(ctdb, destnode, 0, 
2043                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2044                            NULL, NULL, NULL, &timeout, NULL);
2045         if (ret != 0) {
2046                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2047                 return -1;
2048         }
2049
2050         
2051
2052         return 0;
2053 }
2054
2055
2056
2057 /* 
2058   sent to a node to make it take over an ip address
2059 */
2060 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2061                           uint32_t destnode, struct ctdb_public_ip *ip)
2062 {
2063         TDB_DATA data;
2064         struct ctdb_public_ipv4 ipv4;
2065         int ret;
2066         int32_t res;
2067
2068         if (ip->addr.sa.sa_family == AF_INET) {
2069                 ipv4.pnn = ip->pnn;
2070                 ipv4.sin = ip->addr.ip;
2071
2072                 data.dsize = sizeof(ipv4);
2073                 data.dptr  = (uint8_t *)&ipv4;
2074
2075                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2076                            NULL, &res, &timeout, NULL);
2077         } else {
2078                 data.dsize = sizeof(*ip);
2079                 data.dptr  = (uint8_t *)ip;
2080
2081                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2082                            NULL, &res, &timeout, NULL);
2083         }
2084
2085         if (ret != 0 || res != 0) {
2086                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2087                 return -1;
2088         }
2089
2090         return 0;       
2091 }
2092
2093
2094 /* 
2095   sent to a node to make it release an ip address
2096 */
2097 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2098                          uint32_t destnode, struct ctdb_public_ip *ip)
2099 {
2100         TDB_DATA data;
2101         struct ctdb_public_ipv4 ipv4;
2102         int ret;
2103         int32_t res;
2104
2105         if (ip->addr.sa.sa_family == AF_INET) {
2106                 ipv4.pnn = ip->pnn;
2107                 ipv4.sin = ip->addr.ip;
2108
2109                 data.dsize = sizeof(ipv4);
2110                 data.dptr  = (uint8_t *)&ipv4;
2111
2112                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2113                                    NULL, &res, &timeout, NULL);
2114         } else {
2115                 data.dsize = sizeof(*ip);
2116                 data.dptr  = (uint8_t *)ip;
2117
2118                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2119                                    NULL, &res, &timeout, NULL);
2120         }
2121
2122         if (ret != 0 || res != 0) {
2123                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2124                 return -1;
2125         }
2126
2127         return 0;       
2128 }
2129
2130
2131 /*
2132   get a tunable
2133  */
2134 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2135                           struct timeval timeout, 
2136                           uint32_t destnode,
2137                           const char *name, uint32_t *value)
2138 {
2139         struct ctdb_control_get_tunable *t;
2140         TDB_DATA data, outdata;
2141         int32_t res;
2142         int ret;
2143
2144         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2145         data.dptr  = talloc_size(ctdb, data.dsize);
2146         CTDB_NO_MEMORY(ctdb, data.dptr);
2147
2148         t = (struct ctdb_control_get_tunable *)data.dptr;
2149         t->length = strlen(name)+1;
2150         memcpy(t->name, name, t->length);
2151
2152         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2153                            &outdata, &res, &timeout, NULL);
2154         talloc_free(data.dptr);
2155         if (ret != 0 || res != 0) {
2156                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2157                 return -1;
2158         }
2159
2160         if (outdata.dsize != sizeof(uint32_t)) {
2161                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2162                 talloc_free(outdata.dptr);
2163                 return -1;
2164         }
2165         
2166         *value = *(uint32_t *)outdata.dptr;
2167         talloc_free(outdata.dptr);
2168
2169         return 0;
2170 }
2171
2172 /*
2173   set a tunable
2174  */
2175 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2176                           struct timeval timeout, 
2177                           uint32_t destnode,
2178                           const char *name, uint32_t value)
2179 {
2180         struct ctdb_control_set_tunable *t;
2181         TDB_DATA data;
2182         int32_t res;
2183         int ret;
2184
2185         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2186         data.dptr  = talloc_size(ctdb, data.dsize);
2187         CTDB_NO_MEMORY(ctdb, data.dptr);
2188
2189         t = (struct ctdb_control_set_tunable *)data.dptr;
2190         t->length = strlen(name)+1;
2191         memcpy(t->name, name, t->length);
2192         t->value = value;
2193
2194         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2195                            NULL, &res, &timeout, NULL);
2196         talloc_free(data.dptr);
2197         if (ret != 0 || res != 0) {
2198                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2199                 return -1;
2200         }
2201
2202         return 0;
2203 }
2204
2205 /*
2206   list tunables
2207  */
2208 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2209                             struct timeval timeout, 
2210                             uint32_t destnode,
2211                             TALLOC_CTX *mem_ctx,
2212                             const char ***list, uint32_t *count)
2213 {
2214         TDB_DATA outdata;
2215         int32_t res;
2216         int ret;
2217         struct ctdb_control_list_tunable *t;
2218         char *p, *s, *ptr;
2219
2220         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2221                            mem_ctx, &outdata, &res, &timeout, NULL);
2222         if (ret != 0 || res != 0) {
2223                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2224                 return -1;
2225         }
2226
2227         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2228         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2229             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2230                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2231                 talloc_free(outdata.dptr);
2232                 return -1;              
2233         }
2234         
2235         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2236         CTDB_NO_MEMORY(ctdb, p);
2237
2238         talloc_free(outdata.dptr);
2239         
2240         (*list) = NULL;
2241         (*count) = 0;
2242
2243         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2244                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2245                 CTDB_NO_MEMORY(ctdb, *list);
2246                 (*list)[*count] = talloc_strdup(*list, s);
2247                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2248                 (*count)++;
2249         }
2250
2251         talloc_free(p);
2252
2253         return 0;
2254 }
2255
2256
2257 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb, 
2258                         struct timeval timeout, uint32_t destnode, 
2259                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2260 {
2261         int ret;
2262         TDB_DATA outdata;
2263         int32_t res;
2264
2265         ret = ctdb_control(ctdb, destnode, 0, 
2266                            CTDB_CONTROL_GET_PUBLIC_IPS, 0, tdb_null, 
2267                            mem_ctx, &outdata, &res, &timeout, NULL);
2268         if (ret == 0 && res == -1) {
2269                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2270                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2271         }
2272         if (ret != 0 || res != 0) {
2273           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2274                 return -1;
2275         }
2276
2277         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2278         talloc_free(outdata.dptr);
2279                     
2280         return 0;
2281 }
2282
2283 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2284                         struct timeval timeout, uint32_t destnode, 
2285                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2286 {
2287         int ret, i, len;
2288         TDB_DATA outdata;
2289         int32_t res;
2290         struct ctdb_all_public_ipsv4 *ipsv4;
2291
2292         ret = ctdb_control(ctdb, destnode, 0, 
2293                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2294                            mem_ctx, &outdata, &res, &timeout, NULL);
2295         if (ret != 0 || res != 0) {
2296                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2297                 return -1;
2298         }
2299
2300         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2301         len = offsetof(struct ctdb_all_public_ips, ips) +
2302                 ipsv4->num*sizeof(struct ctdb_public_ip);
2303         *ips = talloc_zero_size(mem_ctx, len);
2304         CTDB_NO_MEMORY(ctdb, *ips);
2305         (*ips)->num = ipsv4->num;
2306         for (i=0; i<ipsv4->num; i++) {
2307                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2308                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2309         }
2310
2311         talloc_free(outdata.dptr);
2312                     
2313         return 0;
2314 }
2315
2316 /*
2317   set/clear the permanent disabled bit on a remote node
2318  */
2319 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2320                        uint32_t set, uint32_t clear)
2321 {
2322         int ret;
2323         TDB_DATA data;
2324         struct ctdb_node_map *nodemap=NULL;
2325         struct ctdb_node_flag_change c;
2326         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2327         uint32_t recmaster;
2328         uint32_t *nodes;
2329
2330
2331         /* find the recovery master */
2332         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2333         if (ret != 0) {
2334                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2335                 talloc_free(tmp_ctx);
2336                 return ret;
2337         }
2338
2339
2340         /* read the node flags from the recmaster */
2341         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2342         if (ret != 0) {
2343                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2344                 talloc_free(tmp_ctx);
2345                 return -1;
2346         }
2347         if (destnode >= nodemap->num) {
2348                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2349                 talloc_free(tmp_ctx);
2350                 return -1;
2351         }
2352
2353         c.pnn       = destnode;
2354         c.old_flags = nodemap->nodes[destnode].flags;
2355         c.new_flags = c.old_flags;
2356         c.new_flags |= set;
2357         c.new_flags &= ~clear;
2358
2359         data.dsize = sizeof(c);
2360         data.dptr = (unsigned char *)&c;
2361
2362         /* send the flags update to all connected nodes */
2363         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2364
2365         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2366                                         nodes,
2367                                         timeout, false, data,
2368                                         NULL, NULL,
2369                                         NULL) != 0) {
2370                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to disable node failed\n"));
2371
2372                 talloc_free(tmp_ctx);
2373                 return -1;
2374         }
2375
2376         talloc_free(tmp_ctx);
2377         return 0;
2378 }
2379
2380
2381 /*
2382   get all tunables
2383  */
2384 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2385                                struct timeval timeout, 
2386                                uint32_t destnode,
2387                                struct ctdb_tunable *tunables)
2388 {
2389         TDB_DATA outdata;
2390         int ret;
2391         int32_t res;
2392
2393         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2394                            &outdata, &res, &timeout, NULL);
2395         if (ret != 0 || res != 0) {
2396                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2397                 return -1;
2398         }
2399
2400         if (outdata.dsize != sizeof(*tunables)) {
2401                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2402                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2403                 return -1;              
2404         }
2405
2406         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2407         talloc_free(outdata.dptr);
2408         return 0;
2409 }
2410
2411 /*
2412   add a public address to a node
2413  */
2414 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2415                       struct timeval timeout, 
2416                       uint32_t destnode,
2417                       struct ctdb_control_ip_iface *pub)
2418 {
2419         TDB_DATA data;
2420         int32_t res;
2421         int ret;
2422
2423         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2424         data.dptr  = (unsigned char *)pub;
2425
2426         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2427                            NULL, &res, &timeout, NULL);
2428         if (ret != 0 || res != 0) {
2429                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2430                 return -1;
2431         }
2432
2433         return 0;
2434 }
2435
2436 /*
2437   delete a public address from a node
2438  */
2439 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2440                       struct timeval timeout, 
2441                       uint32_t destnode,
2442                       struct ctdb_control_ip_iface *pub)
2443 {
2444         TDB_DATA data;
2445         int32_t res;
2446         int ret;
2447
2448         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2449         data.dptr  = (unsigned char *)pub;
2450
2451         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2452                            NULL, &res, &timeout, NULL);
2453         if (ret != 0 || res != 0) {
2454                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2455                 return -1;
2456         }
2457
2458         return 0;
2459 }
2460
2461 /*
2462   kill a tcp connection
2463  */
2464 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2465                       struct timeval timeout, 
2466                       uint32_t destnode,
2467                       struct ctdb_control_killtcp *killtcp)
2468 {
2469         TDB_DATA data;
2470         int32_t res;
2471         int ret;
2472
2473         data.dsize = sizeof(struct ctdb_control_killtcp);
2474         data.dptr  = (unsigned char *)killtcp;
2475
2476         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2477                            NULL, &res, &timeout, NULL);
2478         if (ret != 0 || res != 0) {
2479                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2480                 return -1;
2481         }
2482
2483         return 0;
2484 }
2485
2486 /*
2487   send a gratious arp
2488  */
2489 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2490                       struct timeval timeout, 
2491                       uint32_t destnode,
2492                       ctdb_sock_addr *addr,
2493                       const char *ifname)
2494 {
2495         TDB_DATA data;
2496         int32_t res;
2497         int ret, len;
2498         struct ctdb_control_gratious_arp *gratious_arp;
2499         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2500
2501
2502         len = strlen(ifname)+1;
2503         gratious_arp = talloc_size(tmp_ctx, 
2504                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2505         CTDB_NO_MEMORY(ctdb, gratious_arp);
2506
2507         gratious_arp->addr = *addr;
2508         gratious_arp->len = len;
2509         memcpy(&gratious_arp->iface[0], ifname, len);
2510
2511
2512         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2513         data.dptr  = (unsigned char *)gratious_arp;
2514
2515         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2516                            NULL, &res, &timeout, NULL);
2517         if (ret != 0 || res != 0) {
2518                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2519                 talloc_free(tmp_ctx);
2520                 return -1;
2521         }
2522
2523         talloc_free(tmp_ctx);
2524         return 0;
2525 }
2526
2527 /*
2528   get a list of all tcp tickles that a node knows about for a particular vnn
2529  */
2530 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2531                               struct timeval timeout, uint32_t destnode, 
2532                               TALLOC_CTX *mem_ctx, 
2533                               ctdb_sock_addr *addr,
2534                               struct ctdb_control_tcp_tickle_list **list)
2535 {
2536         int ret;
2537         TDB_DATA data, outdata;
2538         int32_t status;
2539
2540         data.dptr = (uint8_t*)addr;
2541         data.dsize = sizeof(ctdb_sock_addr);
2542
2543         ret = ctdb_control(ctdb, destnode, 0, 
2544                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2545                            mem_ctx, &outdata, &status, NULL, NULL);
2546         if (ret != 0 || status != 0) {
2547                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2548                 return -1;
2549         }
2550
2551         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2552
2553         return status;
2554 }
2555
2556 /*
2557   register a server id
2558  */
2559 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2560                       struct timeval timeout, 
2561                       struct ctdb_server_id *id)
2562 {
2563         TDB_DATA data;
2564         int32_t res;
2565         int ret;
2566
2567         data.dsize = sizeof(struct ctdb_server_id);
2568         data.dptr  = (unsigned char *)id;
2569
2570         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2571                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2572                         0, data, NULL,
2573                         NULL, &res, &timeout, NULL);
2574         if (ret != 0 || res != 0) {
2575                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2576                 return -1;
2577         }
2578
2579         return 0;
2580 }
2581
2582 /*
2583   unregister a server id
2584  */
2585 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2586                       struct timeval timeout, 
2587                       struct ctdb_server_id *id)
2588 {
2589         TDB_DATA data;
2590         int32_t res;
2591         int ret;
2592
2593         data.dsize = sizeof(struct ctdb_server_id);
2594         data.dptr  = (unsigned char *)id;
2595
2596         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2597                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2598                         0, data, NULL,
2599                         NULL, &res, &timeout, NULL);
2600         if (ret != 0 || res != 0) {
2601                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2602                 return -1;
2603         }
2604
2605         return 0;
2606 }
2607
2608
2609 /*
2610   check if a server id exists
2611
2612   if a server id does exist, return *status == 1, otherwise *status == 0
2613  */
2614 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2615                       struct timeval timeout, 
2616                       uint32_t destnode,
2617                       struct ctdb_server_id *id,
2618                       uint32_t *status)
2619 {
2620         TDB_DATA data;
2621         int32_t res;
2622         int ret;
2623
2624         data.dsize = sizeof(struct ctdb_server_id);
2625         data.dptr  = (unsigned char *)id;
2626
2627         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2628                         0, data, NULL,
2629                         NULL, &res, &timeout, NULL);
2630         if (ret != 0) {
2631                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2632                 return -1;
2633         }
2634
2635         if (res) {
2636                 *status = 1;
2637         } else {
2638                 *status = 0;
2639         }
2640
2641         return 0;
2642 }
2643
2644 /*
2645    get the list of server ids that are registered on a node
2646 */
2647 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2648                 TALLOC_CTX *mem_ctx,
2649                 struct timeval timeout, uint32_t destnode, 
2650                 struct ctdb_server_id_list **svid_list)
2651 {
2652         int ret;
2653         TDB_DATA outdata;
2654         int32_t res;
2655
2656         ret = ctdb_control(ctdb, destnode, 0, 
2657                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2658                            mem_ctx, &outdata, &res, &timeout, NULL);
2659         if (ret != 0 || res != 0) {
2660                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2661                 return -1;
2662         }
2663
2664         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2665                     
2666         return 0;
2667 }
2668
2669 /*
2670   initialise the ctdb daemon for client applications
2671
2672   NOTE: In current code the daemon does not fork. This is for testing purposes only
2673   and to simplify the code.
2674 */
2675 struct ctdb_context *ctdb_init(struct event_context *ev)
2676 {
2677         int ret;
2678         struct ctdb_context *ctdb;
2679
2680         ctdb = talloc_zero(ev, struct ctdb_context);
2681         if (ctdb == NULL) {
2682                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2683                 return NULL;
2684         }
2685         ctdb->ev  = ev;
2686         ctdb->idr = idr_init(ctdb);
2687         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2688
2689         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2690         if (ret != 0) {
2691                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2692                 talloc_free(ctdb);
2693                 return NULL;
2694         }
2695
2696         return ctdb;
2697 }
2698
2699
2700 /*
2701   set some ctdb flags
2702 */
2703 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2704 {
2705         ctdb->flags |= flags;
2706 }
2707
2708 /*
2709   setup the local socket name
2710 */
2711 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2712 {
2713         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2714         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2715
2716         return 0;
2717 }
2718
2719 /*
2720   return the pnn of this node
2721 */
2722 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2723 {
2724         return ctdb->pnn;
2725 }
2726
2727
2728 /*
2729   get the uptime of a remote node
2730  */
2731 struct ctdb_client_control_state *
2732 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2733 {
2734         return ctdb_control_send(ctdb, destnode, 0, 
2735                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2736                            mem_ctx, &timeout, NULL);
2737 }
2738
2739 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2740 {
2741         int ret;
2742         int32_t res;
2743         TDB_DATA outdata;
2744
2745         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2746         if (ret != 0 || res != 0) {
2747                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2748                 return -1;
2749         }
2750
2751         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2752
2753         return 0;
2754 }
2755
2756 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2757 {
2758         struct ctdb_client_control_state *state;
2759
2760         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2761         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2762 }
2763
2764 /*
2765   send a control to execute the "recovered" event script on a node
2766  */
2767 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2768 {
2769         int ret;
2770         int32_t status;
2771
2772         ret = ctdb_control(ctdb, destnode, 0, 
2773                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2774                            NULL, NULL, &status, &timeout, NULL);
2775         if (ret != 0 || status != 0) {
2776                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2777                 return -1;
2778         }
2779
2780         return 0;
2781 }
2782
2783 /* 
2784   callback for the async helpers used when sending the same control
2785   to multiple nodes in parallell.
2786 */
2787 static void async_callback(struct ctdb_client_control_state *state)
2788 {
2789         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2790         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2791         int ret;
2792         TDB_DATA outdata;
2793         int32_t res;
2794         uint32_t destnode = state->c->hdr.destnode;
2795
2796         /* one more node has responded with recmode data */
2797         data->count--;
2798
2799         /* if we failed to push the db, then return an error and let
2800            the main loop try again.
2801         */
2802         if (state->state != CTDB_CONTROL_DONE) {
2803                 if ( !data->dont_log_errors) {
2804                         DEBUG(DEBUG_ERR,("Async operation failed with state %d\n opcode:%u", state->state, data->opcode));
2805                 }
2806                 data->fail_count++;
2807                 if (data->fail_callback) {
2808                         data->fail_callback(ctdb, destnode, res, outdata,
2809                                         data->callback_data);
2810                 }
2811                 return;
2812         }
2813         
2814         state->async.fn = NULL;
2815
2816         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2817         if ((ret != 0) || (res != 0)) {
2818                 if ( !data->dont_log_errors) {
2819                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2820                 }
2821                 data->fail_count++;
2822                 if (data->fail_callback) {
2823                         data->fail_callback(ctdb, destnode, res, outdata,
2824                                         data->callback_data);
2825                 }
2826         }
2827         if ((ret == 0) && (data->callback != NULL)) {
2828                 data->callback(ctdb, destnode, res, outdata,
2829                                         data->callback_data);
2830         }
2831 }
2832
2833
2834 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2835 {
2836         /* set up the callback functions */
2837         state->async.fn = async_callback;
2838         state->async.private_data = data;
2839         
2840         /* one more control to wait for to complete */
2841         data->count++;
2842 }
2843
2844
2845 /* wait for up to the maximum number of seconds allowed
2846    or until all nodes we expect a response from has replied
2847 */
2848 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2849 {
2850         while (data->count > 0) {
2851                 event_loop_once(ctdb->ev);
2852         }
2853         if (data->fail_count != 0) {
2854                 if (!data->dont_log_errors) {
2855                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2856                                  data->fail_count));
2857                 }
2858                 return -1;
2859         }
2860         return 0;
2861 }
2862
2863
2864 /* 
2865    perform a simple control on the listed nodes
2866    The control cannot return data
2867  */
2868 int ctdb_client_async_control(struct ctdb_context *ctdb,
2869                                 enum ctdb_controls opcode,
2870                                 uint32_t *nodes,
2871                                 struct timeval timeout,
2872                                 bool dont_log_errors,
2873                                 TDB_DATA data,
2874                                 client_async_callback client_callback,
2875                                 client_async_callback fail_callback,
2876                                 void *callback_data)
2877 {
2878         struct client_async_data *async_data;
2879         struct ctdb_client_control_state *state;
2880         int j, num_nodes;
2881
2882         async_data = talloc_zero(ctdb, struct client_async_data);
2883         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2884         async_data->dont_log_errors = dont_log_errors;
2885         async_data->callback = client_callback;
2886         async_data->fail_callback = fail_callback;
2887         async_data->callback_data = callback_data;
2888         async_data->opcode        = opcode;
2889
2890         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2891
2892         /* loop over all nodes and send an async control to each of them */
2893         for (j=0; j<num_nodes; j++) {
2894                 uint32_t pnn = nodes[j];
2895
2896                 state = ctdb_control_send(ctdb, pnn, 0, opcode, 
2897                                           0, data, async_data, &timeout, NULL);
2898                 if (state == NULL) {
2899                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2900                         talloc_free(async_data);
2901                         return -1;
2902                 }
2903                 
2904                 ctdb_client_async_add(async_data, state);
2905         }
2906
2907         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2908                 talloc_free(async_data);
2909                 return -1;
2910         }
2911
2912         talloc_free(async_data);
2913         return 0;
2914 }
2915
2916 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2917                                 struct ctdb_vnn_map *vnn_map,
2918                                 TALLOC_CTX *mem_ctx,
2919                                 bool include_self)
2920 {
2921         int i, j, num_nodes;
2922         uint32_t *nodes;
2923
2924         for (i=num_nodes=0;i<vnn_map->size;i++) {
2925                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2926                         continue;
2927                 }
2928                 num_nodes++;
2929         } 
2930
2931         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2932         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2933
2934         for (i=j=0;i<vnn_map->size;i++) {
2935                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2936                         continue;
2937                 }
2938                 nodes[j++] = vnn_map->map[i];
2939         } 
2940
2941         return nodes;
2942 }
2943
2944 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2945                                 struct ctdb_node_map *node_map,
2946                                 TALLOC_CTX *mem_ctx,
2947                                 bool include_self)
2948 {
2949         int i, j, num_nodes;
2950         uint32_t *nodes;
2951
2952         for (i=num_nodes=0;i<node_map->num;i++) {
2953                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2954                         continue;
2955                 }
2956                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2957                         continue;
2958                 }
2959                 num_nodes++;
2960         } 
2961
2962         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2963         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2964
2965         for (i=j=0;i<node_map->num;i++) {
2966                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2967                         continue;
2968                 }
2969                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2970                         continue;
2971                 }
2972                 nodes[j++] = node_map->nodes[i].pnn;
2973         } 
2974
2975         return nodes;
2976 }
2977
2978 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
2979                                 struct ctdb_node_map *node_map,
2980                                 TALLOC_CTX *mem_ctx,
2981                                 bool include_self)
2982 {
2983         int i, j, num_nodes;
2984         uint32_t *nodes;
2985
2986         for (i=num_nodes=0;i<node_map->num;i++) {
2987                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2988                         continue;
2989                 }
2990                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2991                         continue;
2992                 }
2993                 num_nodes++;
2994         } 
2995
2996         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2997         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2998
2999         for (i=j=0;i<node_map->num;i++) {
3000                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3001                         continue;
3002                 }
3003                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3004                         continue;
3005                 }
3006                 nodes[j++] = node_map->nodes[i].pnn;
3007         } 
3008
3009         return nodes;
3010 }
3011
3012 /* 
3013   this is used to test if a pnn lock exists and if it exists will return
3014   the number of connections that pnn has reported or -1 if that recovery
3015   daemon is not running.
3016 */
3017 int
3018 ctdb_read_pnn_lock(int fd, int32_t pnn)
3019 {
3020         struct flock lock;
3021         char c;
3022
3023         lock.l_type = F_WRLCK;
3024         lock.l_whence = SEEK_SET;
3025         lock.l_start = pnn;
3026         lock.l_len = 1;
3027         lock.l_pid = 0;
3028
3029         if (fcntl(fd, F_GETLK, &lock) != 0) {
3030                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3031                 return -1;
3032         }
3033
3034         if (lock.l_type == F_UNLCK) {
3035                 return -1;
3036         }
3037
3038         if (pread(fd, &c, 1, pnn) == -1) {
3039                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3040                 return -1;
3041         }
3042
3043         return c;
3044 }
3045
3046 /*
3047   get capabilities of a remote node
3048  */
3049 struct ctdb_client_control_state *
3050 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3051 {
3052         return ctdb_control_send(ctdb, destnode, 0, 
3053                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3054                            mem_ctx, &timeout, NULL);
3055 }
3056
3057 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3058 {
3059         int ret;
3060         int32_t res;
3061         TDB_DATA outdata;
3062
3063         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3064         if ( (ret != 0) || (res != 0) ) {
3065                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3066                 return -1;
3067         }
3068
3069         if (capabilities) {
3070                 *capabilities = *((uint32_t *)outdata.dptr);
3071         }
3072
3073         return 0;
3074 }
3075
3076 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3077 {
3078         struct ctdb_client_control_state *state;
3079         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3080         int ret;
3081
3082         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3083         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3084         talloc_free(tmp_ctx);
3085         return ret;
3086 }
3087
3088 struct ctdb_transaction_handle {
3089         struct ctdb_db_context *ctdb_db;
3090         bool in_replay;
3091         /* we store the reads and writes done under a transaction one
3092            list stores both reads and writes, the other just writes
3093         */
3094         struct ctdb_marshall_buffer *m_all;
3095         struct ctdb_marshall_buffer *m_write;
3096 };
3097
3098 /* start a transaction on a database */
3099 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3100 {
3101         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3102         return 0;
3103 }
3104
3105 /* start a transaction on a database */
3106 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3107 {
3108         struct ctdb_record_handle *rh;
3109         TDB_DATA key;
3110         struct ctdb_ltdb_header header;
3111         TALLOC_CTX *tmp_ctx;
3112         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3113         int ret;
3114         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3115
3116         key.dptr = discard_const(keyname);
3117         key.dsize = strlen(keyname);
3118
3119         if (!ctdb_db->persistent) {
3120                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3121                 return -1;
3122         }
3123
3124 again:
3125         tmp_ctx = talloc_new(h);
3126
3127         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3128         if (rh == NULL) {
3129                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));             
3130                 talloc_free(tmp_ctx);
3131                 return -1;
3132         }
3133         talloc_free(rh);
3134
3135         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3136         if (ret != 0) {
3137                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3138                 talloc_free(tmp_ctx);
3139                 return -1;
3140         }
3141
3142         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, NULL);
3143         if (ret != 0 || header.dmaster != ctdb_db->ctdb->pnn) {
3144                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3145                 talloc_free(tmp_ctx);
3146                 goto again;
3147         }
3148
3149         talloc_free(tmp_ctx);
3150
3151         return 0;
3152 }
3153
3154
3155 /* start a transaction on a database */
3156 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3157                                                        TALLOC_CTX *mem_ctx)
3158 {
3159         struct ctdb_transaction_handle *h;
3160         int ret;
3161
3162         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3163         if (h == NULL) {
3164                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3165                 return NULL;
3166         }
3167
3168         h->ctdb_db = ctdb_db;
3169
3170         ret = ctdb_transaction_fetch_start(h);
3171         if (ret != 0) {
3172                 talloc_free(h);
3173                 return NULL;
3174         }
3175
3176         talloc_set_destructor(h, ctdb_transaction_destructor);
3177
3178         return h;
3179 }
3180
3181
3182
3183 /*
3184   fetch a record inside a transaction
3185  */
3186 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3187                            TALLOC_CTX *mem_ctx, 
3188                            TDB_DATA key, TDB_DATA *data)
3189 {
3190         struct ctdb_ltdb_header header;
3191         int ret;
3192
3193         ZERO_STRUCT(header);
3194
3195         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3196         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3197                 /* record doesn't exist yet */
3198                 *data = tdb_null;
3199                 ret = 0;
3200         }
3201         
3202         if (ret != 0) {
3203                 return ret;
3204         }
3205
3206         if (!h->in_replay) {
3207                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3208                 if (h->m_all == NULL) {
3209                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3210                         return -1;
3211                 }
3212         }
3213
3214         return 0;
3215 }
3216
3217 /*
3218   stores a record inside a transaction
3219  */
3220 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3221                            TDB_DATA key, TDB_DATA data)
3222 {
3223         TALLOC_CTX *tmp_ctx = talloc_new(h);
3224         struct ctdb_ltdb_header header;
3225         TDB_DATA olddata;
3226         int ret;
3227
3228         ZERO_STRUCT(header);
3229
3230         /* we need the header so we can update the RSN */
3231         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3232         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3233                 /* the record doesn't exist - create one with us as dmaster.
3234                    This is only safe because we are in a transaction and this
3235                    is a persistent database */
3236                 ZERO_STRUCT(header);
3237         } else if (ret != 0) {
3238                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3239                 talloc_free(tmp_ctx);
3240                 return ret;
3241         }
3242
3243         if (data.dsize == olddata.dsize &&
3244             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3245                 /* save writing the same data */
3246                 talloc_free(tmp_ctx);
3247                 return 0;
3248         }
3249
3250         header.dmaster = h->ctdb_db->ctdb->pnn;
3251         header.rsn++;
3252
3253         if (!h->in_replay) {
3254                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3255                 if (h->m_all == NULL) {
3256                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3257                         talloc_free(tmp_ctx);
3258                         return -1;
3259                 }
3260         }               
3261
3262         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3263         if (h->m_write == NULL) {
3264                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3265                 talloc_free(tmp_ctx);
3266                 return -1;
3267         }
3268         
3269         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3270
3271         talloc_free(tmp_ctx);
3272         
3273         return ret;
3274 }
3275
3276 /*
3277   replay a transaction
3278  */
3279 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3280 {
3281         int ret, i;
3282         struct ctdb_rec_data *rec = NULL;
3283
3284         h->in_replay = true;
3285         talloc_free(h->m_write);
3286         h->m_write = NULL;
3287
3288         ret = ctdb_transaction_fetch_start(h);
3289         if (ret != 0) {
3290                 return ret;
3291         }
3292
3293         for (i=0;i<h->m_all->count;i++) {
3294                 TDB_DATA key, data;
3295
3296                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3297                 if (rec == NULL) {
3298                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3299                         goto failed;
3300                 }
3301
3302                 if (rec->reqid == 0) {
3303                         /* its a store */
3304                         if (ctdb_transaction_store(h, key, data) != 0) {
3305                                 goto failed;
3306                         }
3307                 } else {
3308                         TDB_DATA data2;
3309                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3310
3311                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3312                                 talloc_free(tmp_ctx);
3313                                 goto failed;
3314                         }
3315                         if (data2.dsize != data.dsize ||
3316                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3317                                 /* the record has changed on us - we have to give up */
3318                                 talloc_free(tmp_ctx);
3319                                 goto failed;
3320                         }
3321                         talloc_free(tmp_ctx);
3322                 }
3323         }
3324         
3325         return 0;
3326
3327 failed:
3328         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3329         return -1;
3330 }
3331
3332
3333 /*
3334   commit a transaction
3335  */
3336 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3337 {
3338         int ret, retries=0;
3339         int32_t status;
3340         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3341         struct timeval timeout;
3342         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3343
3344         talloc_set_destructor(h, NULL);
3345
3346         /* our commit strategy is quite complex.
3347
3348            - we first try to commit the changes to all other nodes
3349
3350            - if that works, then we commit locally and we are done
3351
3352            - if a commit on another node fails, then we need to cancel
3353              the transaction, then restart the transaction (thus
3354              opening a window of time for a pending recovery to
3355              complete), then replay the transaction, checking all the
3356              reads and writes (checking that reads give the same data,
3357              and writes succeed). Then we retry the transaction to the
3358              other nodes
3359         */
3360
3361 again:
3362         if (h->m_write == NULL) {
3363                 /* no changes were made */
3364                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3365                 talloc_free(h);
3366                 return 0;
3367         }
3368
3369         /* tell ctdbd to commit to the other nodes */
3370         timeout = timeval_current_ofs(1, 0);
3371         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3372                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3373                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3374                            &timeout, NULL);
3375         if (ret != 0 || status != 0) {
3376                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3377                 sleep(1);
3378
3379                 if (ret != 0) {
3380                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3381                 } else {
3382                         /* work out what error code we will give if we 
3383                            have to fail the operation */
3384                         switch ((enum ctdb_trans2_commit_error)status) {
3385                         case CTDB_TRANS2_COMMIT_SUCCESS:
3386                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3387                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3388                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3389                                 break;
3390                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3391                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3392                                 break;
3393                         }
3394                 }
3395
3396                 if (++retries == 10) {
3397                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3398                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3399                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3400                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3401                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3402                         talloc_free(h);
3403                         return -1;
3404                 }               
3405
3406                 if (ctdb_replay_transaction(h) != 0) {
3407                         DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n"));
3408                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3409                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3410                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3411                         talloc_free(h);
3412                         return -1;
3413                 }
3414                 goto again;
3415         } else {
3416                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3417         }
3418
3419         /* do the real commit locally */
3420         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3421         if (ret != 0) {
3422                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit transaction\n"));
3423                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3424                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3425                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3426                 talloc_free(h);
3427                 return ret;
3428         }
3429
3430         /* tell ctdbd that we are finished with our local commit */
3431         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3432                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3433                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3434         talloc_free(h);
3435         return 0;
3436 }
3437
3438 /*
3439   recovery daemon ping to main daemon
3440  */
3441 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3442 {
3443         int ret;
3444         int32_t res;
3445
3446         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3447                            ctdb, NULL, &res, NULL, NULL);
3448         if (ret != 0 || res != 0) {
3449                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3450                 return -1;
3451         }
3452
3453         return 0;
3454 }
3455
3456 /* when forking the main daemon and the child process needs to connect back
3457  * to the daemon as a client process, this function can be used to change
3458  * the ctdb context from daemon into client mode
3459  */
3460 int switch_from_server_to_client(struct ctdb_context *ctdb)
3461 {
3462         int ret;
3463
3464         /* shutdown the transport */
3465         if (ctdb->methods) {
3466                 ctdb->methods->shutdown(ctdb);
3467         }
3468
3469         /* get a new event context */
3470         talloc_free(ctdb->ev);
3471         ctdb->ev = event_context_init(ctdb);
3472
3473         close(ctdb->daemon.sd);
3474         ctdb->daemon.sd = -1;
3475
3476         /* the client does not need to be realtime */
3477         if (ctdb->do_setsched) {
3478                 ctdb_restore_scheduler(ctdb);
3479         }
3480
3481         /* initialise ctdb */
3482         ret = ctdb_socket_connect(ctdb);
3483         if (ret != 0) {
3484                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3485                 return -1;
3486         }
3487
3488          return 0;
3489 }
3490
3491 /*
3492   tell the main daemon we are starting a new monitor event script
3493  */
3494 int ctdb_ctrl_event_script_init(struct ctdb_context *ctdb)
3495 {
3496         int ret;
3497         int32_t res;
3498
3499         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_INIT, 0, tdb_null, 
3500                            ctdb, NULL, &res, NULL, NULL);
3501         if (ret != 0 || res != 0) {
3502                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3503                 return -1;
3504         }
3505
3506         return 0;
3507 }
3508
3509 /*
3510   tell the main daemon we are starting a new monitor event script
3511  */
3512 int ctdb_ctrl_event_script_finished(struct ctdb_context *ctdb)
3513 {
3514         int ret;
3515         int32_t res;
3516
3517         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_FINISHED, 0, tdb_null, 
3518                            ctdb, NULL, &res, NULL, NULL);
3519         if (ret != 0 || res != 0) {
3520                 DEBUG(DEBUG_ERR,("Failed to send event_script_init\n"));
3521                 return -1;
3522         }
3523
3524         return 0;
3525 }
3526
3527 /*
3528   tell the main daemon we are starting to run an eventscript
3529  */
3530 int ctdb_ctrl_event_script_start(struct ctdb_context *ctdb, const char *name)
3531 {
3532         int ret;
3533         int32_t res;
3534         TDB_DATA data;
3535
3536         data.dptr = discard_const(name);
3537         data.dsize = strlen(name)+1;
3538
3539         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_START, 0, data, 
3540                            ctdb, NULL, &res, NULL, NULL);
3541         if (ret != 0 || res != 0) {
3542                 DEBUG(DEBUG_ERR,("Failed to send event_script_start\n"));
3543                 return -1;
3544         }
3545
3546         return 0;
3547 }
3548
3549 /*
3550   tell the main daemon the status of the script we ran
3551  */
3552 int ctdb_ctrl_event_script_stop(struct ctdb_context *ctdb, int32_t result)
3553 {
3554         int ret;
3555         int32_t res;
3556         TDB_DATA data;
3557
3558         data.dptr = (uint8_t *)&result;
3559         data.dsize = sizeof(result);
3560
3561         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_EVENT_SCRIPT_STOP, 0, data, 
3562                            ctdb, NULL, &res, NULL, NULL);
3563         if (ret != 0 || res != 0) {
3564                 DEBUG(DEBUG_ERR,("Failed to send event_script_stop\n"));
3565                 return -1;
3566         }
3567
3568         return 0;
3569 }
3570
3571
3572 /*
3573   get the status of running the monitor eventscripts
3574  */
3575 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3576                 struct timeval timeout, uint32_t destnode, 
3577                 TALLOC_CTX *mem_ctx,
3578                 struct ctdb_monitoring_wire **script_status)
3579 {
3580         int ret;
3581         TDB_DATA outdata;
3582         int32_t res;
3583
3584         ret = ctdb_control(ctdb, destnode, 0, 
3585                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, tdb_null, 
3586                            mem_ctx, &outdata, &res, &timeout, NULL);
3587         if (ret != 0 || res != 0 || outdata.dsize == 0) {
3588                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3589                 return -1;
3590         }
3591
3592         *script_status = (struct ctdb_monitoring_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3593         talloc_free(outdata.dptr);
3594                     
3595         return 0;
3596 }
3597
3598 /*
3599   tell the main daemon how long it took to lock the reclock file
3600  */
3601 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3602 {
3603         int ret;
3604         int32_t res;
3605         TDB_DATA data;
3606
3607         data.dptr = (uint8_t *)&latency;
3608         data.dsize = sizeof(latency);
3609
3610         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3611                            ctdb, NULL, &res, NULL, NULL);
3612         if (ret != 0 || res != 0) {
3613                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3614                 return -1;
3615         }
3616
3617         return 0;
3618 }
3619
3620 /*
3621   get the name of the reclock file
3622  */
3623 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3624                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3625                          const char **name)
3626 {
3627         int ret;
3628         int32_t res;
3629         TDB_DATA data;
3630
3631         ret = ctdb_control(ctdb, destnode, 0, 
3632                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3633                            mem_ctx, &data, &res, &timeout, NULL);
3634         if (ret != 0 || res != 0) {
3635                 return -1;
3636         }
3637
3638         if (data.dsize == 0) {
3639                 *name = NULL;
3640         } else {
3641                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3642         }
3643         talloc_free(data.dptr);
3644
3645         return 0;
3646 }
3647
3648 /*
3649   set the reclock filename for a node
3650  */
3651 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3652 {
3653         int ret;
3654         TDB_DATA data;
3655         int32_t res;
3656
3657         if (reclock == NULL) {
3658                 data.dsize = 0;
3659                 data.dptr  = NULL;
3660         } else {
3661                 data.dsize = strlen(reclock) + 1;
3662                 data.dptr  = discard_const(reclock);
3663         }
3664
3665         ret = ctdb_control(ctdb, destnode, 0, 
3666                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3667                            NULL, NULL, &res, &timeout, NULL);
3668         if (ret != 0 || res != 0) {
3669                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3670                 return -1;
3671         }
3672
3673         return 0;
3674 }
3675
3676 /*
3677   stop a node
3678  */
3679 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3680 {
3681         int ret;
3682         int32_t res;
3683
3684         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3685                            ctdb, NULL, &res, &timeout, NULL);
3686         if (ret != 0 || res != 0) {
3687                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3688                 return -1;
3689         }
3690
3691         return 0;
3692 }
3693
3694 /*
3695   continue a node
3696  */
3697 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3698 {
3699         int ret;
3700
3701         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3702                            ctdb, NULL, NULL, &timeout, NULL);
3703         if (ret != 0) {
3704                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3705                 return -1;
3706         }
3707
3708         return 0;
3709 }
3710
3711 /*
3712   set the natgw state for a node
3713  */
3714 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3715 {
3716         int ret;
3717         TDB_DATA data;
3718         int32_t res;
3719
3720         data.dsize = sizeof(natgwstate);
3721         data.dptr  = (uint8_t *)&natgwstate;
3722
3723         ret = ctdb_control(ctdb, destnode, 0, 
3724                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3725                            NULL, NULL, &res, &timeout, NULL);
3726         if (ret != 0 || res != 0) {
3727                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3728                 return -1;
3729         }
3730
3731         return 0;
3732 }
3733
3734 /*
3735   set the lmaster role for a node
3736  */
3737 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3738 {
3739         int ret;
3740         TDB_DATA data;
3741         int32_t res;
3742
3743         data.dsize = sizeof(lmasterrole);
3744         data.dptr  = (uint8_t *)&lmasterrole;
3745
3746         ret = ctdb_control(ctdb, destnode, 0, 
3747                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3748                            NULL, NULL, &res, &timeout, NULL);
3749         if (ret != 0 || res != 0) {
3750                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3751                 return -1;
3752         }
3753
3754         return 0;
3755 }
3756
3757 /*
3758   set the recmaster role for a node
3759  */
3760 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3761 {
3762         int ret;
3763         TDB_DATA data;
3764         int32_t res;
3765
3766         data.dsize = sizeof(recmasterrole);
3767         data.dptr  = (uint8_t *)&recmasterrole;
3768
3769         ret = ctdb_control(ctdb, destnode, 0, 
3770                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3771                            NULL, NULL, &res, &timeout, NULL);
3772         if (ret != 0 || res != 0) {
3773                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3774                 return -1;
3775         }
3776
3777         return 0;
3778 }