merged tridge's branch
[samba.git] / ctdb / common / ctdb_daemon.c
index d784b059b85217411fe488853f9987b978e28179..0ec201d29fb9bf848e22d22146f71b62b069e7f0 100644 (file)
 #include "../include/ctdb.h"
 #include "../include/ctdb_private.h"
 
+/*
+  structure describing a connected client in the daemon
+ */
+struct ctdb_client {
+       struct ctdb_context *ctdb;
+       int fd;
+       struct ctdb_queue *queue;
+};
+
+
+
+static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
+
 static void ctdb_main_loop(struct ctdb_context *ctdb)
 {
+       /* we are the dispatcher process now, so start the protocol going */
+       if (ctdb_init_transport(ctdb)) {
+               exit(1);
+       }
+
        ctdb->methods->start(ctdb);
 
        /* go into a wait loop to allow other nodes to complete */
@@ -62,14 +80,13 @@ static void block_signal(int signum)
 
 
 /*
-  structure describing a connected client in the daemon
+  send a packet to a client
  */
-struct ctdb_client {
-       struct ctdb_context *ctdb;
-       int fd;
-       struct ctdb_queue *queue;
-};
-
+static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
+{
+       client->ctdb->status.client_packets_sent++;
+       return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
+}
 
 /*
   message handler for when we are in daemon mode. This redirects the message
@@ -86,7 +103,6 @@ static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
        len = offsetof(struct ctdb_req_message, data) + data.dsize;
        r = ctdbd_allocate_pkt(ctdb, len);
 
-/*XXX cant use this since it returns an int    CTDB_NO_MEMORY(ctdb, r);*/
        talloc_set_name_const(r, "req_message packet");
 
        memset(r, 0, offsetof(struct ctdb_req_message, data));
@@ -99,10 +115,9 @@ static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
        r->datalen       = data.dsize;
        memcpy(&r->data[0], data.dptr, data.dsize);
 
-       ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len);
+       daemon_queue_send(client, &r->hdr);
 
        talloc_free(r);
-       return;
 }
                                           
 
@@ -120,77 +135,13 @@ static void daemon_request_register_message_handler(struct ctdb_client *client,
        if (res != 0) {
                DEBUG(0,(__location__ " Failed to register handler %u in daemon\n", 
                         c->srvid));
+       } else {
+               DEBUG(2,(__location__ " Registered message handler for srvid=%u\n", 
+                        c->srvid));
        }
 }
 
 
-static struct ctdb_call_state *ctdb_daemon_fetch_lock_send(struct ctdb_db_context *ctdb_db, 
-                                                          TALLOC_CTX *mem_ctx, 
-                                                          TDB_DATA key, struct ctdb_ltdb_header *header,
-                                                          TDB_DATA *data)
-{
-       struct ctdb_call *call;
-       struct ctdb_fetch_handle *rec;
-       struct ctdb_call_state *state;
-
-       rec = talloc(mem_ctx, struct ctdb_fetch_handle);
-       CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec);
-
-       
-       call = talloc(rec, struct ctdb_call);
-       ZERO_STRUCT(*call);
-       call->call_id = CTDB_FETCH_FUNC;
-       call->key = key;
-       call->flags = CTDB_IMMEDIATE_MIGRATION;
-
-       rec->ctdb_db = ctdb_db;
-       rec->key = key;
-       rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
-       rec->data = data;
-
-       state = ctdb_daemon_call_send_remote(ctdb_db, call, header);
-       state->fetch_private = rec;
-       talloc_steal(state, rec);
-
-       return state;
-}
-
-struct client_fetch_lock_data {
-       struct ctdb_client *client;
-       uint32_t reqid;
-};
-
-static void daemon_fetch_lock_complete(struct ctdb_call_state *state)
-{
-       struct ctdb_reply_fetch_lock *r;
-       struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data, struct client_fetch_lock_data);
-       struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client);
-       int length, res;
-
-       length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize;
-       r = ctdbd_allocate_pkt(client->ctdb, length);
-       if (r == NULL) {
-               DEBUG(0,(__location__ " Failed to allocate reply_call in ctdb daemon\n"));
-               return;
-       }
-       memset(r, 0, offsetof(struct ctdb_reply_fetch_lock, data));
-       r->hdr.length       = length;
-       r->hdr.ctdb_magic   = CTDB_MAGIC;
-       r->hdr.ctdb_version = CTDB_VERSION;
-       r->hdr.operation    = CTDB_REPLY_FETCH_LOCK;
-       r->hdr.reqid        = data->reqid;
-       r->state            = state->state;
-       r->datalen          = state->call.reply_data.dsize;
-       memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen);
-
-       res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length);
-       if (res != 0) {
-               DEBUG(0,(__location__ " Failed to queue packet from daemon to client\n"));
-       }
-       talloc_free(r);
-       talloc_free(state);
-}
-
 /*
   called when the daemon gets a shutdown request from a client
  */
