4 Copyright (C) Andrew Tridgell 2006
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2 of the License, or (at your option) any later version.
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 #include "lib/events/events.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "ctdb_private.h"
30 called when we fail to send a message to a node
/*
  Timed-event callback scheduled after a send to a node fails: discard
  every packet still queued for that node, tear down the fd event, and
  schedule an immediate reconnect attempt via ctdb_tcp_node_connect.

  NOTE(review): several lines of this function are elided in this
  excerpt; the dequeue loop presumably frees each removed packet —
  confirm against the full source.
*/
32 static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te,
33 struct timeval t, void *private)
35 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
36 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
37 struct ctdb_tcp_node);
/* flush the pending send queue - these packets can never be delivered
   on the dead connection */
40 while (tnode->queue) {
41 struct ctdb_tcp_packet *pkt = tnode->queue;
42 DLIST_REMOVE(tnode->queue, pkt);
46 /* start a new connect cycle to try to re-establish the
48 talloc_free(tnode->fde);
/* zero timeout: retry the connect as soon as the event loop runs */
51 event_add_timed(node->ctdb->ev, node, timeval_zero(),
52 ctdb_tcp_node_connect, node);
56 called when the outgoing node socket becomes writeable (a read event
on this socket signals a dead link and triggers a reconnect)
/*
  fd event handler for the outgoing (write) side of a node connection.
  Drains as much of the queued packet list as the non-blocking socket
  will accept. A READ event on this fd is treated as a fatal error,
  since separate sockets are used for each direction, and causes the
  fde to be torn down and a reconnect to be scheduled.

  NOTE(review): returns/breaks and the partial-write bookkeeping
  (advancing pkt->data / shrinking pkt->length) are elided in this
  excerpt — confirm against the full source.
*/
58 void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
59 uint16_t flags, void *private)
61 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
62 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
63 struct ctdb_tcp_node);
64 if (flags & EVENT_FD_READ) {
65 /* getting a read event on this fd in the current tcp model is
66 always an error, as we have separate read and write
67 sockets. In future we may combine them, but for now it must
68 mean that the socket is dead, so we try to reconnect */
69 talloc_free(tnode->fde);
72 event_add_timed(node->ctdb->ev, node, timeval_zero(),
73 ctdb_tcp_node_connect, node);
/* writeable: push queued packets until the queue empties or the
   socket would block */
77 while (tnode->queue) {
78 struct ctdb_tcp_packet *pkt = tnode->queue;
81 n = write(tnode->fd, pkt->data, pkt->length);
/* a hard write error (not EAGAIN/EWOULDBLOCK) means the connection
   is dead: stop watching for writeable and let the node-dead timed
   event clean up */
83 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
84 event_add_timed(node->ctdb->ev, node, timeval_zero(),
85 ctdb_tcp_node_dead, node);
86 EVENT_FD_NOT_WRITEABLE(tnode->fde);
/* short write: only part of the packet went out this round */
91 if (n != pkt->length) {
/* whole packet sent - drop it from the queue */
97 DLIST_REMOVE(tnode->queue, pkt);
/* queue fully drained - no need for further writeable events */
101 EVENT_FD_NOT_WRITEABLE(tnode->fde);
106 called when an incoming connection is readable
/*
  fd event handler for an incoming node connection. Uses FIONREAD to
  size a single read, appends the bytes to any previously buffered
  partial packet, then delivers every complete packet to the upper
  layer via the recv_pkt upcall. Each packet begins with a uint32
  length header (read here via *(uint32_t *)data); any trailing
  incomplete packet is stashed back into in->partial for the next
  read event.

  NOTE(review): several branches, returns and cleanup lines are elided
  in this excerpt — confirm the error/return paths against the full
  source. The uint32 cast assumes the buffer is suitably aligned,
  presumably guaranteed by the tcp packet allocator — verify.
*/
108 void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
109 uint16_t flags, void *private)
111 struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
114 uint8_t *data, *data_base;
/* ask the kernel how many bytes are pending; failure (or, presumably,
   zero bytes in the elided condition) means the peer went away */
116 if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
118 /* we've lost the link from another node. We don't
119 notify the upper layers, as we only want to trigger
120 a full node reorganisation when a send fails - that
121 allows nodes to restart without penalty as long as
122 the network is idle */
/* grow the partial buffer to hold old + newly available bytes */
127 in->partial.data = talloc_realloc_size(in, in->partial.data,
128 num_ready + in->partial.length);
129 if (in->partial.data == NULL) {
130 /* not much we can do except drop the socket */
135 nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
137 /* the connection must be dead */
/* work on the combined buffer; detach it from in->partial so the
   struct is clean for re-stashing any leftover bytes below */
142 data = in->partial.data;
143 nread += in->partial.length;
145 in->partial.data = NULL;
146 in->partial.length = 0;
148 if (nread >= 4 && *(uint32_t *)data == nread) {
149 /* most common case - we got a whole packet in one go
150 tell the ctdb layer above that we have a packet */
151 in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
/* slower path: the buffer holds one-or-more packets plus possibly a
   tail fragment - peel off and deliver each complete packet */
157 while (nread >= 4 && *(uint32_t *)data <= nread) {
158 /* we have at least one packet */
161 len = *(uint32_t *)data;
162 d2 = talloc_memdup(in, data, len);
168 in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
174 if (nread < 4 || *(uint32_t *)data > nread) {
175 /* we have only part of a packet */
176 if (data_base == data) {
/* nothing was consumed: keep the whole buffer as the partial */
177 in->partial.data = data;
178 in->partial.length = nread;
/* some packets were consumed: copy just the tail fragment and free
   the original buffer */
180 in->partial.data = talloc_memdup(in, data, nread);
181 if (in->partial.data == NULL) {
185 in->partial.length = nread;
186 talloc_free(data_base);
/* everything consumed - release the read buffer */
191 talloc_free(data_base);
195 queue a packet for sending
/*
  Queue a packet for sending to a node. The length is first rounded up
  to CTDB_TCP_ALIGNMENT and written back into the packet's leading
  uint32 length header (so the receiver's framing in
  ctdb_tcp_incoming_read matches). If nothing is queued, an immediate
  non-blocking write is attempted; any remainder is copied into a
  talloc'd ctdb_tcp_packet and appended to tnode->queue, with the fd
  marked writeable so ctdb_tcp_node_write will drain it.

  Returns 0 on success (including the dead-node case, handled via a
  separate timed event). NOTE(review): the partial-write adjustment of
  data/length after the immediate write, and the final return, are
  elided in this excerpt — confirm against the full source. The
  uint32 store assumes 'data' is suitably aligned, presumably
  guaranteed by the tcp packet allocator — verify.
*/
197 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
199 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
200 struct ctdb_tcp_node);
201 struct ctdb_tcp_packet *pkt;
203 /* enforce the length and alignment rules from the tcp packet allocator */
204 length = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
205 *(uint32_t *)data = length;
207 /* if the queue is empty then try an immediate write, avoiding
208 queue overhead. This relies on non-blocking sockets */
209 if (tnode->queue == NULL && tnode->fd != -1) {
210 ssize_t n = write(tnode->fd, data, length);
211 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
212 event_add_timed(node->ctdb->ev, node, timeval_zero(),
213 ctdb_tcp_node_dead, node);
214 /* yes, we report success, as the dead node is
215 handled via a separate event */
/* everything went out in the immediate write - nothing to queue */
222 if (length == 0) return 0;
/* copy the (remaining) bytes into a queue entry owned by tnode */
225 pkt = talloc(tnode, struct ctdb_tcp_packet);
226 CTDB_NO_MEMORY(node->ctdb, pkt);
228 pkt->data = talloc_memdup(pkt, data, length);
229 CTDB_NO_MEMORY(node->ctdb, pkt->data);
231 pkt->length = length;
/* first queued packet: start asking for writeable events so the
   queue gets drained */
233 if (tnode->queue == NULL && tnode->fd != -1) {
234 EVENT_FD_WRITEABLE(tnode->fde);
237 DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);