add a test that sends messages between clients connected to the same ctdb
authorRonnie sahlberg <ronniesahlberg@gmail.com>
Wed, 11 Apr 2007 03:43:15 +0000 (13:43 +1000)
committerRonnie sahlberg <ronniesahlberg@gmail.com>
Wed, 11 Apr 2007 03:43:15 +0000 (13:43 +1000)
add code to actually pass the messages between clients and ctdb

(This used to be ctdb commit 6d5b55d7b9c611fb5e98765906757a7d82e4bf6b)

1  2 
ctdb/Makefile.in
ctdb/common/ctdb_client.c
ctdb/common/ctdb_daemon.c
ctdb/common/ctdb_message.c
ctdb/include/ctdb.h
ctdb/include/ctdb_private.h
ctdb/tests/bench.sh
ctdb/tests/ctdb_bench.c
ctdb/tests/ctdb_messaging.c
ctdb/tests/messaging.sh

index 940f9794a65145d50ceb279b1586502a6b6e6ecd,940f9794a65145d50ceb279b1586502a6b6e6ecd..e085303c901d487cfbbcdc9387351e531b5d55d7
@@@ -30,7 -30,7 +30,7 @@@ CTDB_OBJ = $(CTDB_COMMON_OBJ) $(CTDB_TC
  
  OBJS = @TDBOBJ@ @TALLOCOBJ@ @LIBREPLACEOBJ@ @INFINIBAND_WRAPPER_OBJ@ $(EXTRA_OBJ) $(EVENTS_OBJ) $(CTDB_OBJ)
  
--BINS = bin/ctdb_test bin/ctdb_bench bin/ctdb_fetch @INFINIBAND_BINS@
++BINS = bin/ctdb_test bin/ctdb_bench bin/ctdb_messaging bin/ctdb_fetch @INFINIBAND_BINS@
  
  DIRS = lib bin
  
@@@ -61,6 -61,6 +61,10 @@@ bin/ctdb_fetch: $(OBJS) tests/ctdb_fetc
        @echo Linking $@
        @$(CC) $(CFLAGS) -o $@ tests/ctdb_fetch.o $(OBJS) $(LIB_FLAGS)
  
++bin/ctdb_messaging: $(OBJS) tests/ctdb_messaging.o
++      @echo Linking $@
++      @$(CC) $(CFLAGS) -o $@ tests/ctdb_messaging.o $(OBJS) $(LIB_FLAGS)
++
  bin/ibwrapper_test: $(OBJS) ib/ibwrapper_test.o
        @echo Linking $@
        @$(CC) $(CFLAGS) -o $@ ib/ibwrapper_test.o $(OBJS) $(LIB_FLAGS)
index cbf213672919adb767ec17ab616710e3ebc97bf8,e837a1af879e510d90d4cb8704dc6eec1c4126b4..f2e551091ce304db8e99cf69ab7b97a060850162
@@@ -255,3 -263,51 +263,87 @@@ struct ctdb_call_state *ctdb_client_cal
        ctdb_ltdb_unlock(ctdb_db, call->key);
        return state;
  }
 -      struct ctdb_register_call c;