@@ -245,68 +196,6 @@ static void daemon_request_shutdown(struct ctdb_client *client,
 }
 
 
-/*
-  send a fetch lock error reply to the client
- */
-static void daemon_fetch_lock_error(struct ctdb_client *client,
-                                   struct ctdb_req_fetch_lock *f)
-{
-       struct ctdb_reply_fetch_lock r;
-
-       ZERO_STRUCT(r);
-       r.hdr.length       = sizeof(r);
-       r.hdr.ctdb_magic   = CTDB_MAGIC;
-       r.hdr.ctdb_version = CTDB_VERSION;
-       r.hdr.operation    = CTDB_REPLY_FETCH_LOCK;
-       r.hdr.reqid        = f->hdr.reqid;
-       r.state            = -1;
-       
-       /*
-        * Ignore the result, there's not much we can do anyway.
-        */
-       ctdb_queue_send(client->queue, (uint8_t *)&r.hdr,
-                       r.hdr.length);
-}
-
-/*
-  called when the daemon gets a fetch lock request from a client
- */
-static void daemon_request_fetch_lock(struct ctdb_client *client, 
-                                     struct ctdb_req_fetch_lock *f)
-{
-       struct ctdb_call_state *state;
-       TDB_DATA key, *data;
-       struct ctdb_db_context *ctdb_db;
-       struct client_fetch_lock_data *fl_data;
-
-       ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
-       if (ctdb_db == NULL) {
-               daemon_fetch_lock_error(client, f);
-               return;
-       }
-
-       if (!ctdb_validate_vnn(client->ctdb, f->header.dmaster)) {
-               DEBUG(0,(__location__ " Invalid dmaster %u\n", f->header.dmaster));
-               daemon_fetch_lock_error(client, f);
-               return;
-       }
-
-       key.dsize = f->keylen;
-       key.dptr = &f->key[0];
-
-       data        = talloc(client, TDB_DATA);
-       data->dptr  = NULL;
-       data->dsize = 0;
-
-       state = ctdb_daemon_fetch_lock_send(ctdb_db, client, key, &f->header, data);
-       talloc_steal(state, data);
-
-       fl_data = talloc(state, struct client_fetch_lock_data);
-       fl_data->client = client;
-       fl_data->reqid  = f->hdr.reqid;
-       state->async.fn = daemon_fetch_lock_complete;
-       state->async.private_data = fl_data;
-}
 
 /*
   called when the daemon gets a connect wait request from a client
@@ -330,13 +219,59 @@ static void daemon_request_connect_wait(struct ctdb_client *client,
        r.vnn           = ctdb_get_vnn(client->ctdb);
        r.num_connected = client->ctdb->num_connected;
        
-       res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
+       res = daemon_queue_send(client, &r.hdr);
        if (res != 0) {
                DEBUG(0,(__location__ " Failed to queue a connect wait response\n"));
                return;
        }
 }
 
+
+/*
+  called when the daemon gets a getdbpath request from a client
+ */
+static void daemon_request_getdbpath(struct ctdb_client *client, 
+                                 struct ctdb_req_getdbpath *c)
+{
+       struct ctdb_reply_getdbpath *r;
+       struct ctdb_db_context *ctdb_db;
+       char *path;
+       int res, len;
+
+       ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
+       if (!ctdb_db) {
+               DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
+                         c->db_id));
+               ctdb_db->ctdb->status.pending_calls--;
+               return;
+       }
+
+       path = talloc_asprintf(c, "%s/%s", ctdb_db->ctdb->db_directory, ctdb_db->db_name);
+
+       /* now send the reply */
+       len = offsetof(struct ctdb_reply_getdbpath, data) + strlen(path);
+       r = ctdbd_allocate_pkt(ctdb_db->ctdb, len);
+
+       talloc_set_name_const(r, "reply_getdbpath packet");
+
+       memset(r, 0, offsetof(struct ctdb_reply_getdbpath, data));
+
+       r->hdr.length       = len;
+       r->hdr.ctdb_magic   = CTDB_MAGIC;
+       r->hdr.ctdb_version = CTDB_VERSION;
+       r->hdr.operation    = CTDB_REPLY_GETDBPATH;
+       r->hdr.reqid        = c->hdr.reqid;
+       r->datalen          = strlen(path);
+       memcpy(&r->data[0], path, r->datalen);
+
+       res = daemon_queue_send(client, &(r->hdr));
+       if (res != 0) {
+               DEBUG(0,(__location__ " Failed to queue a getdbpath response\n"));
+               return;
+       }
+}
+
+
 /*
   destroy a ctdb_client
 */
