#include "../include/ctdb_private.h"
#include "ctdb_tcp.h"
+/*
+ stop any connecting (established or pending) to a node
+ */
+void ctdb_tcp_stop_connection(struct ctdb_node *node)
+{
+ struct ctdb_tcp_node *tnode = talloc_get_type(
+ node->private_data, struct ctdb_tcp_node);
+
+ ctdb_queue_set_fd(tnode->out_queue, -1);
+ talloc_free(tnode->connect_te);
+ talloc_free(tnode->connect_fde);
+ tnode->connect_fde = NULL;
+ tnode->connect_te = NULL;
+ if (tnode->fd != -1) {
+ close(tnode->fd);
+ tnode->fd = -1;
+ }
+}
+
+
/*
called when a complete packet has come in - should not happen on this socket
*/
node->ctdb->upcalls->node_dead(node);
}
- /* start a new connect cycle to try to re-establish the
- link */
- ctdb_queue_set_fd(tnode->out_queue, -1);
- tnode->fd = -1;
- event_add_timed(node->ctdb->ev, tnode, timeval_zero(),
- ctdb_tcp_node_connect, node);
+ ctdb_tcp_stop_connection(node);
+ tnode->connect_te = event_add_timed(node->ctdb->ev, tnode, timeval_zero(),
+ ctdb_tcp_node_connect, node);
}
/*
if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
error != 0) {
- talloc_free(tnode->connect_fde);
- tnode->connect_fde = NULL;
- close(tnode->fd);
- tnode->fd = -1;
- event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0),
- ctdb_tcp_node_connect, node);
+ ctdb_tcp_stop_connection(node);
+ tnode->connect_te = event_add_timed(ctdb->ev, tnode,
+ timeval_current_ofs(1, 0),
+ ctdb_tcp_node_connect, node);
return;
}
talloc_free(tnode->connect_fde);
tnode->connect_fde = NULL;
-
+
setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one));
setsockopt(tnode->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one));
ctdb_queue_set_fd(tnode->out_queue, tnode->fd);
+ /* the queue subsystem now owns this fd */
+ tnode->fd = -1;
+
/* tell the ctdb layer we are connected */
node->ctdb->upcalls->node_connected(node);
}
struct sockaddr_in sock_in;
struct sockaddr_in sock_out;
- if (tnode->fd != -1) {
- talloc_free(tnode->connect_fde);
- tnode->connect_fde = NULL;
- close(tnode->fd);
- tnode->fd = -1;
- }
+ ctdb_tcp_stop_connection(node);
tnode->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (connect(tnode->fd, (struct sockaddr *)&sock_out, sizeof(sock_out)) != 0 &&
errno != EINPROGRESS) {
- /* try again once a second */
- close(tnode->fd);
- tnode->fd = -1;
- event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0),
- ctdb_tcp_node_connect, node);
+ ctdb_tcp_stop_connection(node);
+ tnode->connect_te = event_add_timed(ctdb->ev, tnode,
+ timeval_current_ofs(1, 0),
+ ctdb_tcp_node_connect, node);
return;
}
struct ctdb_tcp_node *tnode = talloc_get_type(
node->private_data, struct ctdb_tcp_node);
if (!ctdb_same_address(&ctdb->address, &node->address)) {
- event_add_timed(ctdb->ev, tnode, timeval_zero(),
- ctdb_tcp_node_connect, node);
+ tnode->connect_te = event_add_timed(ctdb->ev, tnode,
+ timeval_zero(),
+ ctdb_tcp_node_connect, node);
}
}
DEBUG(0,("Tearing down connection to dead node :%d\n", node->pnn));
- tnode->fd = -1;
- ctdb_queue_set_fd(tnode->out_queue, -1);
+ ctdb_tcp_stop_connection(node);
- event_add_timed(node->ctdb->ev, tnode, timeval_zero(),
- ctdb_tcp_node_connect, node);
+ tnode->connect_te = event_add_timed(node->ctdb->ev, tnode, timeval_zero(),
+ ctdb_tcp_node_connect, node);
}