Merge from tridge
[vlendec/samba-autobuild/.git] / ctdb / tcp / tcp_io.c
1 /* 
2    ctdb over TCP
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "ctdb_private.h"
26 #include "ctdb_tcp.h"
27
28 /*
29   called when we fail to send a message to a node
30 */
31 static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, 
32                                struct timeval t, void *private)
33 {
34         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
35         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
36                                                       struct ctdb_tcp_node);
37
38         /* flush the queue */
39         while (tnode->queue) {
40                 struct ctdb_tcp_packet *pkt = tnode->queue;
41                 DLIST_REMOVE(tnode->queue, pkt);
42                 talloc_free(pkt);
43         }
44
45         /* start a new connect cycle to try to re-establish the
46            link */
47         talloc_free(tnode->fde);
48         close(tnode->fd);
49         tnode->fd = -1;
50         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
51                         ctdb_tcp_node_connect, node);
52 }
53
54 /*
55   called when socket becomes readable
56 */
57 void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, 
58                          uint16_t flags, void *private)
59 {
60         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
61         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
62                                                       struct ctdb_tcp_node);
63         if (flags & EVENT_FD_READ) {
64                 /* getting a read event on this fd in the current tcp model is
65                    always an error, as we have separate read and write
66                    sockets. In future we may combine them, but for now it must
67                    mean that the socket is dead, so we try to reconnect */
68                 talloc_free(tnode->fde);
69                 close(tnode->fd);
70                 tnode->fd = -1;
71                 event_add_timed(node->ctdb->ev, node, timeval_zero(), 
72                                 ctdb_tcp_node_connect, node);
73                 return;
74         }
75
76         while (tnode->queue) {
77                 struct ctdb_tcp_packet *pkt = tnode->queue;
78                 ssize_t n;
79
80                 n = write(tnode->fd, pkt->data, pkt->length);
81
82                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
83                         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
84                                         ctdb_tcp_node_dead, node);
85                         EVENT_FD_NOT_WRITEABLE(tnode->fde);
86                         return;
87                 }
88                 if (n <= 0) return;
89                 
90                 if (n != pkt->length) {
91                         pkt->length -= n;
92                         pkt->data += n;
93                         return;
94                 }
95
96                 DLIST_REMOVE(tnode->queue, pkt);
97                 talloc_free(pkt);
98         }
99
100         EVENT_FD_NOT_WRITEABLE(tnode->fde);
101 }
102
103
104 /*
105   called when an incoming connection is readable
106 */
107 void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, 
108                             uint16_t flags, void *private)
109 {
110         struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
111         int num_ready = 0;
112         uint8_t *data;
113
114         /* NOTE: we don't yet handle combined packets or partial
115            packets. Obviously that needed fixing, using a similar
116            scheme to the Samba4 packet layer */
117
118         if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
119             num_ready == 0) {
120                 /* we've lost the link from another node. We don't
121                    notify the upper layers, as we only want to trigger
122                    a full node reorganisation when a send fails - that
123                    allows nodes to restart without penalty as long as
124                    the network is idle */
125                 talloc_free(in);
126                 return;
127         }
128
129         data = talloc_size(in, num_ready);
130         if (data == NULL) {
131                 /* not much we can do except drop the socket */
132                 talloc_free(in);
133                 return;
134         }
135
136         if (read(in->fd, data, num_ready) != num_ready) {
137                 talloc_free(in);
138                 return;
139         }
140
141         /* tell the ctdb layer above that we have a packet */
142         in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
143
144         talloc_free(data);
145 }
146
147 /*
148   queue a packet for sending
149 */
150 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
151 {
152         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
153                                                       struct ctdb_tcp_node);
154         struct ctdb_tcp_packet *pkt;
155         
156         if (tnode->fd == -1) {
157                 ctdb_set_error(node->ctdb, "Sending to dead node %s\n", node->name);
158                 return -1;
159         }
160
161         /* if the queue is empty then try an immediate write, avoiding
162            queue overhead. This relies on non-blocking sockets */
163         if (tnode->queue == NULL) {
164                 ssize_t n = write(tnode->fd, data, length);
165                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
166                         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
167                                         ctdb_tcp_node_dead, node);
168                         /* yes, we report success, as the dead node is 
169                            handled via a separate event */
170                         return 0;
171                 }
172                 if (n > 0) {
173                         data += n;
174                         length -= n;
175                 }
176                 if (length == 0) return 0;
177         }
178
179         pkt = talloc(tnode, struct ctdb_tcp_packet);
180         CTDB_NO_MEMORY(node->ctdb, pkt);
181
182         pkt->data = talloc_memdup(pkt, data, length);
183         CTDB_NO_MEMORY(node->ctdb, pkt->data);
184
185         pkt->length = length;
186
187         if (tnode->queue == NULL) {
188                 EVENT_FD_WRITEABLE(tnode->fde);
189         }
190
191         DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);
192
193         return 0;
194 }