@@ -375,6 +310,64 @@ static void daemon_request_message_from_client(struct ctdb_client *client,
        }
 }
 
+
+struct daemon_call_state {
+       struct ctdb_client *client;
+       uint32_t reqid;
+       struct ctdb_call *call;
+       struct timeval start_time;
+};
+
+/* 
+   complete a call from a client 
+*/
+static void daemon_call_from_client_callback(struct ctdb_call_state *state)
+{
+       struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
+                                                          struct daemon_call_state);
+       struct ctdb_reply_call *r;
+       int res;
+       uint32_t length;
+       struct ctdb_client *client = dstate->client;
+
+       talloc_steal(client, dstate);
+       talloc_steal(dstate, dstate->call);
+
+       res = ctdb_daemon_call_recv(state, dstate->call);
+       if (res != 0) {
+               DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
+               client->ctdb->status.pending_calls--;
+               ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
+               return;
+       }
+
+       length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
+       r = ctdbd_allocate_pkt(dstate, length);
+       if (r == NULL) {
+               DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
+               client->ctdb->status.pending_calls--;
+               ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
+               return;
+       }
+       memset(r, 0, offsetof(struct ctdb_reply_call, data));
+       r->hdr.length       = length;
+       r->hdr.ctdb_magic   = CTDB_MAGIC;
+       r->hdr.ctdb_version = CTDB_VERSION;
+       r->hdr.operation    = CTDB_REPLY_CALL;
+       r->hdr.reqid        = dstate->reqid;
+       r->datalen          = dstate->call->reply_data.dsize;
+       memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
+
+       res = daemon_queue_send(client, &r->hdr);
+       if (res != 0) {
+               DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n"));
+       }
+       ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
+       talloc_free(dstate);
+       client->ctdb->status.pending_calls--;
+}
+
+
 /*
   this is called when the ctdb daemon received a ctdb request call
   from a local client over the unix domain socket
@@ -384,64 +377,101 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
 {
        struct ctdb_call_state *state;
        struct ctdb_db_context *ctdb_db;
-       struct ctdb_call call;
-       struct ctdb_reply_call *r;
-       int res;
-       uint32_t length;
+       struct daemon_call_state *dstate;
+       struct ctdb_call *call;
+       struct ctdb_ltdb_header header;
+       TDB_DATA key, data;
+       int ret;
+       struct ctdb_context *ctdb = client->ctdb;
+
+       ctdb->status.total_calls++;
+       ctdb->status.pending_calls++;
 
        ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
        if (!ctdb_db) {
                DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
                          c->db_id));
+               ctdb->status.pending_calls--;
                return;
        }
 
-       ZERO_STRUCT(call);
-       call.call_id = c->callid;
-       call.key.dptr = c->data;
-       call.key.dsize = c->keylen;
-       call.call_data.dptr = c->data + c->keylen;
-       call.call_data.dsize = c->calldatalen;
+       key.dptr = c->data;
+       key.dsize = c->keylen;
 
-       state = ctdb_daemon_call_send(ctdb_db, &call);
-//     state->async.fn = daemon_call_from_client_callback;
-//     state->async.private_data = state;
+       ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
+                                          (struct ctdb_req_header *)c, &data,
+                                          daemon_incoming_packet, client);
+       if (ret == -2) {
+               /* will retry later */
+               ctdb->status.pending_calls--;
+               return;
+       }
 