+ /*
+   tell the daemon what messaging srvid we will use, and register the message
+   handler function in the client
+ */
+ int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
+                                   ctdb_message_fn_t handler,
+                                   void *private)
+                                   
+ {
++      struct ctdb_req_register c;
+       int res;
++      /* if the domain socket is not yet open, open it */
++      if (ctdb->daemon.sd==-1) {
++              ux_socket_connect(ctdb);
++      }
++
+       ZERO_STRUCT(c);
+       c.hdr.length       = sizeof(c);
+       c.hdr.ctdb_magic   = CTDB_MAGIC;
+       c.hdr.ctdb_version = CTDB_VERSION;
+       c.hdr.operation    = CTDB_REQ_REGISTER;
+       c.srvid            = srvid;
+       res = ctdb_client_queue_pkt(ctdb, &c.hdr);
+       if (res != 0) {
+               return res;
+       }
+       /* also need to register the handler with our ctdb structure */
+       return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private);
+ }
+ /*
+   setup handler for receipt of ctdb messages from ctdb_send_message()
+ */
+ int ctdb_set_message_handler(struct ctdb_context *ctdb, 
+                            uint32_t srvid, 
+                            ctdb_message_fn_t handler,
+                            void *private)
+ {
+       if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
+               return ctdb_client_set_message_handler(ctdb, srvid, handler, private);
+       }
+       return ctdb_daemon_set_message_handler(ctdb, srvid, handler, private);
+ }
++
++int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn,
++                    uint32_t srvid, TDB_DATA data)
++{
++      struct ctdb_req_message *r;
++      int len, res;
++
++      len = offsetof(struct ctdb_req_message, data) + data.dsize;
++      r = ctdb->methods->allocate_pkt(ctdb, len);
++      CTDB_NO_MEMORY(ctdb, r);
++      talloc_set_name_const(r, "req_message packet");
++
++      r->hdr.length    = len;
++      r->hdr.ctdb_magic = CTDB_MAGIC;
++      r->hdr.ctdb_version = CTDB_VERSION;
++      r->hdr.operation = CTDB_REQ_MESSAGE;
++      r->hdr.destnode  = vnn;
++      r->hdr.srcnode   = ctdb->vnn;
++      r->hdr.reqid     = 0;
++      r->srvid         = srvid;
++      r->datalen       = data.dsize;
++      memcpy(&r->data[0], data.dptr, data.dsize);
++      
++      res = ctdb_client_queue_pkt(ctdb, &r->hdr);
++      if (res != 0) {
++              return res;
++      }
++
++      talloc_free(r);
++      return 0;
++}
index a7cd7217280e848ee5d779b4cf3d7cc71a8edf3f,b520dc7f0d8922f79bb0f2916a12299fa1fa3ea2..f3ef3c6d4c7b077a92674bc9f6cae4207a39b35a
@@@ -58,6 -58,56 +58,56 @@@ struct ctdb_client 
  };
  
  
 -      CTDB_NO_MEMORY(ctdb, r);
+ /*
+   message handler for when we are in daemon mode. This redirects the message
+   to the right client
+  */
+ static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
+                                   TDB_DATA data, void *private)
+ {
+       struct ctdb_client *client = talloc_get_type(private, struct ctdb_client);
+       struct ctdb_req_message *r;
+       int len;
+       /* construct a message to send to the client containing the data */
+       len = offsetof(struct ctdb_req_message, data) + data.dsize;
+       r = ctdbd_allocate_pkt(ctdb, len);
 -      return 0;
++
++/*XXX cant use this since it returns an int   CTDB_NO_MEMORY(ctdb, r);*/
+       talloc_set_name_const(r, "req_message packet");
+       r->hdr.length    = len;
+       r->hdr.ctdb_magic = CTDB_MAGIC;
+       r->hdr.ctdb_version = CTDB_VERSION;
+       r->hdr.operation = CTDB_REQ_MESSAGE;
+       r->srvid         = srvid;
+       r->datalen       = data.dsize;
+       memcpy(&r->data[0], data.dptr, data.dsize);
+       
+       ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, len);
+       talloc_free(r);
 -      printf("XXX registering messaging handler %u in daemon\n", c->srvid);
++      return;
+ }
+                                          
+ /*
+   this is called when the ctdb daemon received a ctdb request to 
+   set the srvid from the client
+  */
+ static void daemon_request_register_message_handler(struct ctdb_client *client, 
+                                                   struct ctdb_req_register *c)
+ {
+       int res;
+       res = ctdb_register_message_handler(client->ctdb, client, 
+                                           c->srvid, daemon_message_handler, 
+                                           client);
+       if (res != 0) {
+               printf("Failed to register handler %u in daemon\n", c->srvid);
+       }
+ }
  /*
    destroy a ctdb_client
  */
@@@ -69,6 -119,6 +119,21 @@@ static int ctdb_client_destructor(struc
  }
  
  
