4 Copyright (C) Andrew Tridgell 2006
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2 of the License, or (at your option) any later version.
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "../include/ctdb_private.h"
32 called when we fail to send a message to a node
/*
 * Timed-event callback scheduled when a write to this node's socket failed:
 * tear down the dead link and restart the connect cycle.
 * NOTE(review): this extract is elided — original lines (closing braces,
 * the per-packet free inside the drain loop, etc.) are missing between the
 * numbered fragments; comments below describe only what is visible.
 */
34 static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te,
35 struct timeval t, void *private)
/* recover the typed node pointer from the opaque callback argument */
37 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
38 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
39 struct ctdb_tcp_node);
/* drain the outgoing packet queue — these packets will never be sent
   (the per-iteration disposal of pkt is in an elided line; presumably
   talloc_free(pkt) — TODO confirm against full source) */
42 while (tnode->queue) {
43 struct ctdb_tcp_packet *pkt = tnode->queue;
44 DLIST_REMOVE(tnode->queue, pkt);
48 /* start a new connect cycle to try to re-establish the
/* drop the fd event; freeing the fde stops event delivery for the old socket */
50 talloc_free(tnode->fde);
/* schedule an immediate (timeval_zero) reconnect attempt */
53 event_add_timed(node->ctdb->ev, node, timeval_zero(),
54 ctdb_tcp_node_connect, node);
58 called when socket becomes writeable
/*
 * fd-event handler for the outgoing node socket: flush queued packets when
 * the socket becomes writeable; a READ event here means the link is broken.
 * NOTE(review): extract is elided — the partial-write handling body (after
 * line 93) and several braces/returns are not visible here.
 */
60 void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
61 uint16_t flags, void *private)
/* recover typed pointers from the opaque callback argument */
63 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
64 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
65 struct ctdb_tcp_node);
66 if (flags & EVENT_FD_READ) {
67 /* getting a read event on this fd in the current tcp model is
68 always an error, as we have separate read and write
69 sockets. In future we may combine them, but for now it must
70 mean that the socket is dead, so we try to reconnect */
/* free the fde to stop events on the dead socket, then retry connect */
71 talloc_free(tnode->fde);
74 event_add_timed(node->ctdb->ev, node, timeval_zero(),
75 ctdb_tcp_node_connect, node);
/* write out as much of the queue as the socket will take */
79 while (tnode->queue) {
80 struct ctdb_tcp_packet *pkt = tnode->queue;
83 n = write(tnode->fd, pkt->data, pkt->length);
/* hard write error (not a would-block): declare the node dead via a
   deferred event and stop asking for writeable notifications */
85 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
86 event_add_timed(node->ctdb->ev, node, timeval_zero(),
87 ctdb_tcp_node_dead, node);
88 EVENT_FD_NOT_WRITEABLE(tnode->fde);
/* short write: only part of pkt went out — the adjustment of
   pkt->data/pkt->length is in elided lines; TODO confirm */
93 if (n != pkt->length) {
/* full packet sent: remove it from the queue (its free is elided) */
99 DLIST_REMOVE(tnode->queue, pkt);
/* queue drained — no longer interested in writeable events */
103 EVENT_FD_NOT_WRITEABLE(tnode->fde);
108 called when an incoming connection is readable
/*
 * fd-event handler for an accepted inbound connection: read all pending
 * bytes, split them into length-prefixed packets (first 4 bytes of each
 * packet hold its total length), deliver complete packets upward via
 * recv_pkt, and stash any trailing partial packet in in->partial.
 * NOTE(review): extract is elided — error/cleanup paths, the loop that
 * advances data/nread past each delivered packet, and most braces are in
 * missing lines; comments describe only what is visible.
 * NOTE(review): *(uint32_t *)data casts assume suitable alignment and
 * violate strict aliasing in principle — matches the project's existing
 * style, flagged only for awareness.
 */
110 void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
111 uint16_t flags, void *private)
113 struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
116 uint8_t *data, *data_base;
/* ask the kernel how many bytes are pending; failure (or, presumably, a
   zero count in the elided condition) means the peer went away */
118 if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
120 /* we've lost the link from another node. We don't
121 notify the upper layers, as we only want to trigger
122 a full node reorganisation when a send fails - that
123 allows nodes to restart without penalty as long as
124 the network is idle */
/* grow the partial buffer so previously-buffered bytes plus the newly
   pending bytes fit contiguously */
129 in->partial.data = talloc_realloc_size(in, in->partial.data,
130 num_ready + in->partial.length);
131 if (in->partial.data == NULL) {
132 /* not much we can do except drop the socket */
/* append the new bytes after any previously buffered partial data */
137 nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
139 /* the connection must be dead */
/* take ownership of the whole buffer locally; nread becomes the total
   byte count (old partial + new), and in->partial is reset */
144 data = in->partial.data;
145 nread += in->partial.length;
147 in->partial.data = NULL;
148 in->partial.length = 0;
/* fast path: buffer holds exactly one complete packet — hand the buffer
   itself upward (ownership transfer is presumably to the upcall; the
   return after this is elided — TODO confirm) */
150 if (nread >= 4 && *(uint32_t *)data == nread) {
151 /* most common case - we got a whole packet in one go
152 tell the ctdb layer above that we have a packet */
153 in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
/* general path: peel off complete packets from the front of the buffer;
   each is copied (talloc_memdup) so the upcall gets its own allocation
   (the data/nread advance per iteration is in elided lines) */
159 while (nread >= 4 && *(uint32_t *)data <= nread) {
160 /* we have at least one packet */
163 len = *(uint32_t *)data;
164 d2 = talloc_memdup(in, data, len);
170 in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
/* leftover bytes that do not form a complete packet are kept for the
   next read event */
176 if (nread < 4 || *(uint32_t *)data > nread) {
177 /* we have only part of a packet */
/* if nothing was consumed we can keep the original buffer as-is;
   otherwise copy the tail and free the original allocation */
178 if (data_base == data) {
179 in->partial.data = data;
180 in->partial.length = nread;
182 in->partial.data = talloc_memdup(in, data, nread);
183 if (in->partial.data == NULL) {
187 in->partial.length = nread;
188 talloc_free(data_base);
/* everything consumed: release the read buffer */
193 talloc_free(data_base);
197 queue a packet for sending
/*
 * Queue a packet for sending to a node. The packet is padded up to
 * CTDB_TCP_ALIGNMENT and its first 4 bytes are overwritten with the padded
 * length (the on-wire framing that ctdb_tcp_incoming_read parses). If the
 * queue is empty an immediate non-blocking write is attempted first; only
 * the unsent remainder is copied onto the queue.
 * Returns 0 on success (including the "node declared dead" case, handled
 * asynchronously); the CTDB_NO_MEMORY macro presumably returns an error on
 * allocation failure — TODO confirm.
 * NOTE(review): extract is elided — the post-write adjustment of data and
 * length2 by the number of bytes written (between lines 221 and 229), the
 * final return, and several braces are in missing lines.
 */
199 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
201 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
202 struct ctdb_tcp_node);
203 struct ctdb_tcp_packet *pkt;
206 /* enforce the length and alignment rules from the tcp packet allocator */
/* round length up to the alignment and stamp it into the length prefix;
   caller's buffer must have room for the padding (allocator contract) */
207 length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
208 *(uint32_t *)data = length2;
/* zero the padding bytes so the wire format is deterministic */
210 if (length2 != length) {
211 memset(data+length, 0, length2-length);
214 /* if the queue is empty then try an immediate write, avoiding
215 queue overhead. This relies on non-blocking sockets */
216 if (tnode->queue == NULL && tnode->fd != -1) {
217 ssize_t n = write(tnode->fd, data, length2);
/* hard error: schedule the node-dead handler; EAGAIN/EWOULDBLOCK just
   fall through to queueing */
218 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
219 event_add_timed(node->ctdb->ev, node, timeval_zero(),
220 ctdb_tcp_node_dead, node);
221 /* yes, we report success, as the dead node is
222 handled via a separate event */
/* everything was written immediately (length2 presumably reduced by the
   bytes written in an elided line) — nothing left to queue */
229 if (length2 == 0) return 0;
/* copy the unsent remainder into a queue entry owned by tnode */
232 pkt = talloc(tnode, struct ctdb_tcp_packet);
233 CTDB_NO_MEMORY(node->ctdb, pkt);
235 pkt->data = talloc_memdup(pkt, data, length2);
236 CTDB_NO_MEMORY(node->ctdb, pkt->data);
238 pkt->length = length2;
/* first queued packet on a live socket: ask for writeable events so
   ctdb_tcp_node_write will flush it */
240 if (tnode->queue == NULL && tnode->fd != -1) {
241 EVENT_FD_WRITEABLE(tnode->fde);
/* append to preserve packet ordering */
244 DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);