-/* XXX this must be converted to fully async */
-       res = ctdb_daemon_call_recv(state, &call);
-       if (res != 0) {
-               DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
-               exit(1);
+       if (ret != 0) {
+               DEBUG(0,(__location__ " Unable to fetch record\n"));
+               ctdb->status.pending_calls--;
+               return;
        }
 
-       length = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
-       r = ctdbd_allocate_pkt(client->ctdb, length);
-       if (r == NULL) {
-               DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
+       dstate = talloc(client, struct daemon_call_state);
+       if (dstate == NULL) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(0,(__location__ " Unable to allocate dstate\n"));
+               ctdb->status.pending_calls--;
+               return;
+       }
+       dstate->start_time = timeval_current();
+       dstate->client = client;
+       dstate->reqid  = c->hdr.reqid;
+       talloc_steal(dstate, data.dptr);
+
+       call = dstate->call = talloc_zero(dstate, struct ctdb_call);
+       if (call == NULL) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(0,(__location__ " Unable to allocate call\n"));
+               ctdb->status.pending_calls--;
+               ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time);
                return;
        }
-       memset(r, 0, offsetof(struct ctdb_reply_call, data));
-       r->hdr.length       = length;
-       r->hdr.ctdb_magic   = CTDB_MAGIC;
-       r->hdr.ctdb_version = CTDB_VERSION;
-       r->hdr.operation    = CTDB_REPLY_CALL;
-       r->hdr.reqid        = c->hdr.reqid;
-       r->datalen          = call.reply_data.dsize;
-       memcpy(&r->data[0], call.reply_data.dptr, r->datalen);
 
-       res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length);
-       if (res != 0) {
-               DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n"));
+       call->call_id = c->callid;
+       call->key = key;
+       call->call_data.dptr = c->data + c->keylen;
+       call->call_data.dsize = c->calldatalen;
+       call->flags = c->flags;
+
+       if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
+               state = ctdb_call_local_send(ctdb_db, call, &header, &data);
+       } else {
+               state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
        }
-       talloc_free(r);
+
+       ctdb_ltdb_unlock(ctdb_db, key);
+
+       if (state == NULL) {
+               DEBUG(0,(__location__ " Unable to setup call send\n"));
+               ctdb->status.pending_calls--;
+               ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time);
+               return;
+       }
+       talloc_steal(state, dstate);
+       talloc_steal(client, state);
+
+       state->async.fn = daemon_call_from_client_callback;
+       state->async.private_data = dstate;
 }
 
 
+static void daemon_request_control_from_client(struct ctdb_client *client, 
+                                              struct ctdb_req_control *c);
+
 /* data contains a packet from the client */
-static void daemon_incoming_packet(struct ctdb_client *client, void *data, size_t nread)
+static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread)
 {
-       struct ctdb_req_header *hdr = data;
+       struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
+       struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
        TALLOC_CTX *tmp_ctx;
+       struct ctdb_context *ctdb = client->ctdb;
 
        /* place the packet as a child of a tmp_ctx. We then use
           talloc_free() below to free it. If any of the calls want
@@ -462,26 +492,40 @@ static void daemon_incoming_packet(struct ctdb_client *client, void *data, size_
 
        switch (hdr->operation) {
        case CTDB_REQ_CALL:
+               ctdb->status.client.req_call++;
                daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
                break;
 
        case CTDB_REQ_REGISTER:
+               ctdb->status.client.req_register++;
                daemon_request_register_message_handler(client, 
                                                        (struct ctdb_req_register *)hdr);
                break;
+
        case CTDB_REQ_MESSAGE:
+               ctdb->status.client.req_message++;
                daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
                break;
 
        case CTDB_REQ_CONNECT_WAIT:
+               ctdb->status.client.req_connect_wait++;
                daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
                break;
-       case CTDB_REQ_FETCH_LOCK:
-               daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr);
-               break;
+
        case CTDB_REQ_SHUTDOWN:
+               ctdb->status.client.req_shutdown++;
                daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr);
                break;
+
+       case CTDB_REQ_GETDBPATH:
+               daemon_request_getdbpath(client, (struct ctdb_req_getdbpath *)hdr);
+               break;
+
+       case CTDB_REQ_CONTROL:
+               ctdb->status.client.req_control++;
+               daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
+               break;
+
        default:
                DEBUG(0,(__location__ " daemon: unrecognized operation %d\n",
                         hdr->operation));
@@ -491,7 +535,9 @@ done:
        talloc_free(tmp_ctx);
 }
 
-
+/*
+  called when the daemon gets a incoming packet
+ */
 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
 {
        struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
@@ -502,6 +548,8 @@ static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
                return;
        }
 
+       client->ctdb->status.client_packets_recv++;
+
        if (cnt < sizeof(*hdr)) {
                ctdb_set_error(client->ctdb, "Bad packet length %d in daemon\n", cnt);
                return;
@@ -523,6 +571,10 @@ static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
                return;
        }
 
+       DEBUG(3,(__location__ " client request %d of type %d length %d from "
+                "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length,
+                hdr->srcnode, hdr->destnode));
+
        /* it is the responsibility of the incoming packet function to free 'data' */
        daemon_incoming_packet(client, data, cnt);
 }