++/*
++  this is called when the ctdb daemon received a ctdb request message
++  from a local client over the unix domain socket
++ */
++static void daemon_request_message_from_client(struct ctdb_client *client, 
++                                          struct ctdb_req_message *c)
++{
++      if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
++              ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
++      } else {
++              /* this is for a remote client */
++/*XXX*/
++      }
++}
++
  /*
    this is called when the ctdb daemon received a ctdb request call
    from a local client over the unix domain socket
@@@ -152,6 -202,10 +217,13 @@@ static void client_incoming_packet(stru
                daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
                break;
  
+       case CTDB_REQ_REGISTER:
+               daemon_request_register_message_handler(client, 
+                                                       (struct ctdb_req_register *)hdr);
+               break;
++      case CTDB_REQ_MESSAGE:
++              daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
++              break;
        }
  
        talloc_free(data);
@@@ -342,3 -396,3 +414,10 @@@ void *ctdbd_allocate_pkt(struct ctdb_co
        return talloc_size(ctdb, size);
  }
  
++int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
++                           ctdb_message_fn_t handler,
++                           void *private)
++{
++      return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private);
++}
++
index ec39525942d25cf3780b4af7248bbfcf22d6eca5,27c5c64bc0eea51af236aff2fe4057ac759e0d25..5121db28aaebada863c2dd200a208d677d6bdb1f
  void ctdb_request_message(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
  {
        struct ctdb_req_message *c = (struct ctdb_req_message *)hdr;
++      struct ctdb_message_list *fml, *ml;
        TDB_DATA data;
--      if (ctdb->message_handler == NULL) {
++
++/* XXX need a much faster method to find the handler */
++      ml  = ctdb->message_list;
++      fml = ml;
++      while (ml) {
++              if (ml->srvid==c->srvid) {
++                      break;
++              }
++              ml = ml->next;
++              if (ml==fml) {
++                      ml = NULL;
++                      break;
++              }
++      }
++
++      if (ml == NULL) {
                printf("no msg handler\n");
                /* no registered message handler */
                return;
        }
        data.dptr = &c->data[0];
        data.dsize = c->datalen;
--      ctdb->message_handler(ctdb, c->srvid, data, ctdb->message_private);
++      ml->message_handler(ctdb, c->srvid, data, ml->message_private);
  }
  
  
  /*
    send a ctdb message
  */
--int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
++int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
                      uint32_t srvid, TDB_DATA data)
  {
        struct ctdb_req_message *r;
        return 0;
  }
  
-   setup handler for receipt of ctdb messages from ctdb_send_message()
 +/*
- int ctdb_set_message_handler(struct ctdb_context *ctdb, ctdb_message_fn_t handler,
-                            uint32_t srvid, void *private)
++  send a ctdb message
 +*/
-       ctdb->message_handler = handler;
-       ctdb->message_private = private;
++int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
++                    uint32_t srvid, TDB_DATA data)
 +{
++      if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
++              return ctdb_client_send_message(ctdb, vnn, srvid, data);
++      }
++      return ctdb_daemon_send_message(ctdb, vnn, srvid, data);
++}
++
++
+ /*
+   when a client goes away, we need to remove its srvid handler from the list
+  */
+ static int message_handler_destructor(struct ctdb_message_list *m)
+ {
+       DLIST_REMOVE(m->ctdb->message_list, m);
 +      return 0;
  }
  
+ /*
+   setup handler for receipt of ctdb messages from ctdb_send_message()
+ */
+ int ctdb_register_message_handler(struct ctdb_context *ctdb, 
+                                 TALLOC_CTX *mem_ctx,
+                                 uint32_t srvid,
+                                 ctdb_message_fn_t handler,
+                                 void *private)
+ {
+       struct ctdb_message_list *m;
+       m = talloc(mem_ctx, struct ctdb_message_list);
+       CTDB_NO_MEMORY(ctdb, m);
+       m->ctdb            = ctdb;
+       m->srvid           = srvid;
+       m->message_handler = handler;
+       m->message_private = private;
+       
+       DLIST_ADD(ctdb->message_list, m);
+       talloc_set_destructor(m, message_handler_destructor);
+       return 0;
+ }
index 28c6f21f082641e96272a20db3a8e8bffd6c8b5d,f77b34abbda827ad06cdbba67ab27780a8aef3bb..becdea7cd77d417be79d3a8cf076fd2785b9bb9d
@@@ -152,8 -152,8 +152,9 @@@ uint32_t ctdb_get_num_nodes(struct ctdb
  /* setup a handler for ctdb messages */
  typedef void (*ctdb_message_fn_t)(struct ctdb_context *, uint32_t srvid, 
                                  TDB_DATA data, void *);
--int ctdb_set_message_handler(struct ctdb_context *ctdb, ctdb_message_fn_t handler,
--                           uint32_t srvid, void *private);
++int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
++                           ctdb_message_fn_t handler,
++                           void *private);
  
  
  int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call);
@@@ -180,11 -180,4 +181,10 @@@ struct ctdb_record_handle *ctdb_fetch_l
   */
  int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data);
  
