4 Copyright (C) Andrew Tridgell 2006
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2 of the License, or (at your option) any later version.
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "../include/ctdb_private.h"
32 called when we fail to send a message to a node
// ctdb_tcp_node_dead: timed-event handler run after a send to a node has
// failed; it tears down the node's fd event and starts a new connect cycle.
//   ev, te, t: event-loop callback arguments (not used in the visible lines)
//   private:   the struct ctdb_node the timed event was registered for
// The reconnect is scheduled via event_add_timed() with timeval_zero(), so
// ctdb_tcp_node_connect runs from the event loop rather than inline here.
// NOTE(review): the embedded numbering skips 42 and 44-45, so statements
// between the talloc_free() and event_add_timed() calls are missing from this
// listing; verify against the complete file.
34 static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te,
35 struct timeval t, void *private)
37 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
38 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
39 struct ctdb_tcp_node);
// tnode is the TCP transport state stored in node->private.
41 /* start a new connect cycle to try to re-establish the
43 talloc_free(tnode->fde);
46 event_add_timed(node->ctdb->ev, node, timeval_zero(),
47 ctdb_tcp_node_connect, node);
51 called when socket becomes writeable (a read event on this fd is treated as a dead connection)
// ctdb_tcp_node_write: fd-event handler for a node's outgoing socket.
//   ev, fde: event-loop callback arguments
//   flags:   EVENT_FD_* readiness bits reported by the event loop
//   private: the struct ctdb_node that owns this socket
// On EVENT_FD_READ the link is treated as dead: the upper layer is told via
// upcalls->node_dead, the fd event is freed and ctdb_tcp_node_connect is
// scheduled with a zero timeout.
// Otherwise packets are written from the head of tnode->queue. A hard write
// error (anything other than EAGAIN/EWOULDBLOCK) schedules ctdb_tcp_node_dead
// and disables write notification. After the queue loop, write notification
// is switched off.
// NOTE(review): the embedded numbering skips 66-67, 70-72, 75-76, 83-86 and
// 88-92, so the declaration of `n`, the early exits and the body of the
// `n != pkt->length` branch are not visible here; verify against the full file.
53 void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
54 uint16_t flags, void *private)
56 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
57 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
58 struct ctdb_tcp_node);
59 if (flags & EVENT_FD_READ) {
60 /* getting a read event on this fd in the current tcp model is
61 always an error, as we have separate read and write
62 sockets. In future we may combine them, but for now it must
63 mean that the socket is dead, so we try to reconnect */
64 node->ctdb->upcalls->node_dead(node);
65 talloc_free(tnode->fde);
68 event_add_timed(node->ctdb->ev, node, timeval_zero(),
69 ctdb_tcp_node_connect, node);
// Drain the send queue starting from its head packet.
73 while (tnode->queue) {
74 struct ctdb_tcp_packet *pkt = tnode->queue;
77 n = write(tnode->fd, pkt->data, pkt->length);
// A hard error is not handled inline: ctdb_tcp_node_dead is queued as a
// zero-delay timed event and further writable notifications are disabled.
79 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
80 event_add_timed(node->ctdb->ev, node, timeval_zero(),
81 ctdb_tcp_node_dead, node);
82 EVENT_FD_NOT_WRITEABLE(tnode->fde);
// Incomplete write (or EAGAIN): handling not visible in this listing.
87 if (n != pkt->length) {
93 DLIST_REMOVE(tnode->queue, pkt);
// Nothing left to send, so stop asking the event loop for writability.
97 EVENT_FD_NOT_WRITEABLE(tnode->fde);
102 called when an incoming connection is readable
// ctdb_tcp_incoming_read: fd-event handler for an inbound connection.
//   ev, fde, flags: event-loop callback arguments
//   private: the struct ctdb_incoming for this connection
// The handler works as follows:
// - It asks FIONREAD how many bytes are ready.
// - It grows in->partial.data (kept across calls) to hold those bytes after
//   any previously buffered fragment, and read()s them in.
// - It splits the combined buffer into packets using the leading 32-bit
//   length word of each packet, and hands each complete packet to
//   in->ctdb->upcalls->recv_pkt.
// - A trailing incomplete packet is left in in->partial for the next call.
// If FIONREAD fails, the link is considered lost; per the comment below, the
// upper layers are deliberately not notified.
// NOTE(review): the length word is read through *(uint32_t *)data, which
// assumes the buffer is suitably aligned and that the peer writes the length
// in host byte order. Confirm both.
// NOTE(review): a length word of 0..3 satisfies `<= nread` but does not
// describe a real packet. The lines that advance `data` inside the loop are
// not visible here (numbering skips 159-169), so check that such a value
// cannot stall or mis-frame the parser.
104 void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
105 uint16_t flags, void *private)
107 struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
110 uint8_t *data, *data_base;
112 if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
114 /* we've lost the link from another node. We don't
115 notify the upper layers, as we only want to trigger
116 a full node reorganisation when a send fails - that
117 allows nodes to restart without penalty as long as
118 the network is idle */
// Make room for num_ready new bytes after the in->partial.length bytes
// buffered by an earlier call.
123 in->partial.data = talloc_realloc_size(in, in->partial.data,
124 num_ready + in->partial.length);
125 if (in->partial.data == NULL) {
126 /* not much we can do except drop the socket */
131 nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
133 /* the connection must be dead */
// From here on `data`/`nread` describe the whole buffer (old fragment plus
// new bytes); in->partial is cleared and only refilled if a fragment remains.
138 data = in->partial.data;
139 nread += in->partial.length;
141 in->partial.data = NULL;
142 in->partial.length = 0;
// Fast path: the buffer holds exactly one packet, passed up without copying.
144 if (nread >= 4 && *(uint32_t *)data == nread) {
145 /* most common case - we got a whole packet in one go
146 tell the ctdb layer above that we have a packet */
147 in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
// Slow path: each complete packet is copied out with talloc_memdup() before
// being passed up.
153 while (nread >= 4 && *(uint32_t *)data <= nread) {
154 /* we have at least one packet */
157 len = *(uint32_t *)data;
158 d2 = talloc_memdup(in, data, len);
164 in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
// Keep the leftover bytes. If nothing was consumed (data == data_base), the
// buffer is kept as is; otherwise the tail is copied and the original freed.
170 /* we have only part of a packet */
171 if (data_base == data) {
172 in->partial.data = data;
173 in->partial.length = nread;
175 in->partial.data = talloc_memdup(in, data, nread);
176 if (in->partial.data == NULL) {
180 in->partial.length = nread;
181 talloc_free(data_base);
186 talloc_free(data_base);
190 queue a packet for sending
// ctdb_tcp_queue_pkt: send a packet to a node, queueing whatever cannot be
// written immediately.
//   node:   destination node (its TCP state lives in node->private)
//   data:   packet buffer whose first 32 bits hold the packet length
//   length: packet length in bytes before alignment padding
// Processing steps:
// - Round `length` up to a multiple of CTDB_TCP_ALIGNMENT.
// - Store the rounded length in the first 32 bits of `data`.
// - Zero the padding bytes. `data` is therefore modified in place and must
//   have room for the rounded length. Per the comment below, this is
//   presumably guaranteed by the tcp packet allocator; confirm.
// - If nothing is queued and the socket is open, try an immediate write().
//   A hard write error schedules ctdb_tcp_node_dead but is still reported
//   as success.
// - Copy the bytes still described by data/length2 into a new
//   ctdb_tcp_packet and append it to tnode->queue. Write notification is
//   enabled only when the queue was empty and the fd is open.
// Returns 0 on the visible success paths. Allocation failures go through
// CTDB_NO_MEMORY, whose exact behaviour is defined elsewhere.
// NOTE(review): the length store writes 4 bytes, so `length` < 4 would
// overrun a buffer sized for `length`. The rounding mask is only correct if
// CTDB_TCP_ALIGNMENT is a power of two. The lines after a partial immediate
// write (numbering skips 216-221) and the final return are not visible here.
192 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
194 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
195 struct ctdb_tcp_node);
196 struct ctdb_tcp_packet *pkt;
199 /* enforce the length and alignment rules from the tcp packet allocator */
200 length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
201 *(uint32_t *)data = length2;
203 if (length2 != length) {
204 memset(data+length, 0, length2-length);
207 /* if the queue is empty then try an immediate write, avoiding
208 queue overhead. This relies on non-blocking sockets */
209 if (tnode->queue == NULL && tnode->fd != -1) {
210 ssize_t n = write(tnode->fd, data, length2);
211 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
212 event_add_timed(node->ctdb->ev, node, timeval_zero(),
213 ctdb_tcp_node_dead, node);
214 /* yes, we report success, as the dead node is
215 handled via a separate event */
// Everything was sent by the immediate write; nothing to queue.
222 if (length2 == 0) return 0;
// The queued copy is a talloc child of pkt, which is a child of tnode.
225 pkt = talloc(tnode, struct ctdb_tcp_packet);
226 CTDB_NO_MEMORY(node->ctdb, pkt);
228 pkt->data = talloc_memdup(pkt, data, length2);
229 CTDB_NO_MEMORY(node->ctdb, pkt->data);
231 pkt->length = length2;
// Only the first queued packet needs to turn on writable notification; later
// packets are picked up by the same drain loop in the write handler.
233 if (tnode->queue == NULL && tnode->fd != -1) {
234 EVENT_FD_WRITEABLE(tnode->fde);
237 DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);