@@ -626,8 +678,6 @@ int ctdb_start(struct ctdb_context *ctdb)
        struct fd_event *fde;
        const char *domain_socket_name;
 
-       /* generate a name to use for our local socket */
-       ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address);
        /* get rid of any old sockets */
        unlink(ctdb->daemon.name);
 
@@ -653,6 +703,12 @@ int ctdb_start(struct ctdb_context *ctdb)
                close(fd[0]);
                close(ctdb->daemon.sd);
                ctdb->daemon.sd = -1;
+
+               /* Added because of ctdb->methods->allocate_pkt calls */
+               /* TODO: clean */
+               int ctdb_tcp_init(struct ctdb_context *ctdb);
+               ctdb_tcp_init(ctdb);
+
                return 0;
        }
 
@@ -675,12 +731,12 @@ int ctdb_start(struct ctdb_context *ctdb)
 /*
   allocate a packet for use in client<->daemon communication
  */
-void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len)
+void *ctdbd_allocate_pkt(TALLOC_CTX *mem_ctx, size_t len)
 {
        int size;
 
        size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
-       return talloc_size(ctdb, size);
+       return talloc_size(mem_ctx, size);
 }
 
 /*
@@ -690,3 +746,77 @@ void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hd
 {
        ctdb->num_finished++;
 }
+
+
+struct daemon_control_state {
+       struct ctdb_client *client;
+       struct ctdb_req_control *c;
+};
+
+/*
+  callback when a control reply comes in
+ */
+static void daemon_control_callback(struct ctdb_context *ctdb,
+                                   uint32_t status, TDB_DATA data, 
+                                   void *private_data)
+{
+       struct daemon_control_state *state = talloc_get_type(private_data, 
+                                                            struct daemon_control_state);
+       struct ctdb_client *client = state->client;
+       struct ctdb_reply_control *r;
+       size_t len;
+
+       DEBUG(0,("callback: size=%u\n", data.dsize));
+       DEBUG(0,("callback: size=%u\n", data.dsize));
+       DEBUG(0,("callback: size=%u\n", data.dsize));
+       DEBUG(0,("callback: size=%u\n", data.dsize));
+       DEBUG(0,("callback: size=%u\n", data.dsize));
+       DEBUG(0,("callback: size=%u\n", data.dsize));
+
+       /* construct a message to send to the client containing the data */
+       len = offsetof(struct ctdb_req_control, data) + data.dsize;
+       r = ctdbd_allocate_pkt(client, len);
+       talloc_set_name_const(r, "reply_control packet");
+
+       memset(r, 0, offsetof(struct ctdb_req_message, data));
+
+       r->hdr.length    = len;
+       r->hdr.ctdb_magic = CTDB_MAGIC;
+       r->hdr.ctdb_version = CTDB_VERSION;
+       r->hdr.operation = CTDB_REPLY_CONTROL;
+       r->status        = status;
+       r->datalen       = data.dsize;
+       memcpy(&r->data[0], data.dptr, data.dsize);
+
+       daemon_queue_send(client, &r->hdr);
+
+       talloc_free(state);
+}
+
+/*
+  this is called when the ctdb daemon received a ctdb request control
+  from a local client over the unix domain socket
+ */
+static void daemon_request_control_from_client(struct ctdb_client *client, 
+                                              struct ctdb_req_control *c)
+{
+       TDB_DATA data;
+       int res;
+       struct daemon_control_state *state;
+
+       state = talloc(client, struct daemon_control_state);
+       CTDB_NO_MEMORY_VOID(client->ctdb, state);
+
+       state->client = client;
+       state->c = talloc_steal(state, c);
+       
+       data.dptr = &c->data[0];
+       data.dsize = c->datalen;
+       res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
+                                      c->srvid, c->opcode, data, daemon_control_callback,
+                                      state);
+       if (res != 0) {
+               DEBUG(0,(__location__ " Failed to send control to remote node %u\n",
+                        c->hdr.destnode));
+       }
+}