4 Copyright (C) Andrew Tridgell 2006
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2 of the License, or (at your option) any later version.
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 #include "lib/events/events.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "ctdb_private.h"
29 called when we fail to send a message to a node
/*
  Timed-event handler that tears down the TCP link to a node after a hard
  write error. ctdb_tcp_node_write and ctdb_tcp_queue_pkt schedule it with
  timeval_zero().

  It unlinks every packet still queued for the node, frees the node's fd
  event (tnode->fde), and schedules an immediate reconnect attempt via
  ctdb_tcp_node_connect.

  ev, te, t: standard timed-event arguments. The lines shown here do not
             use them; the reconnect is scheduled on node->ctdb->ev instead.
  private:   the struct ctdb_node whose link failed.

  NOTE(review): tnode->fde is freed, but no reset of tnode->fde or
  tnode->fd is visible here. ctdb_tcp_queue_pkt still calls
  EVENT_FD_WRITEABLE(tnode->fde) whenever tnode->fd != -1. Confirm that
  both are reset before any further packet can be queued.
*/
31 static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te,
32 struct timeval t, void *private)
34 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
35 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
36 struct ctdb_tcp_node);
/* Unlink every packet still queued for this node so that none of them is
   sent on the dead link. NOTE(review): packets are talloc children of
   tnode, and no talloc_free(pkt) appears on the lines shown here. Confirm
   each unlinked packet is released, otherwise it lingers until tnode is
   freed. */
39 while (tnode->queue) {
40 struct ctdb_tcp_packet *pkt = tnode->queue;
41 DLIST_REMOVE(tnode->queue, pkt);
45 /* start a new connect cycle to try to re-establish the
47 talloc_free(tnode->fde);
50 event_add_timed(node->ctdb->ev, node, timeval_zero(),
51 ctdb_tcp_node_connect, node);
55 called when the socket becomes writeable (a read event on it means the link has died)
/*
  fd-event handler for the outgoing socket to a node. It flushes the
  node's packet queue.

  - A read event on this socket is treated as a dead link. The fd event is
    freed and an immediate reconnect is scheduled.
  - Otherwise, queued packets are written head-first.
  - A hard write error hands the node to ctdb_tcp_node_dead.
  - Once the queue is empty, write notifications are switched off again.
    ctdb_tcp_queue_pkt switches them back on when it queues data.

  ev, fde: standard fd-event arguments (tnode->fde is used instead of fde).
  flags:   event mask; EVENT_FD_READ is checked explicitly.
  private: the struct ctdb_node that owns the socket.

  NOTE(review): if fde is tnode->fde, the read-event branch frees the fd
  event from inside its own callback. Confirm the events library tolerates
  that.
*/
57 void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
58 uint16_t flags, void *private)
60 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
61 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
62 struct ctdb_tcp_node);
63 if (flags & EVENT_FD_READ) {
64 /* getting a read event on this fd in the current tcp model is
65 always an error, as we have separate read and write
66 sockets. In future we may combine them, but for now it must
67 mean that the socket is dead, so we try to reconnect */
68 talloc_free(tnode->fde);
71 event_add_timed(node->ctdb->ev, node, timeval_zero(),
72 ctdb_tcp_node_connect, node);
/* Write queued packets in FIFO order, always taking the head of the queue. */
76 while (tnode->queue) {
77 struct ctdb_tcp_packet *pkt = tnode->queue;
80 n = write(tnode->fd, pkt->data, pkt->length);
/* Any failure other than "would block" (EAGAIN/EWOULDBLOCK) is fatal for
   the link. Schedule ctdb_tcp_node_dead and stop asking for writeable
   events on this fd. */
82 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
83 event_add_timed(node->ctdb->ev, node, timeval_zero(),
84 ctdb_tcp_node_dead, node);
85 EVENT_FD_NOT_WRITEABLE(tnode->fde);
/* Short write. NOTE(review): the handling of this case, and of the
   EAGAIN case, is not on the lines shown here. Presumably the unsent
   remainder of pkt stays at the head of the queue for the next writeable
   event. TODO confirm. */
90 if (n != pkt->length) {
96 DLIST_REMOVE(tnode->queue, pkt);
/* Queue drained: stop requesting writeable events until
   ctdb_tcp_queue_pkt queues more data and re-enables them. */
100 EVENT_FD_NOT_WRITEABLE(tnode->fde);
105 called when an incoming connection is readable
/*
  fd-event handler for an incoming connection from another node.

  It asks the kernel how many bytes are buffered on in->fd
  (ioctl FIONREAD). It then reads exactly that many bytes into one talloc
  buffer and hands the buffer to the upper ctdb layer as a single packet
  via in->ctdb->upcalls->recv_pkt. A lost link is deliberately not
  reported upward (see the comment inside).

  ev, fde, flags: standard fd-event arguments; the lines shown here do
                  not reference them.
  private:        the struct ctdb_incoming for this connection.

  NOTE(review): one read() is treated as exactly one packet, so packets
  that TCP splits or coalesces are delivered wrongly (see the NOTE below).
  Also, who frees `data` after recv_pkt is not visible here. Confirm
  ownership.
*/
107 void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
108 uint16_t flags, void *private)
110 struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
114 /* NOTE: we don't yet handle combined packets or partial
115 packets. Obviously that needs fixing, using a similar
116 scheme to the Samba4 packet layer */
/* Find out how many bytes are queued on the socket. The whole amount is
   then read in a single call below. */
118 if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
120 /* we've lost the link from another node. We don't
121 notify the upper layers, as we only want to trigger
122 a full node reorganisation when a send fails - that
123 allows nodes to restart without penalty as long as
124 the network is idle */
/* The buffer is a talloc child of `in`, so it is released at the latest
   together with the connection context. */
129 data = talloc_size(in, num_ready);
131 /* not much we can do except drop the socket */
/* Anything other than a complete read of num_ready bytes takes the error
   branch. */
136 if (read(in->fd, data, num_ready) != num_ready) {
141 /* tell the ctdb layer above that we have a packet */
142 in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
148 queue a packet for sending
150 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
152 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
153 struct ctdb_tcp_node);
154 struct ctdb_tcp_packet *pkt;
156 if (tnode->fd == -1) {
157 ctdb_set_error(node->ctdb, "Sending to dead node %s\n", node->name);
161 /* if the queue is empty then try an immediate write, avoiding
162 queue overhead. This relies on non-blocking sockets */
163 if (tnode->queue == NULL) {
164 ssize_t n = write(tnode->fd, data, length);
165 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
166 event_add_timed(node->ctdb->ev, node, timeval_zero(),
167 ctdb_tcp_node_dead, node);
168 /* yes, we report success, as the dead node is
169 handled via a separate event */
176 if (length == 0) return 0;
179 pkt = talloc(tnode, struct ctdb_tcp_packet);
180 CTDB_NO_MEMORY(node->ctdb, pkt);
182 pkt->data = talloc_memdup(pkt, data, length);
183 CTDB_NO_MEMORY(node->ctdb, pkt->data);
185 pkt->length = length;
187 if (tnode->queue == NULL) {
188 EVENT_FD_WRITEABLE(tnode->fde);
191 DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);