- /* when running in daemon mode this function is used by a client to tell 
-    ctdb daemon what its local identifier is.
-    when in non-daemon mode this is a noop.
-  */
- int ctdb_register_message_local_id(struct ctdb_context *ctdb, uint32_t messenger_id);
-  
++int ctdb_register_message_handler(struct ctdb_context *ctdb, 
++                                TALLOC_CTX *mem_ctx,
++                                uint32_t srvid,
++                                ctdb_message_fn_t handler,
++                                void *private);
 +
  #endif
index 5d2e36b10489d0435542a6a44ef84608680cbc4c,416156482e636a318f9f8cb5dc12fe6a74e2a105..30123d136affbb222b3c94c61f64fe38fe8e66d2
@@@ -260,10 -269,9 +269,9 @@@ struct ctdb_reply_dmaster 
        uint8_t  data[1];
  };
  
--struct ctdb_register_call {
++struct ctdb_req_register {
        struct ctdb_req_header hdr;
-       uint32_t datalen;
-       uint8_t data[4];
+       uint32_t srvid;
  };
  
  struct ctdb_req_message {
@@@ -360,4 -368,4 +368,11 @@@ struct ctdb_call_state *ctdb_client_cal
  */
  int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call);
  
++int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
++                           ctdb_message_fn_t handler,
++                           void *private);
++
++int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn,
++                           uint32_t srvid, TDB_DATA data);
++
  #endif
index b9b93b01b68c74f71794e16aa40e943bfab4bdd5,b9b93b01b68c74f71794e16aa40e943bfab4bdd5..50e9e08f995dbc6aa85ea3b4aa960c0c69a7e815
@@@ -3,7 -3,7 +3,7 @@@
  killall -q ctdb_bench
  
  echo "Trying 2 nodes"
--bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.2:9001 --pid=55 $* &
--bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.1:9001 --pid=66 $*
++bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.2:9001 $* &
++bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.1:9001 $*
  
  killall -q ctdb_bench
