}
}
+void ctdb_request_message(struct ctdb_context *ctdb,
+ struct ctdb_req_header *hdr)
+{
+ struct ctdb_req_message *c = (struct ctdb_req_message *)hdr;
+ TDB_DATA data;
+
+ data.dsize = c->datalen;
+ data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
+ if (data.dptr == NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " Memory allocation failure\n"));
+ return;
+ }
+
+ srvid_dispatch(ctdb->srv, c->srvid, CTDB_SRVID_ALL, data);
+}
+
static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
/*
handler function in the client
*/
int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
- ctdb_msg_fn_t handler,
+ srvid_handler_fn handler,
void *private_data)
{
int res;
}
/* also need to register the handler with our own ctdb structure */
- return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
+ return srvid_register(ctdb->srv, ctdb, srvid, handler, private_data);
}
/*
}
/* also need to register the handler with our own ctdb structure */
- ctdb_deregister_message_handler(ctdb, srvid, private_data);
+ srvid_deregister(ctdb->srv, srvid, private_data);
return 0;
}
/*
called on each key during a ctdb_traverse
*/
-static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
+static void traverse_handler(uint64_t srvid, TDB_DATA data, void *p)
{
struct traverse_state *state = (struct traverse_state *)p;
struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
ctdb->lastid = INT_MAX-200;
CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
+ ret = srvid_init(ctdb, &ctdb->srv);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, ("srvid_init failed (%s)\n", strerror(ret)));
+ talloc_free(ctdb);
+ return NULL;
+ }
+
ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
if (ret != 0) {
DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
+++ /dev/null
-/*
- ctdb_message protocol code
-
- Copyright (C) Andrew Tridgell 2007
- Copyright (C) Amitay Isaacs 2013
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, see <http://www.gnu.org/licenses/>.
-*/
-/*
- see http://wiki.samba.org/index.php/Samba_%26_Clustering for
- protocol design and packet details
-*/
-#include "includes.h"
-#include "tdb.h"
-#include "system/network.h"
-#include "system/filesys.h"
-#include "../include/ctdb_private.h"
-#include "lib/util/dlinklist.h"
-
-static int message_list_db_init(struct ctdb_context *ctdb)
-{
- ctdb->message_list_indexdb = tdb_open("messagedb", 8192,
- TDB_INTERNAL|
- TDB_INCOMPATIBLE_HASH|
- TDB_DISALLOW_NESTING,
- O_RDWR|O_CREAT, 0);
- if (ctdb->message_list_indexdb == NULL) {
- DEBUG(DEBUG_ERR, ("Failed to create message list indexdb\n"));
- return -1;
- }
-
- return 0;
-}
-
-static int message_list_db_add(struct ctdb_context *ctdb, uint64_t srvid,
- struct ctdb_message_list_header *h)
-{
- int ret;
- TDB_DATA key, data;
-
- if (ctdb->message_list_indexdb == NULL) {
- ret = message_list_db_init(ctdb);
- if (ret < 0) {
- return -1;
- }
- }
-
- key.dptr = (uint8_t *)&srvid;
- key.dsize = sizeof(uint64_t);
-
- data.dptr = (uint8_t *)&h;
- data.dsize = sizeof(struct ctdb_message_list_header *);
-
- ret = tdb_store(ctdb->message_list_indexdb, key, data, TDB_INSERT);
- if (ret < 0) {
- DEBUG(DEBUG_ERR, ("Failed to add message list handler (%s)\n",
- tdb_errorstr(ctdb->message_list_indexdb)));
- return -1;
- }
-
- return 0;
-}
-
-static int message_list_db_delete(struct ctdb_context *ctdb, uint64_t srvid)
-{
- int ret;
- TDB_DATA key;
-
- if (ctdb->message_list_indexdb == NULL) {
- return -1;
- }
-
- key.dptr = (uint8_t *)&srvid;
- key.dsize = sizeof(uint64_t);
-
- ret = tdb_delete(ctdb->message_list_indexdb, key);
- if (ret < 0) {
- DEBUG(DEBUG_ERR, ("Failed to delete message list handler (%s)\n",
- tdb_errorstr(ctdb->message_list_indexdb)));
- return -1;
- }
-
- return 0;
-}
-
-static int message_list_db_fetch_parser(TDB_DATA key, TDB_DATA data,
- void *private_data)
-{
- struct ctdb_message_list_header **h =
- (struct ctdb_message_list_header **)private_data;
-
- if (data.dsize != sizeof(struct ctdb_message_list_header *)) {
- return -1;
- }
-
- *h = *(struct ctdb_message_list_header **)data.dptr;
- return 0;
-}
-
-static int message_list_db_fetch(struct ctdb_context *ctdb, uint64_t srvid,
- struct ctdb_message_list_header **h)
-{
- TDB_DATA key;
-
- if (ctdb->message_list_indexdb == NULL) {
- return -1;
- }
-
- key.dptr = (uint8_t *)&srvid;
- key.dsize = sizeof(uint64_t);
-
- return tdb_parse_record(ctdb->message_list_indexdb, key,
- message_list_db_fetch_parser, h);
-}
-
-/*
- this dispatches the messages to the registered ctdb message handler
-*/
-int ctdb_dispatch_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
-{
- struct ctdb_message_list_header *h;
- struct ctdb_message_list *m;
- uint64_t srvid_all = CTDB_SRVID_ALL;
- int ret;
-
- ret = message_list_db_fetch(ctdb, srvid, &h);
- if (ret == 0) {
- for (m=h->m; m; m=m->next) {
- m->message_handler(ctdb, srvid, data, m->message_private);
- }
- }
-
- ret = message_list_db_fetch(ctdb, srvid_all, &h);
- if (ret == 0) {
- for(m=h->m; m; m=m->next) {
- m->message_handler(ctdb, srvid, data, m->message_private);
- }
- }
-
- return 0;
-}
-
-/*
- called when a CTDB_REQ_MESSAGE packet comes in
-*/
-void ctdb_request_message(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- struct ctdb_req_message *c = (struct ctdb_req_message *)hdr;
- TDB_DATA data;
-
- data.dsize = c->datalen;
- data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
-
- ctdb_dispatch_message(ctdb, c->srvid, data);
-}
-
-/*
- * When header is freed, remove all the srvid handlers
- */
-static int message_header_destructor(struct ctdb_message_list_header *h)
-{
- struct ctdb_message_list *m;
-
- while (h->m != NULL) {
- m = h->m;
- DLIST_REMOVE(h->m, m);
- TALLOC_FREE(m);
- }
-
- message_list_db_delete(h->ctdb, h->srvid);
- DLIST_REMOVE(h->ctdb->message_list_header, h);
-
- return 0;
-}
-
-/*
- when a client goes away, we need to remove its srvid handler from the list
- */
-static int message_handler_destructor(struct ctdb_message_list *m)
-{
- struct ctdb_message_list_header *h = m->h;
-
- DLIST_REMOVE(h->m, m);
- if (h->m == NULL) {
- talloc_free(h);
- }
- return 0;
-}
-
-/*
- setup handler for receipt of ctdb messages from ctdb_send_message()
-*/
-int ctdb_register_message_handler(struct ctdb_context *ctdb,
- TALLOC_CTX *mem_ctx,
- uint64_t srvid,
- ctdb_msg_fn_t handler,
- void *private_data)
-{
- struct ctdb_message_list_header *h;
- struct ctdb_message_list *m;
- int ret;
-
- m = talloc_zero(mem_ctx, struct ctdb_message_list);
- CTDB_NO_MEMORY(ctdb, m);
-
- m->message_handler = handler;
- m->message_private = private_data;
-
- ret = message_list_db_fetch(ctdb, srvid, &h);
- if (ret != 0) {
- /* srvid not registered yet */
- h = talloc_zero(ctdb, struct ctdb_message_list_header);
- CTDB_NO_MEMORY(ctdb, h);
-
- h->ctdb = ctdb;
- h->srvid = srvid;
-
- ret = message_list_db_add(ctdb, srvid, h);
- if (ret < 0) {
- talloc_free(m);
- talloc_free(h);
- return -1;
- }
-
- DLIST_ADD(ctdb->message_list_header, h);
- talloc_set_destructor(h, message_header_destructor);
- }
-
- m->h = h;
- DLIST_ADD(h->m, m);
- talloc_set_destructor(m, message_handler_destructor);
- return 0;
-}
-
-
-/*
- setup handler for receipt of ctdb messages from ctdb_send_message()
-*/
-int ctdb_deregister_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
-{
- struct ctdb_message_list_header *h;
- struct ctdb_message_list *m;
- int ret;
-
- ret = message_list_db_fetch(ctdb, srvid, &h);
- if (ret != 0) {
- return -1;
- }
-
- for (m=h->m; m; m=m->next) {
- if (m->message_private == private_data) {
- talloc_free(m);
- return 0;
- }
- }
-
- return -1;
-}
-
-
-/*
- * check if the given srvid exists
- */
-bool ctdb_check_message_handler(struct ctdb_context *ctdb, uint64_t srvid)
-{
- struct ctdb_message_list_header *h;
- int ret;
-
- ret = message_list_db_fetch(ctdb, srvid, &h);
- if (ret != 0 || h->m == NULL) {
- return false;
- }
-
- return true;
-}
--- /dev/null
+../../common/srvid.h
\ No newline at end of file
#ifndef _CTDB_CLIENT_H
#define _CTDB_CLIENT_H
+
+#include "common/srvid.h"
#include "ctdb_protocol.h"
enum control_state {CTDB_CONTROL_WAIT, CTDB_CONTROL_DONE, CTDB_CONTROL_ERROR, CTDB_CONTROL_TIMEOUT};
typedef void (*ctdb_msg_fn_t)(struct ctdb_context *, uint64_t srvid,
TDB_DATA data, void *);
int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
- ctdb_msg_fn_t handler,
- void *private_data);
+ srvid_handler_fn handler,
+ void *private_data);
int ctdb_client_remove_message_handler(struct ctdb_context *ctdb,
uint64_t srvid, void *private_data);
int ctdb_client_check_message_handlers(struct ctdb_context *ctdb,
int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
TDB_DATA key, TDB_DATA *data);
-int ctdb_register_message_handler(struct ctdb_context *ctdb,
- TALLOC_CTX *mem_ctx,
- uint64_t srvid,
- ctdb_msg_fn_t handler,
- void *private_data);
-
struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id);
void (*node_connected)(struct ctdb_node *);
};
-/* list of message handlers - needs to be changed to a more efficient data
- structure so we can find a message handler given a srvid quickly */
-struct ctdb_message_list_header {
- struct ctdb_message_list_header *next, *prev;
- struct ctdb_context *ctdb;
- uint64_t srvid;
- struct ctdb_message_list *m;
-};
-struct ctdb_message_list {
- struct ctdb_message_list *next, *prev;
- struct ctdb_message_list_header *h;
- ctdb_msg_fn_t message_handler;
- void *message_private;
-};
-
/* additional data required for the daemon mode */
struct ctdb_daemon_data {
int sd;
const struct ctdb_upcalls *upcalls; /* transport upcalls */
void *private_data; /* private to transport */
struct ctdb_db_context *db_list;
- struct ctdb_message_list_header *message_list_header;
- struct tdb_context *message_list_indexdb;
+ struct srvid_context *srv;
struct ctdb_daemon_data daemon;
struct ctdb_statistics statistics;
struct ctdb_statistics statistics_current;
int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA indata,
TDB_DATA *outdata, uint32_t srcnode);
-int ctdb_dispatch_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data);
-bool ctdb_check_message_handler(struct ctdb_context *ctdb, uint64_t srvid);
-
int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid);
-int ctdb_deregister_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data);
int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid);
int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
TDB_DATA *outdata);
#include "lib/util/debug.h"
#include "lib/util/samba_util.h"
+#include "common/srvid.h"
#include "ctdb_client.h"
#include "ctdb_logging.h"
message handler for when we are in daemon mode. This redirects the message
to the right client
*/
-static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
{
struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
struct ctdb_req_message *r;
/* construct a message to send to the client containing the data */
len = offsetof(struct ctdb_req_message, data) + data.dsize;
- r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
+ r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
len, struct ctdb_req_message);
- CTDB_NO_MEMORY_VOID(ctdb, r);
+ CTDB_NO_MEMORY_VOID(client->ctdb, r);
talloc_set_name_const(r, "req_message packet");
DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
return -1;
}
- res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
+ res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
+ client);
if (res != 0) {
DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
(unsigned long long)srvid));
DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
return -1;
}
- return ctdb_deregister_message_handler(ctdb, srvid, client);
+ return srvid_deregister(ctdb->srv, srvid, client);
}
int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
return -1;
}
for (i=0; i<num_ids; i++) {
- if (ctdb_check_message_handler(ctdb, ids[i])) {
+ if (srvid_exists(ctdb->srv, ids[i]) == 0) {
results[i/8] |= (1 << (i%8));
}
}
ctdb_set_child_logging(ctdb);
+ if (srvid_init(ctdb, &ctdb->srv) != 0) {
+ DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
+ exit(1);
+ }
+
/* initialize statistics collection */
ctdb_statistics_init(ctdb);
static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
{
- struct ctdb_local_message *m = talloc_get_type(private_data,
- struct ctdb_local_message);
- int res;
+ struct ctdb_local_message *m = talloc_get_type(
+ private_data, struct ctdb_local_message);
- res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
- if (res != 0) {
- DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
- (unsigned long long)m->srvid));
- }
+ srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
talloc_free(m);
}
/*
handler for vacuum fetch
*/
-static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
{
- struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
+ struct ctdb_context *ctdb = rec->ctdb;
struct ctdb_marshall_buffer *recs;
int ret, i;
TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
/*
* handler for database detach
*/
-static void detach_database_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void detach_database_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
{
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
+ struct ctdb_context *ctdb = rec->ctdb;
uint32_t db_id;
struct ctdb_db_context *ctdb_db;
/*
handler for memory dumps
*/
-static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
+ struct ctdb_context *ctdb = rec->ctdb;
TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
TDB_DATA *dump;
int ret;
/*
handler for reload_nodes
*/
-static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
{
- struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
do_takeover_run(rec, rec->nodemap, false);
}
-
-static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
- uint64_t srvid,
- TDB_DATA data, void *private_data)
+
+static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
{
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
+ struct ctdb_context *ctdb = rec->ctdb;
uint32_t pnn;
uint32_t *t;
int len;
uint32_t deferred_rebalance;
- struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
if (rec->recmaster != ctdb_get_pnn(ctdb)) {
return;
-static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
{
- struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
struct ctdb_public_ip *ip;
if (rec->recmaster != rec->ctdb->pnn) {
srvid_request_reply(ctdb, (struct srvid_request *)r, result);
}
-static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
- uint64_t srvid, TDB_DATA data,
+static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
- struct ctdb_recoverd *rec = talloc_get_type(private_data,
- struct ctdb_recoverd);
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
- srvid_disable_and_reply(ctdb, data, rec->takeover_run);
+ srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
}
/* Backward compatibility for this SRVID */
-static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
{
- struct ctdb_recoverd *rec = talloc_get_type(private_data,
- struct ctdb_recoverd);
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
uint32_t timeout;
if (data.dsize != sizeof(uint32_t)) {
timeout = *((uint32_t *)data.dptr);
- ctdb_op_disable(rec->takeover_run, ctdb->ev, timeout);
+ ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
}
-static void disable_recoveries_handler(struct ctdb_context *ctdb,
- uint64_t srvid, TDB_DATA data,
+static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
- struct ctdb_recoverd *rec = talloc_get_type(private_data,
- struct ctdb_recoverd);
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
- srvid_disable_and_reply(ctdb, data, rec->recovery);
+ srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
}
/*
handle this later in the monitor_cluster loop so we do not recurse
with other requests to takeover_run()
*/
-static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
{
struct srvid_request *request;
- struct ctdb_recoverd *rec = talloc_get_type(private_data,
- struct ctdb_recoverd);
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
if (data.dsize != sizeof(struct srvid_request)) {
DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
request = (struct srvid_request *)data.dptr;
- srvid_request_add(ctdb, &rec->reallocate_requests, request);
+ srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
}
static void process_ipreallocate_requests(struct ctdb_context *ctdb,
/*
handler for recovery master elections
*/
-static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
- struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
+ struct ctdb_context *ctdb = rec->ctdb;
int ret;
struct election_message *em = (struct election_message *)data.dptr;
/*
handler for when a node changes its flags
*/
-static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
+ struct ctdb_context *ctdb = rec->ctdb;
int ret;
struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
struct ctdb_node_map *nodemap=NULL;
TALLOC_CTX *tmp_ctx;
int i;
- struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
int disabled_flag_changed;
if (data.dsize != sizeof(*c)) {
/*
handler for when we need to push out flag changes ot all other nodes
*/
-static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void push_flags_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
{
+ struct ctdb_recoverd *rec = talloc_get_type(
+ private_data, struct ctdb_recoverd);
+ struct ctdb_context *ctdb = rec->ctdb;
int ret;
struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
struct ctdb_node_map *nodemap=NULL;
cdata.dptr = (uint8_t *)d;
cdata.dsize = d->length;
- ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
+ srvid_dispatch(state->ctdb->srv, state->srvid, 0, cdata);
if (key.dsize == 0 && data.dsize == 0) {
DEBUG(DEBUG_NOTICE, ("Ending traverse on DB %s (id %d), records %d\n",
state->h->ctdb_db->db_name, state->h->reqid,
}
-static int msg_count;
-static int msg_plus, msg_minus;
+struct bench_data {
+ struct ctdb_context *ctdb;
+ struct tevent_context *ev;
+ int msg_count;
+ int msg_plus, msg_minus;
+};
/*
handler for messages in bench_ring()
*/
-static void ring_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void ring_message_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
{
+ struct bench_data *bdata = talloc_get_type_abort(
+ private_data, struct bench_data);
int incr = *(int *)data.dptr;
- int *count = (int *)private_data;
int dest;
- (*count)++;
- dest = (ctdb_get_pnn(ctdb) + num_nodes + incr) % num_nodes;
- ctdb_client_send_message(ctdb, dest, srvid, data);
+ bdata->msg_count++;
+ dest = (ctdb_get_pnn(bdata->ctdb) + num_nodes + incr) % num_nodes;
+ ctdb_client_send_message(bdata->ctdb, dest, srvid, data);
if (incr == 1) {
- msg_plus++;
+ bdata->msg_plus++;
} else {
- msg_minus++;
+ bdata->msg_minus++;
}
}
ctdb_client_send_message(ctdb, dest, 0, data);
}
-static void each_second(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private_data)
+static void each_second(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private_data)
{
- struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
+ struct bench_data *bdata = talloc_get_type_abort(
+ private_data, struct bench_data);
/* we kickstart the ring into action by inserting messages from node
with pnn 0.
running in which case the ring is broken and the messages are lost.
if so, once every second try again to restart the ring
*/
- if (msg_plus == 0) {
+ if (bdata->msg_plus == 0) {
// printf("no messages recevied, try again to kickstart the ring in forward direction...\n");
- send_start_messages(ctdb, 1);
+ send_start_messages(bdata->ctdb, 1);
}
- if (msg_minus == 0) {
+ if (bdata->msg_minus == 0) {
// printf("no messages recevied, try again to kickstart the ring in reverse direction...\n");
- send_start_messages(ctdb, -1);
+ send_start_messages(bdata->ctdb, -1);
}
- event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1, 0), each_second, ctdb);
+ event_add_timed(bdata->ev, bdata, timeval_current_ofs(1, 0),
+ each_second, bdata);
}
static void dummy_event(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
{
- struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
- event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1, 0), dummy_event, ctdb);
+ struct bench_data *bdata = talloc_get_type_abort(
+ private_data, struct bench_data);
+
+ event_add_timed(bdata->ev, bdata, timeval_current_ofs(1, 0),
+ dummy_event, bdata);
}
/*
benchmark sending messages in a ring around the nodes
*/
-static void bench_ring(struct ctdb_context *ctdb, struct event_context *ev)
+static void bench_ring(struct bench_data *bdata)
{
- int pnn=ctdb_get_pnn(ctdb);
+ int pnn = ctdb_get_pnn(bdata->ctdb);
if (pnn == 0) {
- event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1, 0), each_second, ctdb);
+ event_add_timed(bdata->ev, bdata, timeval_current_ofs(1, 0),
+ each_second, bdata);
} else {
- event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1, 0), dummy_event, ctdb);
+ event_add_timed(bdata->ev, bdata, timeval_current_ofs(1, 0),
+ dummy_event, bdata);
}
start_timer();
while (end_timer() < timelimit) {
- if (pnn == 0 && msg_count % 10000 == 0 && end_timer() > 0) {
- printf("Ring: %.2f msgs/sec (+ve=%d -ve=%d)\r",
- msg_count/end_timer(), msg_plus, msg_minus);
+ if (pnn == 0 && bdata->msg_count % 10000 == 0 && end_timer() > 0) {
+ printf("Ring: %.2f msgs/sec (+ve=%d -ve=%d)\r",
+ bdata->msg_count/end_timer(),
+ bdata->msg_plus, bdata->msg_minus);
fflush(stdout);
}
- event_loop_once(ev);
+ event_loop_once(bdata->ev);
}
- printf("Ring: %.2f msgs/sec (+ve=%d -ve=%d)\n",
- msg_count/end_timer(), msg_plus, msg_minus);
+ printf("Ring: %.2f msgs/sec (+ve=%d -ve=%d)\n",
+ bdata->msg_count/end_timer(),
+ bdata->msg_plus, bdata->msg_minus);
}
/*
int ret;
poptContext pc;
struct event_context *ev;
+ struct bench_data *bdata;
pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
DEBUG(DEBUG_DEBUG,("ctdb_set_call() failed, ignoring return code %d\n", ret));
}
- if (ctdb_client_set_message_handler(ctdb, 0, ring_message_handler,&msg_count))
+ bdata = talloc_zero(ctdb, struct bench_data);
+ if (bdata == NULL) {
+ goto error;
+ }
+ bdata->ctdb = ctdb;
+ bdata->ev = ev;
+
+ if (ctdb_client_set_message_handler(ctdb, 0, ring_message_handler, bdata))
goto error;
printf("Waiting for cluster\n");
event_loop_once(ev);
}
- bench_ring(ctdb, ev);
-
+ bench_ring(bdata);
+
error:
return 0;
}
static int timelimit = 10;
static int num_records = 10;
static int num_nodes;
-static int msg_count;
+
+struct bench_data {
+ struct ctdb_context *ctdb;
+ struct tevent_context *ev;
+ int msg_count;
+};
#define TESTKEY "testkey"
store a expanded record
send a message to next node to tell it to do the same
*/
-static void bench_fetch_1node(struct ctdb_context *ctdb)
+static void bench_fetch_1node(struct bench_data *bdata)
{
+ struct ctdb_context *ctdb = bdata->ctdb;
TDB_DATA key, data, nulldata;
struct ctdb_db_context *ctdb_db;
TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
}
data.dptr = (uint8_t *)talloc_asprintf_append((char *)data.dptr,
"msg_count=%d on node %d\n",
- msg_count, ctdb_get_pnn(ctdb));
+ bdata->msg_count,
+ ctdb_get_pnn(ctdb));
if (data.dptr == NULL) {
printf("Failed to create record\n");
talloc_free(tmp_ctx);
/*
handler for messages in bench_ring()
*/
-static void message_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void message_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
- msg_count++;
- bench_fetch_1node(ctdb);
+ struct bench_data *bdata = talloc_get_type_abort(
+ private_data, struct bench_data);
+
+ bdata->msg_count++;
+ bench_fetch_1node(bdata);
}
send a message to next node to tell it to do the same
*/
-static void bench_fetch(struct ctdb_context *ctdb, struct event_context *ev)
+static void bench_fetch(struct bench_data *bdata)
{
+ struct ctdb_context *ctdb = bdata->ctdb;
int pnn=ctdb_get_pnn(ctdb);
if (pnn == num_nodes - 1) {
- bench_fetch_1node(ctdb);
+ bench_fetch_1node(bdata);
}
-
+
start_timer();
- event_add_timed(ev, ctdb, timeval_current_ofs(timelimit,0), timeout_handler, NULL);
+ event_add_timed(bdata->ev, bdata, timeval_current_ofs(timelimit,0),
+ timeout_handler, NULL);
while (end_timer() < timelimit) {
- if (pnn == 0 && msg_count % 100 == 0 && end_timer() > 0) {
- printf("Fetch: %.2f msgs/sec\r", msg_count/end_timer());
+ if (pnn == 0 && bdata->msg_count % 100 == 0 && end_timer() > 0) {
+ printf("Fetch: %.2f msgs/sec\r", bdata->msg_count/end_timer());
fflush(stdout);
}
- if (event_loop_once(ev) != 0) {
+ if (event_loop_once(bdata->ev) != 0) {
printf("Event loop failed!\n");
break;
}
}
- printf("Fetch: %.2f msgs/sec\n", msg_count/end_timer());
+ printf("Fetch: %.2f msgs/sec\n", bdata->msg_count/end_timer());
}
/*
handler for reconfigure message
*/
-static void reconfigure_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void reconfigure_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
{
int *ready = (int *)private_data;
*ready = 1;
TDB_DATA key, data;
struct ctdb_record_handle *h;
int cluster_ready=0;
+ struct bench_data *bdata;
pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
exit(1);
}
- ctdb_client_set_message_handler(ctdb, 0, message_handler, &msg_count);
+ bdata = talloc_zero(ctdb, struct bench_data);
+ if (bdata == NULL) {
+ printf("memory allocation error\n");
+ exit(1);
+ }
+
+ bdata->ctdb = ctdb;
+ bdata->ev = ev;
+
+ ctdb_client_set_message_handler(ctdb, 0, message_handler, bdata);
printf("Waiting for cluster\n");
while (1) {
*/
printf("Sleeping for %d seconds\n", num_nodes);
sleep(num_nodes);
- bench_fetch(ctdb, ev);
+ bench_fetch(bdata);
key.dptr = discard_const(TESTKEY);
key.dsize = strlen(TESTKEY);
uint32_t destnode, uint32_t dbid, uint64_t *seqnum);
int ctdb_client_set_message_handler(struct ctdb_context *ctdb,
uint64_t srvid,
- ctdb_msg_fn_t handler,
+ srvid_handler_fn handler,
void *private_data);
int ctdb_client_remove_message_handler(struct ctdb_context *ctdb,
uint64_t srvid,
#include "common/ctdb_io.c"
#include "common/ctdb_util.c"
#include "common/ctdb_ltdb.c"
-#include "common/ctdb_message.c"
+#include "common/db_hash.c"
+#include "common/srvid.c"
#include "common/rb_tree.c"
#include "common/ctdb_logging.c"
#include "common/ctdb_fork.c"
* the ctdb tool only registers one at a time so keep this simple. */
static struct {
uint64_t srvid;
- ctdb_msg_fn_t message_handler;
+ srvid_handler_fn message_handler;
void *message_private;
} ctdb_message_list_fake = {
.srvid = 0,
int ctdb_client_set_message_handler_stub(struct ctdb_context *ctdb,
uint64_t srvid,
- ctdb_msg_fn_t handler,
+ srvid_handler_fn handler,
void *private_data)
{
ctdb_message_list_fake.srvid = srvid;
reply_data.dsize = sizeof(pnn);
reply_data.dptr = (uint8_t *)&pnn;
ctdb_message_list_fake.message_handler(
- ctdb,
ctdb_message_list_fake.srvid,
reply_data,
ctdb_message_list_fake.message_private);
#include "common/ctdb_io.c"
#include "common/ctdb_util.c"
#include "common/ctdb_ltdb.c"
-#include "common/ctdb_message.c"
+#include "common/db_hash.c"
+#include "common/srvid.c"
#include "common/cmdline.c"
#include "common/rb_tree.c"
#include "common/ctdb_logging.c"
const char *srvid_str;
};
-static void srvid_broadcast_reply_handler(struct ctdb_context *ctdb,
- uint64_t srvid,
- TDB_DATA data,
+static void srvid_broadcast_reply_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct srvid_reply_handler_data *d =
/*
handler for memory dumps
*/
-static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
sys_write(1, data.dptr, data.dsize);
exit(0);
/*
handler for msglisten
*/
-static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid,
- TDB_DATA data, void *private_data)
+static void msglisten_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
{
int i;
bld.SAMBA_SUBSYSTEM('ctdb-common',
source=bld.SUBDIR('common',
'''ctdb_io.c ctdb_util.c ctdb_ltdb.c
- ctdb_message.c cmdline.c rb_tree.c
+ cmdline.c rb_tree.c
ctdb_fork.c'''),
includes='include include/internal common .',
deps='replace popt talloc tevent tdb popt ctdb-system')
source=bld.SUBDIR('client', 'ctdb_client.c'),
includes='include include/internal',
deps='''replace popt talloc tevent tdb
- samba-util tdb-wrap''')
+ samba-util tdb-wrap ctdb-util''')
bld.SAMBA_SUBSYSTEM('ctdb-server',
source='server/ctdbd.c ' +
bld.SAMBA_BINARY('ctdbd',
source='',
deps='''ctdb-server ctdb-client ctdb-common
- ctdb-common-util ctdb-tcp''' +
+ ctdb-common-util ctdb-tcp ctdb-util''' +
ib_deps,
install_path='${SBINDIR}',
manpages='ctdbd.1')
bld.SAMBA_BINARY('ctdb',
source='tools/ctdb.c tools/ctdb_vacuum.c',
- deps='ctdb-client ctdb-common ctdb-common-util',
+ deps='''ctdb-client ctdb-common ctdb-common-util
+ ctdb-util''',
includes='include include/internal',
install_path='${BINDIR}',
manpages='ctdb.1')
bld.SAMBA_BINARY(target,
source=src,
includes='include include/internal',
- deps='ctdb-client ctdb-common ctdb-common-util',
+ deps='''ctdb-client ctdb-common ctdb-common-util
+ ctdb-util''',
install_path='${CTDB_TEST_LIBDIR}')
bld.SAMBA_BINARY('ctdb_takeover_tests',