merge from tridge
[vlendec/samba-autobuild/.git] / ctdb / common / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This library is free software; you can redistribute it and/or
8    modify it under the terms of the GNU Lesser General Public
9    License as published by the Free Software Foundation; either
10    version 2 of the License, or (at your option) any later version.
11
12    This library is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15    Lesser General Public License for more details.
16
17    You should have received a copy of the GNU Lesser General Public
18    License along with this library; if not, write to the Free Software
19    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20 */
21
22 #include "includes.h"
23 #include "db_wrap.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/events/events.h"
26 #include "lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31
32 /*
33   queue a packet for sending from client to daemon
34 */
35 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
36 {
37         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
38 }
39
40
41 /*
42   handle a connect wait reply packet
43  */
44 static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, 
45                                     struct ctdb_req_header *hdr)
46 {
47         struct ctdb_reply_connect_wait *r = (struct ctdb_reply_connect_wait *)hdr;
48         ctdb->num_connected = r->num_connected;
49 }
50
51 enum fetch_lock_state { CTDB_FETCH_LOCK_WAIT, CTDB_FETCH_LOCK_DONE, CTDB_FETCH_LOCK_ERROR };
52
53 /*
54   state of a in-progress ctdb call
55 */
56 struct ctdb_fetch_lock_state {
57         enum fetch_lock_state state;
58         struct ctdb_db_context *ctdb_db;
59         struct ctdb_reply_fetch_lock *r;
60         struct ctdb_req_fetch_lock *req;
61         struct ctdb_ltdb_header header;
62 };
63
64
65
66 /*
67   called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon
68
69   This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It
70   contains any reply data from the call
71 */
72 void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
73 {
74         struct ctdb_reply_fetch_lock *r = (struct ctdb_reply_fetch_lock *)hdr;
75         struct ctdb_fetch_lock_state *state;
76
77         state = idr_find(ctdb->idr, hdr->reqid);
78         if (state == NULL) {
79                 DEBUG(0, ("reqid %d not found at %s\n", hdr->reqid,
80                           __location__));
81                 return;
82         }
83
84         if (!talloc_get_type(state, struct ctdb_fetch_lock_state)) {
85                 DEBUG(0, ("ctdb idr type error at %s, it's a %s\n",
86                           __location__, talloc_get_name(state)));
87                 return;
88         }
89
90         state->r = talloc_steal(state, r);
91
92         state->state = CTDB_FETCH_LOCK_DONE;
93 }
94
95 /*
96   this is called in the client, when data comes in from the daemon
97  */
98 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
99 {
100         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
101         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
102         TALLOC_CTX *tmp_ctx;
103
104         /* place the packet as a child of a tmp_ctx. We then use
105            talloc_free() below to free it. If any of the calls want
106            to keep it, then they will steal it somewhere else, and the
107            talloc_free() will be a no-op */
108         tmp_ctx = talloc_new(ctdb);
109         talloc_steal(tmp_ctx, hdr);
110
111         if (cnt == 0) {
112                 DEBUG(2,("Daemon has exited - shutting down client\n"));
113                 exit(0);
114         }
115
116         if (cnt < sizeof(*hdr)) {
117                 DEBUG(0,("Bad packet length %d in client\n", cnt));
118                 goto done;
119         }
120         if (cnt != hdr->length) {
121                 ctdb_set_error(ctdb, "Bad header length %d expected %d in client\n", 
122                                hdr->length, cnt);
123                 goto done;
124         }
125
126         if (hdr->ctdb_magic != CTDB_MAGIC) {
127                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
128                 goto done;
129         }
130
131         if (hdr->ctdb_version != CTDB_VERSION) {
132                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
133                 goto done;
134         }
135
136         switch (hdr->operation) {
137         case CTDB_REPLY_CALL:
138                 ctdb_reply_call(ctdb, hdr);
139                 break;
140
141         case CTDB_REQ_MESSAGE:
142                 ctdb_request_message(ctdb, hdr);
143                 break;
144
145         case CTDB_REPLY_CONNECT_WAIT:
146                 ctdb_reply_connect_wait(ctdb, hdr);
147                 break;
148
149         case CTDB_REPLY_FETCH_LOCK:
150                 ctdb_reply_fetch_lock(ctdb, hdr);
151                 break;
152
153         default:
154                 DEBUG(0,("bogus operation code:%d\n",hdr->operation));
155         }
156
157 done:
158         talloc_free(tmp_ctx);
159 }
160
161 /*
162   connect to a unix domain socket
163 */
164 static int ux_socket_connect(struct ctdb_context *ctdb)
165 {
166         struct sockaddr_un addr;
167
168         memset(&addr, 0, sizeof(addr));
169         addr.sun_family = AF_UNIX;
170         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
171
172         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
173         if (ctdb->daemon.sd == -1) {
174                 return -1;
175         }
176         
177         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
178                 close(ctdb->daemon.sd);
179                 ctdb->daemon.sd = -1;
180                 return -1;
181         }
182
183         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
184                                               CTDB_DS_ALIGNMENT, 
185                                               ctdb_client_read_cb, ctdb);
186         return 0;
187 }
188
189
190 struct ctdb_record_handle {
191         struct ctdb_db_context *ctdb_db;
192         TDB_DATA key;
193         TDB_DATA *data;
194         struct ctdb_ltdb_header header;
195 };
196
197 /*
198   make a recv call to the local ctdb daemon - called from client context
199
200   This is called when the program wants to wait for a ctdb_call to complete and get the 
201   results. This call will block unless the call has already completed.
202 */
203 int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
204 {
205         struct ctdb_record_handle *rec;
206
207         while (state->state < CTDB_CALL_DONE) {
208                 event_loop_once(state->node->ctdb->ev);
209         }
210         if (state->state != CTDB_CALL_DONE) {
211                 ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
212                 talloc_free(state);
213                 return -1;
214         }
215
216         rec = state->fetch_private;
217
218         /* ugly hack to manage forced migration */
219         if (rec != NULL) {
220                 rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr);
221                 rec->data->dsize = state->call.reply_data.dsize;
222                 talloc_free(state);
223                 return 0;
224         }
225
226         if (state->call.reply_data.dsize) {
227                 call->reply_data.dptr = talloc_memdup(state->node->ctdb,
228                                                       state->call.reply_data.dptr,
229                                                       state->call.reply_data.dsize);
230                 call->reply_data.dsize = state->call.reply_data.dsize;
231         } else {
232                 call->reply_data.dptr = NULL;
233                 call->reply_data.dsize = 0;
234         }
235         call->status = state->call.status;
236         talloc_free(state);
237
238         return 0;
239 }
240
241
242
243
244 /*
245   destroy a ctdb_call in client
246 */
247 static int ctdb_client_call_destructor(struct ctdb_call_state *state)   
248 {
249         idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
250         return 0;
251 }
252
253
254
255 /*
256   make a ctdb call to the local daemon - async send. Called from client context.
257
258   This constructs a ctdb_call request and queues it for processing. 
259   This call never blocks.
260 */
261 struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
262                                        struct ctdb_call *call)
263 {
264         struct ctdb_call_state *state;
265         struct ctdb_context *ctdb = ctdb_db->ctdb;
266         struct ctdb_ltdb_header header;
267         TDB_DATA data;
268         int ret;
269         size_t len;
270
271         /* if the domain socket is not yet open, open it */
272         if (ctdb->daemon.sd==-1) {
273                 ux_socket_connect(ctdb);
274         }
275
276         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
277         if (ret != 0) {
278                 return NULL;
279         }
280
281 #if 0
282         if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
283                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
284                 talloc_free(data.dptr);
285                 return state;
286         }
287 #endif
288
289         state = talloc_zero(ctdb_db, struct ctdb_call_state);
290         if (state == NULL) {
291                 DEBUG(0, (__location__ " failed to allocate state\n"));
292                 return NULL;
293         }
294
295         talloc_steal(state, data.dptr);
296
297         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
298         state->c = ctdbd_allocate_pkt(ctdb, len);
299         if (state->c == NULL) {
300                 DEBUG(0, (__location__ " failed to allocate packet\n"));
301                 return NULL;
302         }
303         talloc_set_name_const(state->c, "ctdbd req_call packet");
304         talloc_steal(state, state->c);
305
306         state->c->hdr.length    = len;
307         state->c->hdr.ctdb_magic = CTDB_MAGIC;
308         state->c->hdr.ctdb_version = CTDB_VERSION;
309         state->c->hdr.operation = CTDB_REQ_CALL;
310         state->c->hdr.destnode  = header.dmaster;
311         state->c->hdr.srcnode   = ctdb->vnn;
312         /* this limits us to 16k outstanding messages - not unreasonable */
313         state->c->hdr.reqid     = idr_get_new(ctdb->idr, state, 0xFFFF);
314         state->c->flags         = call->flags;
315         state->c->db_id         = ctdb_db->db_id;
316         state->c->callid        = call->call_id;
317         state->c->keylen        = call->key.dsize;
318         state->c->calldatalen   = call->call_data.dsize;
319         memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
320         memcpy(&state->c->data[call->key.dsize], 
321                call->call_data.dptr, call->call_data.dsize);
322         state->call                = *call;
323         state->call.call_data.dptr = &state->c->data[call->key.dsize];
324         state->call.key.dptr       = &state->c->data[0];
325
326         state->node   = ctdb->nodes[header.dmaster];
327         state->state  = CTDB_CALL_WAIT;
328         state->header = header;
329         state->ctdb_db = ctdb_db;
330
331         talloc_set_destructor(state, ctdb_client_call_destructor);
332
333         ctdb_client_queue_pkt(ctdb, &state->c->hdr);
334
335 /*XXX set up timeout to cleanup if server doesnt respond
336         event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
337                         ctdb_call_timeout, state);
338 */
339
340         return state;
341 }
342
343
344 /*
345   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
346 */
347 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
348 {
349         struct ctdb_call_state *state;
350
351         state = ctdb_call_send(ctdb_db, call);
352         return ctdb_call_recv(state, call);
353 }
354
355
356 /*
357   tell the daemon what messaging srvid we will use, and register the message
358   handler function in the client
359 */
360 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
361                              ctdb_message_fn_t handler,
362                              void *private_data)
363                                     
364 {
365         struct ctdb_req_register c;
366         int res;
367
368         /* if the domain socket is not yet open, open it */
369         if (ctdb->daemon.sd==-1) {
370                 ux_socket_connect(ctdb);
371         }
372
373         ZERO_STRUCT(c);
374
375         c.hdr.length       = sizeof(c);
376         c.hdr.ctdb_magic   = CTDB_MAGIC;
377         c.hdr.ctdb_version = CTDB_VERSION;
378         c.hdr.operation    = CTDB_REQ_REGISTER;
379         c.srvid            = srvid;
380
381         res = ctdb_client_queue_pkt(ctdb, &c.hdr);
382         if (res != 0) {
383                 return res;
384         }
385
386         /* also need to register the handler with our ctdb structure */
387         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
388 }
389
390
391 /*
392   send a message - from client context
393  */
394 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
395                       uint32_t srvid, TDB_DATA data)
396 {
397         struct ctdb_req_message *r;
398         int len, res;
399
400         len = offsetof(struct ctdb_req_message, data) + data.dsize;
401         r = ctdb->methods->allocate_pkt(ctdb, len);
402         CTDB_NO_MEMORY(ctdb, r);
403         talloc_set_name_const(r, "req_message packet");
404
405         r->hdr.length    = len;
406         r->hdr.ctdb_magic = CTDB_MAGIC;
407         r->hdr.ctdb_version = CTDB_VERSION;
408         r->hdr.operation = CTDB_REQ_MESSAGE;
409         r->hdr.destnode  = vnn;
410         r->hdr.srcnode   = ctdb->vnn;
411         r->hdr.reqid     = 0;
412         r->srvid         = srvid;
413         r->datalen       = data.dsize;
414         memcpy(&r->data[0], data.dptr, data.dsize);
415         
416         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
417         if (res != 0) {
418                 return res;
419         }
420
421         talloc_free(r);
422         return 0;
423 }
424
425 /*
426   wait for all nodes to be connected - from client
427  */
428 void ctdb_connect_wait(struct ctdb_context *ctdb)
429 {
430         struct ctdb_req_connect_wait r;
431         int res;
432
433         ZERO_STRUCT(r);
434
435         r.hdr.length     = sizeof(r);
436         r.hdr.ctdb_magic = CTDB_MAGIC;
437         r.hdr.ctdb_version = CTDB_VERSION;
438         r.hdr.operation = CTDB_REQ_CONNECT_WAIT;
439
440         DEBUG(3,("ctdb_connect_wait: sending to ctdbd\n"));
441
442         /* if the domain socket is not yet open, open it */
443         if (ctdb->daemon.sd==-1) {
444                 ux_socket_connect(ctdb);
445         }
446         
447         res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length);
448         if (res != 0) {
449                 DEBUG(0,(__location__ " Failed to queue a connect wait request\n"));
450                 return;
451         }
452
453         DEBUG(3,("ctdb_connect_wait: waiting\n"));
454
455         /* now we can go into the normal wait routine, as the reply packet
456            will update the ctdb->num_connected variable */
457         ctdb_daemon_connect_wait(ctdb);
458 }
459
460 static int ctdb_fetch_lock_destructor(struct ctdb_fetch_lock_state *state)
461 {
462         idr_remove(state->ctdb_db->ctdb->idr, state->req->hdr.reqid);
463         return 0;
464 }
465
466 static struct ctdb_fetch_lock_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db, 
467                                                                  TALLOC_CTX *mem_ctx, 
468                                                                  TDB_DATA key, 
469                                                                  struct ctdb_ltdb_header *header)
470 {
471         struct ctdb_fetch_lock_state *state;
472         struct ctdb_context *ctdb = ctdb_db->ctdb;
473         struct ctdb_req_fetch_lock *req;
474         int len, res;
475
476         /* if the domain socket is not yet open, open it */
477         if (ctdb->daemon.sd==-1) {
478                 ux_socket_connect(ctdb);
479         }
480
481         state = talloc_zero(ctdb_db, struct ctdb_fetch_lock_state);
482         if (state == NULL) {
483                 DEBUG(0, (__location__ " failed to allocate state\n"));
484                 return NULL;
485         }
486         state->state   = CTDB_FETCH_LOCK_WAIT;
487         state->ctdb_db = ctdb_db;
488         len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize;
489         state->req = req = ctdbd_allocate_pkt(ctdb, len);
490         if (req == NULL) {
491                 DEBUG(0, (__location__ " failed to allocate packet\n"));
492                 return NULL;
493         }
494         ZERO_STRUCT(*req);
495         talloc_set_name_const(req, "ctdbd req_fetch_lock packet");
496         talloc_steal(state, req);
497
498         req->hdr.length      = len;
499         req->hdr.ctdb_magic  = CTDB_MAGIC;
500         req->hdr.ctdb_version = CTDB_VERSION;
501         req->hdr.operation   = CTDB_REQ_FETCH_LOCK;
502         req->hdr.reqid       = idr_get_new(ctdb->idr, state, 0xFFFF);
503         req->db_id           = ctdb_db->db_id;
504         req->keylen          = key.dsize;
505         req->header          = *header;
506         memcpy(&req->key[0], key.dptr, key.dsize);
507
508         talloc_set_destructor(state, ctdb_fetch_lock_destructor);
509         
510         res = ctdb_client_queue_pkt(ctdb, &req->hdr);
511         if (res != 0) {
512                 return NULL;
513         }
514
515         return state;
516 }
517
518
519 /*
520   make a recv call to the local ctdb daemon - called from client context
521
522   This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the 
523   results. This call will block unless the call has already completed.
524 */
525 int ctdb_client_fetch_lock_recv(struct ctdb_fetch_lock_state *state, TALLOC_CTX *mem_ctx, 
526                                 TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data)
527 {
528         while (state->state < CTDB_FETCH_LOCK_DONE) {
529                 event_loop_once(state->ctdb_db->ctdb->ev);
530         }
531         if (state->state != CTDB_FETCH_LOCK_DONE) {
532                 talloc_free(state);
533                 return -1;
534         }
535
536         *header = state->r->header;
537         data->dsize = state->r->datalen;
538         data->dptr  = talloc_memdup(mem_ctx, state->r->data, data->dsize);
539
540         talloc_free(state);
541
542         return 0;
543 }
544
545 /*
546   cancel a ctdb_fetch_lock operation, releasing the lock
547  */
548 static int fetch_lock_destructor(struct ctdb_record_handle *h)
549 {
550         ctdb_ltdb_unlock(h->ctdb_db, h->key);
551         return 0;
552 }
553
554 /*
555   get a lock on a record, and return the records data. Blocks until it gets the lock
556  */
557 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
558                                            TDB_DATA key, TDB_DATA *data)
559 {
560         int ret;
561         struct ctdb_record_handle *h;
562         struct ctdb_fetch_lock_state *state;
563
564         /*
565           procedure is as follows:
566
567           1) get the chain lock. 
568           2) check if we are dmaster
569           3) if we are the dmaster then return handle 
570           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
571              reply from ctdbd
572           5) when we get the reply, we are now dmaster, update vnn in header
573           6) return handle
574          */
575
576         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
577         if (h == NULL) {
578                 return NULL;
579         }
580
581         h->ctdb_db = ctdb_db;
582         h->key     = key;
583         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
584         if (h->key.dptr == NULL) {
585                 talloc_free(h);
586                 return NULL;
587         }
588         h->data    = data;
589
590         DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", key.dsize, key.dsize, 
591                  (const char *)key.dptr));
592
593         /* step 1 - get the chain lock */
594         ret = ctdb_ltdb_lock(ctdb_db, key);
595         if (ret != 0) {
596                 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
597                 talloc_free(h);
598                 return NULL;
599         }
600
601         DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
602
603         talloc_set_destructor(h, fetch_lock_destructor);
604
605         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
606         if (ret != 0) {
607                 talloc_free(h);
608                 return NULL;
609         }
610
611         DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
612
613         /* step 2 - check if we are the dmaster */
614         if (h->header.dmaster == ctdb_db->ctdb->vnn) {
615                 DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
616                 return h;
617         }
618
619         /* we're not the dmaster - ask the ctdb daemon to make us dmaster */
620         state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key, &h->header);
621         DEBUG(4,("ctdb_fetch_lock: done fetch_lock_send\n"));
622         ret = ctdb_client_fetch_lock_recv(state, mem_ctx, key, &h->header, data);
623         if (ret != 0) {
624                 DEBUG(4,("ctdb_fetch_lock: fetch_lock_recv failed\n"));
625                 talloc_free(h);
626                 return NULL;
627         }
628
629         DEBUG(4,("ctdb_fetch_lock: record is now local\n"));
630
631         /* the record is now local, and locked. update the record on disk
632            to mark us as the dmaster*/
633         h->header.dmaster = ctdb_db->ctdb->vnn;
634         ret = ctdb_ltdb_store(ctdb_db, key, &h->header, *data);
635         if (ret != 0) {
636                 DEBUG(0, (__location__" can't update record to mark us as dmaster\n"));
637                 talloc_free(h);
638                 return NULL;
639         }
640
641         DEBUG(4,("ctdb_fetch_lock: done\n"));
642
643         /* give the caller a handle to be used for ctdb_record_store() or a cancel via
644            a talloc_free() */
645         return h;
646 }
647
648 /*
649   store some data to the record that was locked with ctdb_fetch_lock()
650 */
651 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
652 {
653         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
654 }
655
656 /*
657   wait until we're the only node left.
658   this function never returns
659 */
660 void ctdb_shutdown(struct ctdb_context *ctdb)
661 {
662         struct ctdb_req_shutdown r;
663         int len;
664
665         /* if the domain socket is not yet open, open it */
666         if (ctdb->daemon.sd==-1) {
667                 ux_socket_connect(ctdb);
668         }
669
670         len = sizeof(struct ctdb_req_shutdown);
671         ZERO_STRUCT(r);
672         r.hdr.length       = len;
673         r.hdr.ctdb_magic   = CTDB_MAGIC;
674         r.hdr.ctdb_version = CTDB_VERSION;
675         r.hdr.operation    = CTDB_REQ_SHUTDOWN;
676         r.hdr.reqid        = 0;
677
678         ctdb_client_queue_pkt(ctdb, &(r.hdr));
679
680         /* this event loop will terminate once we receive the reply */
681         while (1) {
682                 event_loop_once(ctdb->ev);
683         }
684 }
685
686