4 Copyright (C) Andrew Tridgell 2006
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2 of the License, or (at your option) any later version.
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "../include/ctdb_private.h"
32 called when we fail to send a message to a node
/*
  Timed-event handler that this file schedules (from ctdb_tcp_node_write
  and ctdb_tcp_queue_pkt) when write() on a node's socket fails with an
  error other than EAGAIN/EWOULDBLOCK.

  It frees the node's fd event (tnode->fde) and schedules
  ctdb_tcp_node_connect() to run immediately (timeval_zero()), starting
  a new connect cycle for the node.

  ev, te, t: event-loop arguments; not used by the visible lines.
  private:   the struct ctdb_node whose connection failed.

  NOTE(review): this extract is missing the source lines between the
  talloc_free() and event_add_timed() calls, as well as the closing
  brace. They presumably close tnode->fd and reset it to -1, since
  ctdb_tcp_queue_pkt treats fd == -1 as "not connected". Confirm against
  the full source.
*/
34 static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te,
35 struct timeval t, void *private)
37 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
38 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
39 struct ctdb_tcp_node);
41 /* start a new connect cycle to try to re-establish the
43 talloc_free(tnode->fde);
46 event_add_timed(node->ctdb->ev, node, timeval_zero(),
47 ctdb_tcp_node_connect, node);
51 called when socket becomes readable
/*
  Event handler for a node's outgoing TCP socket. private is the struct
  ctdb_node for that node.

  NOTE(review): the heading line above says "readable", but this is the
  write-side handler. A read event on this socket is treated as a dead
  connection; otherwise the handler drains tnode->queue.

  On EVENT_FD_READ:
  - reports the node through node->ctdb->upcalls->node_dead();
  - frees tnode->fde;
  - schedules ctdb_tcp_node_connect() to run immediately, to reconnect.

  Otherwise, it write()s queued packets starting at the queue head.
  ctdb_tcp_queue_pkt appends at the tail, so packets go out in FIFO
  order. A write() error other than EAGAIN/EWOULDBLOCK schedules
  ctdb_tcp_node_dead() and stops write events on tnode->fde. When the
  loop ends because the queue is empty, write events are switched off
  as well.

  NOTE(review): this extract is missing several lines of this function:
  - the declaration of n;
  - the statements after the reconnect and hard-error paths (presumably
    returns);
  - the body of the "n != pkt->length" branch;
  - whatever follows DLIST_REMOVE.
  Confirm these details against the full source.
*/
53 void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
54 uint16_t flags, void *private)
56 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
57 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
58 struct ctdb_tcp_node);
59 if (flags & EVENT_FD_READ) {
60 /* getting a read event on this fd in the current tcp model is
61 always an error, as we have separate read and write
62 sockets. In future we may combine them, but for now it must
63 mean that the socket is dead, so we try to reconnect */
64 node->ctdb->upcalls->node_dead(node);
65 talloc_free(tnode->fde);
68 event_add_timed(node->ctdb->ev, node, timeval_zero(),
69 ctdb_tcp_node_connect, node);
/* write queued packets, oldest first */
73 while (tnode->queue) {
74 struct ctdb_tcp_packet *pkt = tnode->queue;
77 n = write(tnode->fd, pkt->data, pkt->length);
79 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
80 event_add_timed(node->ctdb->ev, node, timeval_zero(),
81 ctdb_tcp_node_dead, node);
82 EVENT_FD_NOT_WRITEABLE(tnode->fde);
/* pkt was not written completely (this also covers n == -1 with
   EAGAIN/EWOULDBLOCK); the body of this branch is missing from this
   extract */
87 if (n != pkt->length) {
/* presumably reached only once pkt has been fully written */
93 DLIST_REMOVE(tnode->queue, pkt);
/* queue drained: no further write events are needed until something
   new is queued */
97 EVENT_FD_NOT_WRITEABLE(tnode->fde);
/*
  Callback that ctdb_tcp_incoming_read passes to ctdb_read_pdu(). args
  is the struct ctdb_incoming for the connection; data/cnt is the
  received buffer and its length in bytes.

  The buffer is validated before being passed up. It must:
  - be at least sizeof(struct ctdb_req_header) bytes;
  - have hdr->length equal to cnt;
  - have hdr->ctdb_magic == CTDB_MAGIC;
  - have hdr->ctdb_version == CTDB_VERSION.
  Each failed check records a message with ctdb_set_error(). A packet
  that passes all checks is handed to in->ctdb->upcalls->recv_pkt().

  NOTE(review): the lines after each ctdb_set_error() call (presumably
  returns) are missing from this extract. So is the argument line that
  completes the "Bad header length" message.
  NOTE(review): cnt is an int compared against sizeof(*hdr), a size_t.
  A negative cnt would convert to a large unsigned value and pass the
  length check. Confirm that ctdb_read_pdu never passes cnt < 0.
  NOTE(review): nothing visible here frees data or in on the error
  paths or checks data for NULL. Confirm with ctdb_read_pdu who owns the
  buffer, and whether it can call back with data == NULL (e.g. on EOF).
*/
102 static void tcp_read_cb(uint8_t *data, int cnt, void *args)
104 struct ctdb_incoming *in = talloc_get_type(args, struct ctdb_incoming);
105 struct ctdb_req_header *hdr;
107 if (cnt < sizeof(*hdr)) {
108 ctdb_set_error(in->ctdb, "Bad packet length %d\n", cnt);
/* view the start of the buffer as the common request header */
111 hdr = (struct ctdb_req_header *)data;
112 if (cnt != hdr->length) {
113 ctdb_set_error(in->ctdb, "Bad header length %d expected %d\n",
118 if (hdr->ctdb_magic != CTDB_MAGIC) {
119 ctdb_set_error(in->ctdb, "Non CTDB packet rejected\n");
123 if (hdr->ctdb_version != CTDB_VERSION) {
124 ctdb_set_error(in->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
128 /* most common case - we got a whole packet in one go
129 tell the ctdb layer above that we have a packet */
130 in->ctdb->upcalls->recv_pkt(in->ctdb, data, cnt);
134 called when an incoming connection is readable
136 void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
137 uint16_t flags, void *private)
139 struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
141 ctdb_read_pdu(in->fd, in, &in->partial, tcp_read_cb, in);
145 queue a packet for sending
/*
  Send a packet to a node over its TCP connection, queueing it when it
  cannot be written straight away.

  The length is rounded up to a multiple of CTDB_TCP_ALIGNMENT. The
  rounded length (length2) is stored in the first 32-bit word of data,
  and the padding bytes are zeroed.
  NOTE(review): zeroing writes length2 - length bytes past
  data + length, so every caller's buffer must already include that
  padding (the "tcp packet allocator" mentioned below). Confirm for all
  callers.
  NOTE(review): the store through (uint32_t *)data assumes data is
  suitably aligned, and it bypasses strict aliasing; memcpy would avoid
  both.

  If nothing is queued and the socket is open (fd != -1), an immediate
  write() is tried first. A hard error (anything except
  EAGAIN/EWOULDBLOCK) schedules ctdb_tcp_node_dead and is deliberately
  still reported as success. Otherwise the remaining bytes are copied
  into a new ctdb_tcp_packet (owned by tnode) and appended to
  tnode->queue. If the queue was empty and the socket is open, write
  events are enabled on tnode->fde.

  Returns 0 on the visible early-return path. CTDB_NO_MEMORY covers
  allocation failures; its return value, and the function's final
  return, are not visible in this extract.

  NOTE(review): the lines that handle a partial immediate write are
  missing from this extract. They sit between the write() and the
  "length2 == 0" check, and presumably advance data and shrink length2
  by n.
*/
147 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
149 struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
150 struct ctdb_tcp_node);
151 struct ctdb_tcp_packet *pkt;
154 /* enforce the length and alignment rules from the tcp packet allocator */
155 length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
156 *(uint32_t *)data = length2;
158 if (length2 != length) {
159 memset(data+length, 0, length2-length);
162 /* if the queue is empty then try an immediate write, avoiding
163 queue overhead. This relies on non-blocking sockets */
164 if (tnode->queue == NULL && tnode->fd != -1) {
165 ssize_t n = write(tnode->fd, data, length2);
166 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
167 event_add_timed(node->ctdb->ev, node, timeval_zero(),
168 ctdb_tcp_node_dead, node);
169 /* yes, we report success, as the dead node is
170 handled via a separate event */
/* everything already written: nothing left to queue */
177 if (length2 == 0) return 0;
/* keep a private copy of the unsent bytes, since the caller's buffer
   is not retained */
180 pkt = talloc(tnode, struct ctdb_tcp_packet);
181 CTDB_NO_MEMORY(node->ctdb, pkt);
183 pkt->data = talloc_memdup(pkt, data, length2);
184 CTDB_NO_MEMORY(node->ctdb, pkt->data);
186 pkt->length = length2;
/* only the first packet of an empty queue turns on write events;
   ctdb_tcp_node_write turns them off again once the queue drains */
188 if (tnode->queue == NULL && tnode->fd != -1) {
189 EVENT_FD_WRITEABLE(tnode->fde);
192 DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);