/*
   Copyright (C) Andrew Tridgell 2006

   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
   version 2 of the License, or (at your option) any later version.

   This library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public
   License along with this library; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/

#include "lib/events/events.h"
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"

29 called when we fail to send a message to a node
31 static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te,
32 struct timeval t, void *private)
34 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
35 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
36 struct ctdb_tcp_node);
39 while (tnode->queue) {
40 struct ctdb_tcp_packet *pkt = tnode->queue;
41 DLIST_REMOVE(tnode->queue, pkt);
45 /* start a new connect cycle to try to re-establish the
47 talloc_free(tnode->fde);
50 event_add_timed(node->ctdb->ev, node, timeval_zero(),
51 ctdb_tcp_node_connect, node);
55 called when socket becomes readable
57 void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
58 uint16_t flags, void *private)
60 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
61 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
62 struct ctdb_tcp_node);
63 if (flags & EVENT_FD_READ) {
64 /* getting a read event on this fd in the current tcp model is
65 always an error, as we have separate read and write
66 sockets. In future we may combine them, but for now it must
67 mean that the socket is dead, so we try to reconnect */
68 talloc_free(tnode->fde);
71 event_add_timed(node->ctdb->ev, node, timeval_zero(),
72 ctdb_tcp_node_connect, node);
76 while (tnode->queue) {
77 struct ctdb_tcp_packet *pkt = tnode->queue;
80 n = write(tnode->fd, pkt->data, pkt->length);
82 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
83 event_add_timed(node->ctdb->ev, node, timeval_zero(),
84 ctdb_tcp_node_dead, node);
85 EVENT_FD_NOT_WRITEABLE(tnode->fde);
90 if (n != pkt->length) {
96 DLIST_REMOVE(tnode->queue, pkt);
100 EVENT_FD_NOT_WRITEABLE(tnode->fde);
105 called when an incoming connection is readable
107 void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
108 uint16_t flags, void *private)
110 struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
114 /* NOTE: we don't yet handle combined packets or partial
115 packets. Obviously that needed fixing, using a similar
116 scheme to the Samba4 packet layer */
118 if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
120 /* we've lost the link from another node. We don't
121 notify the upper layers, as we only want to trigger
122 a full node reorganisation when a send fails - that
123 allows nodes to restart without penalty as long as
124 the network is idle */
129 data = talloc_size(in, num_ready);
131 /* not much we can do except drop the socket */
136 if (read(in->fd, data, num_ready) != num_ready) {
141 /* tell the ctdb layer above that we have a packet */
142 in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
146 queue a packet for sending
148 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
150 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
151 struct ctdb_tcp_node);
152 struct ctdb_tcp_packet *pkt;
154 if (tnode->fd == -1) {
155 ctdb_set_error(node->ctdb, "Sending to dead node %s\n", node->name);
159 /* if the queue is empty then try an immediate write, avoiding
160 queue overhead. This relies on non-blocking sockets */
161 if (tnode->queue == NULL) {
162 ssize_t n = write(tnode->fd, data, length);
163 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
164 event_add_timed(node->ctdb->ev, node, timeval_zero(),
165 ctdb_tcp_node_dead, node);
166 /* yes, we report success, as the dead node is
167 handled via a separate event */
174 if (length == 0) return 0;
177 pkt = talloc(tnode, struct ctdb_tcp_packet);
178 CTDB_NO_MEMORY(node->ctdb, pkt);
180 pkt->data = talloc_memdup(pkt, data, length);
181 CTDB_NO_MEMORY(node->ctdb, pkt->data);
183 pkt->length = length;
185 if (tnode->queue == NULL) {
186 EVENT_FD_WRITEABLE(tnode->fde);
189 DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);