index 0d56d2ec289debc1b4c5869b61414733497dd96d,0d56d2ec289debc1b4c5869b61414733497dd96d..658a2e4dadde095c8e2d3870ed0e32a284364d75
@@@ -294,7 -294,7 +294,7 @@@ int main(int argc, const char *argv[]
        /* start the protocol running */
        ret = ctdb_start(ctdb);
  
--      ctdb_set_message_handler(ctdb, ring_message_handler, 0, &msg_count);
++      ctdb_set_message_handler(ctdb, 0, ring_message_handler,&msg_count);
  
        /* wait until all nodes are connected (should not be needed
           outside of test code) */
index 0000000000000000000000000000000000000000,0000000000000000000000000000000000000000..5ee02c5e400fd1a12996865fd42dd2101e489bbd
new file mode 100644 (file)
--- /dev/null
--- /dev/null
@@@ -1,0 -1,0 +1,171 @@@
++/* 
++   test of messaging
++
++   Copyright (C) Andrew Tridgell  2006
++
++   This library is free software; you can redistribute it and/or
++   modify it under the terms of the GNU Lesser General Public
++   License as published by the Free Software Foundation; either
++   version 2 of the License, or (at your option) any later version.
++
++   This library is distributed in the hope that it will be useful,
++   but WITHOUT ANY WARRANTY; without even the implied warranty of
++   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
++   Lesser General Public License for more details.
++
++   You should have received a copy of the GNU Lesser General Public
++   License along with this library; if not, write to the Free Software
++   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
++*/
++
++#include "includes.h"
++#include "lib/events/events.h"
++#include "system/filesys.h"
++#include "popt.h"
++
++static int timelimit = 10;
++static int num_records = 10;
++static int num_msgs = 1;
++static int num_repeats = 100;
++
++
++/*
++  handler for messages in bench_ring()
++*/
++static void message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
++                               TDB_DATA data, void *private)
++{
++printf("client vnn:%d received a message to srvid:%d\n",ctdb_get_vnn(ctdb),srvid);
++}
++
++/*
++  main program
++*/
++int main(int argc, const char *argv[])
++{
++      struct ctdb_context *ctdb;
++      struct ctdb_db_context *ctdb_db;
++      const char *nlist = NULL;
++      const char *transport = "tcp";
++      const char *myaddress = NULL;
++      int self_connect=0;
++      int daemon_mode=0;
++
++      struct poptOption popt_options[] = {
++              POPT_AUTOHELP
++              { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" },
++              { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" },
++              { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL },
++              { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" },
++              { "daemon", 0, POPT_ARG_NONE, &daemon_mode, 0, "spawn a ctdb daemon", "boolean" },
++              { "timelimit", 't', POPT_ARG_INT, &timelimit, 0, "timelimit", "integer" },
++              { "num-records", 'r', POPT_ARG_INT, &num_records, 0, "num_records", "integer" },
++              { "num-msgs", 'n', POPT_ARG_INT, &num_msgs, 0, "num_msgs", "integer" },
++              POPT_TABLEEND
++      };
++      int opt;
++      const char **extra_argv;
++      int extra_argc = 0;
++      int ret;
++      poptContext pc;
++      struct event_context *ev;
++      pid_t pid;
++      uint32_t srvid;
++      TDB_DATA data;
++
++      pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
++
++      while ((opt = poptGetNextOpt(pc)) != -1) {
++              switch (opt) {
++              default:
++                      fprintf(stderr, "Invalid option %s: %s\n", 
++                              poptBadOption(pc, 0), poptStrerror(opt));
++                      exit(1);
++              }
++      }
++
++      /* setup the remaining options for the main program to use */
++      extra_argv = poptGetArgs(pc);
++      if (extra_argv) {
++              extra_argv++;
++              while (extra_argv[extra_argc]) extra_argc++;
++      }
++
++      if (nlist == NULL || myaddress == NULL) {
++              printf("You must provide a node list with --nlist and an address with --listen\n");
++              exit(1);
++      }
++
++      ev = event_context_init(NULL);
++
++      /* initialise ctdb */
++      ctdb = ctdb_init(ev);
++      if (ctdb == NULL) {
++              printf("Failed to init ctdb\n");
++              exit(1);
++      }
++
++      if (self_connect) {
++              ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT);
++      }
++      if (daemon_mode) {
++              ctdb_set_flags(ctdb, CTDB_FLAG_DAEMON_MODE);
++      }
++
++      ret = ctdb_set_transport(ctdb, transport);
++      if (ret == -1) {
++              printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb));
++              exit(1);
++      }
++
++      /* tell ctdb what address to listen on */
++      ret = ctdb_set_address(ctdb, myaddress);
++      if (ret == -1) {
++              printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb));
++              exit(1);
++      }
++
++      /* tell ctdb what nodes are available */
++      ret = ctdb_set_nlist(ctdb, nlist);
++      if (ret == -1) {
++              printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb));
++              exit(1);
++      }
++
++      /* attach to a specific database */
++      ctdb_db = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
++      if (!ctdb_db) {
++              printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
++              exit(1);
++      }
++
++      /* start the protocol running */
++      ret = ctdb_start(ctdb);
++
++/*XXX why does this block forever?    ctdb_connect_wait(ctdb);*/
++
++      pid=fork();
++      if (pid) {
++              srvid=0;
++      } else {
++              srvid=1;
++      }
++
++      /* wait until all nodes are connected (should not be needed
++         outside of test code) */
++      data.dptr=NULL;
++      data.dsize=0;
++      ctdb_set_message_handler(ctdb, srvid, message_handler, NULL);
++
++sleep(3);
++printf("sending message from vnn:%d to vnn:%d/srvid:%d\n",ctdb_get_vnn(ctdb),ctdb_get_vnn(ctdb), 1-srvid);
++      ctdb_send_message(ctdb, ctdb_get_vnn(ctdb), 1-srvid, data);
++
++      while(1){
++              event_loop_once(ev);
++      }
++       
++      /* shut it down */
++      talloc_free(ctdb);
++      return 0;
++}
index 0000000000000000000000000000000000000000,0000000000000000000000000000000000000000..179a2bef88d584cd20d0382a3c5947d94657573f
new file mode 100755 (executable)
--- /dev/null
--- /dev/null
@@@ -1,0 -1,0 +1,9 @@@
++#!/bin/sh
++
++killall -q ctdb_messaging
++
++echo "Trying 2 nodes"
++bin/ctdb_messaging --nlist tests/nodes.txt --listen 127.0.0.2:9001 $* &
++bin/ctdb_messaging --nlist tests/nodes.txt --listen 127.0.0.1:9001 $*
++
++killall -q ctdb_messaging