- added ctdb_set_flags() call
[obnox/samba/samba-obnox.git] / ctdb / tcp / tcp_io.c
1 /* 
2    ctdb over TCP
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "ctdb_private.h"
26 #include "ctdb_tcp.h"
27
28 /*
29   called when we fail to send a message to a node
30 */
31 static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, 
32                                struct timeval t, void *private)
33 {
34         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
35         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
36                                                       struct ctdb_tcp_node);
37
38         /* flush the queue */
39         while (tnode->queue) {
40                 struct ctdb_tcp_packet *pkt = tnode->queue;
41                 DLIST_REMOVE(tnode->queue, pkt);
42                 talloc_free(pkt);
43         }
44
45         /* start a new connect cycle to try to re-establish the
46            link */
47         talloc_free(tnode->fde);
48         close(tnode->fd);
49         tnode->fd = -1;
50         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
51                         ctdb_tcp_node_connect, node);
52 }
53
54 /*
55   called when socket becomes readable
56 */
57 void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, 
58                          uint16_t flags, void *private)
59 {
60         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
61         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
62                                                       struct ctdb_tcp_node);
63         if (flags & EVENT_FD_READ) {
64                 /* getting a read event on this fd in the current tcp model is
65                    always an error, as we have separate read and write
66                    sockets. In future we may combine them, but for now it must
67                    mean that the socket is dead, so we try to reconnect */
68                 talloc_free(tnode->fde);
69                 close(tnode->fd);
70                 tnode->fd = -1;
71                 event_add_timed(node->ctdb->ev, node, timeval_zero(), 
72                                 ctdb_tcp_node_connect, node);
73                 return;
74         }
75
76         while (tnode->queue) {
77                 struct ctdb_tcp_packet *pkt = tnode->queue;
78                 ssize_t n;
79
80                 n = write(tnode->fd, pkt->data, pkt->length);
81
82                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
83                         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
84                                         ctdb_tcp_node_dead, node);
85                         EVENT_FD_NOT_WRITEABLE(tnode->fde);
86                         return;
87                 }
88                 if (n <= 0) return;
89                 
90                 if (n != pkt->length) {
91                         pkt->length -= n;
92                         pkt->data += n;
93                         return;
94                 }
95
96                 DLIST_REMOVE(tnode->queue, pkt);
97                 talloc_free(pkt);
98         }
99
100         EVENT_FD_NOT_WRITEABLE(tnode->fde);
101 }
102
103
104 /*
105   called when an incoming connection is readable
106 */
107 void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, 
108                             uint16_t flags, void *private)
109 {
110         struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
111         int num_ready = 0;
112         uint8_t *data;
113
114         /* NOTE: we don't yet handle combined packets or partial
115            packets. Obviously that needed fixing, using a similar
116            scheme to the Samba4 packet layer */
117
118         if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
119             num_ready == 0) {
120                 /* we've lost the link from another node. We don't
121                    notify the upper layers, as we only want to trigger
122                    a full node reorganisation when a send fails - that
123                    allows nodes to restart without penalty as long as
124                    the network is idle */
125                 talloc_free(in);
126                 return;
127         }
128
129         data = talloc_size(in, num_ready);
130         if (data == NULL) {
131                 /* not much we can do except drop the socket */
132                 talloc_free(in);
133                 return;
134         }
135
136         if (read(in->fd, data, num_ready) != num_ready) {
137                 talloc_free(in);
138                 return;
139         }
140
141         /* tell the ctdb layer above that we have a packet */
142         in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
143 }
144
145 /*
146   queue a packet for sending
147 */
148 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
149 {
150         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
151                                                       struct ctdb_tcp_node);
152         struct ctdb_tcp_packet *pkt;
153         
154         if (tnode->fd == -1) {
155                 ctdb_set_error(node->ctdb, "Sending to dead node %s\n", node->name);
156                 return -1;
157         }
158
159         /* if the queue is empty then try an immediate write, avoiding
160            queue overhead. This relies on non-blocking sockets */
161         if (tnode->queue == NULL) {
162                 ssize_t n = write(tnode->fd, data, length);
163                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
164                         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
165                                         ctdb_tcp_node_dead, node);
166                         /* yes, we report success, as the dead node is 
167                            handled via a separate event */
168                         return 0;
169                 }
170                 if (n > 0) {
171                         data += n;
172                         length -= n;
173                 }
174                 if (length == 0) return 0;
175         }
176
177         pkt = talloc(tnode, struct ctdb_tcp_packet);
178         CTDB_NO_MEMORY(node->ctdb, pkt);
179
180         pkt->data = talloc_memdup(pkt, data, length);
181         CTDB_NO_MEMORY(node->ctdb, pkt->data);
182
183         pkt->length = length;
184
185         if (tnode->queue == NULL) {
186                 EVENT_FD_WRITEABLE(tnode->fde);
187         }
188
189         DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);
190
191         return 0;
192 }