2 Communication endpoint implementation
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
27 #include "lib/util/blocking.h"
28 #include "lib/util/tevent_unix.h"
31 #include "pkt_write.h"
35 * Communication endpoint around a socket
38 #define SMALL_PKT_SIZE 1024
42 comm_read_handler_fn read_handler;
43 void *read_private_data;
44 comm_dead_handler_fn dead_handler;
45 void *dead_private_data;
46 uint8_t small_pkt[SMALL_PKT_SIZE];
47 struct tevent_req *read_req, *write_req;
48 struct tevent_fd *fde;
49 struct tevent_queue *queue;
52 static void comm_fd_handler(struct tevent_context *ev,
53 struct tevent_fd *fde,
54 uint16_t flags, void *private_data);
55 static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
56 struct tevent_context *ev,
57 struct comm_context *comm,
58 uint8_t *buf, size_t buflen);
59 static void comm_read_failed(struct tevent_req *req);
62 int comm_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
63 comm_read_handler_fn read_handler, void *read_private_data,
64 comm_dead_handler_fn dead_handler, void *dead_private_data,
65 struct comm_context **result)
67 struct comm_context *comm;
74 if (dead_handler == NULL) {
78 /* Socket queue relies on non-blocking sockets. */
79 ret = set_blocking(fd, false);
84 comm = talloc_zero(mem_ctx, struct comm_context);
90 comm->read_handler = read_handler;
91 comm->read_private_data = read_private_data;
92 comm->dead_handler = dead_handler;
93 comm->dead_private_data = dead_private_data;
95 comm->queue = tevent_queue_create(comm, "comm write queue");
96 if (comm->queue == NULL) {
100 /* Set up to write packets */
101 comm->fde = tevent_add_fd(ev, comm, fd, TEVENT_FD_READ,
102 comm_fd_handler, comm);
103 if (comm->fde == NULL) {
107 /* Set up to read packets */
108 if (read_handler != NULL) {
109 struct tevent_req *req;
111 req = comm_read_send(comm, ev, comm, comm->small_pkt,
117 tevent_req_set_callback(req, comm_read_failed, comm);
118 comm->read_req = req;
134 struct comm_read_state {
135 struct tevent_context *ev;
136 struct comm_context *comm;
139 struct tevent_req *subreq;
142 static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data);
143 static void comm_read_done(struct tevent_req *subreq);
145 static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
146 struct tevent_context *ev,
147 struct comm_context *comm,
148 uint8_t *buf, size_t buflen)
150 struct tevent_req *req, *subreq;
151 struct comm_read_state *state;
153 req = tevent_req_create(mem_ctx, &state, struct comm_read_state);
161 state->buflen = buflen;
163 subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
164 state->buf, state->buflen,
165 comm_read_more, NULL);
166 if (tevent_req_nomem(subreq, req)) {
167 return tevent_req_post(req, ev);
169 state->subreq = subreq;
171 tevent_req_set_callback(subreq, comm_read_done, req);
175 static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data)
179 if (buflen < sizeof(uint32_t)) {
180 return sizeof(uint32_t) - buflen;
183 packet_len = *(uint32_t *)buf;
185 return packet_len - buflen;
188 static void comm_read_done(struct tevent_req *subreq)
190 struct tevent_req *req = tevent_req_callback_data(
191 subreq, struct tevent_req);
192 struct comm_read_state *state = tevent_req_data(
193 req, struct comm_read_state);
194 struct comm_context *comm = state->comm;
200 nread = pkt_read_recv(subreq, state, &buf, &free_buf, &err);
202 state->subreq = NULL;
204 tevent_req_error(req, err);
208 comm->read_handler(buf, nread, comm->read_private_data);
214 subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
215 state->buf, state->buflen,
216 comm_read_more, NULL);
217 if (tevent_req_nomem(subreq, req)) {
220 state->subreq = subreq;
222 tevent_req_set_callback(subreq, comm_read_done, req);
225 static void comm_read_recv(struct tevent_req *req, int *perr)
229 if (tevent_req_is_unix_error(req, &err)) {
236 static void comm_read_failed(struct tevent_req *req)
238 struct comm_context *comm = tevent_req_callback_data(
239 req, struct comm_context);
241 comm_read_recv(req, NULL);
243 comm->read_req = NULL;
244 if (comm->dead_handler != NULL) {
245 comm->dead_handler(comm->dead_private_data);
254 struct comm_write_state {
255 struct tevent_context *ev;
256 struct comm_context *comm;
257 struct tevent_req *subreq;
259 size_t buflen, nwritten;
262 static void comm_write_trigger(struct tevent_req *req, void *private_data);
263 static void comm_write_done(struct tevent_req *subreq);
265 struct tevent_req *comm_write_send(TALLOC_CTX *mem_ctx,
266 struct tevent_context *ev,
267 struct comm_context *comm,
268 uint8_t *buf, size_t buflen)
270 struct tevent_req *req;
271 struct comm_write_state *state;
273 req = tevent_req_create(mem_ctx, &state, struct comm_write_state);
281 state->buflen = buflen;
283 if (!tevent_queue_add_entry(comm->queue, ev, req,
284 comm_write_trigger, NULL)) {
292 static void comm_write_trigger(struct tevent_req *req, void *private_data)
294 struct comm_write_state *state = tevent_req_data(
295 req, struct comm_write_state);
296 struct comm_context *comm = state->comm;
297 struct tevent_req *subreq;
299 comm->write_req = req;
301 subreq = pkt_write_send(state, state->ev, comm->fd,
302 state->buf, state->buflen);
303 if (tevent_req_nomem(subreq, req)) {
307 state->subreq = subreq;
308 tevent_req_set_callback(subreq, comm_write_done, req);
309 TEVENT_FD_WRITEABLE(comm->fde);
312 static void comm_write_done(struct tevent_req *subreq)
314 struct tevent_req *req = tevent_req_callback_data(
315 subreq, struct tevent_req);
316 struct comm_write_state *state = tevent_req_data(
317 req, struct comm_write_state);
318 struct comm_context *comm = state->comm;
322 TEVENT_FD_NOT_WRITEABLE(comm->fde);
323 nwritten = pkt_write_recv(subreq, &err);
325 state->subreq = NULL;
326 comm->write_req = NULL;
327 if (nwritten == -1) {
329 comm->dead_handler(comm->dead_private_data);
331 tevent_req_error(req, err);
335 state->nwritten = nwritten;
336 tevent_req_done(req);
339 bool comm_write_recv(struct tevent_req *req, int *perr)
341 struct comm_write_state *state = tevent_req_data(
342 req, struct comm_write_state);
345 if (tevent_req_is_unix_error(req, &err)) {
352 if (state->nwritten != state->buflen) {
361 static void comm_fd_handler(struct tevent_context *ev,
362 struct tevent_fd *fde,
363 uint16_t flags, void *private_data)
365 struct comm_context *comm = talloc_get_type_abort(
366 private_data, struct comm_context);
368 if (flags & TEVENT_FD_READ) {
369 struct comm_read_state *read_state;
371 if (comm->read_req == NULL) {
372 /* This should never happen */
376 read_state = tevent_req_data(comm->read_req,
377 struct comm_read_state);
378 pkt_read_handler(ev, fde, flags, read_state->subreq);
381 if (flags & TEVENT_FD_WRITE) {
382 struct comm_write_state *write_state;
384 if (comm->write_req == NULL) {
385 /* This should never happen */
389 write_state = tevent_req_data(comm->write_req,
390 struct comm_write_state);
391 pkt_write_handler(ev, fde, flags, write_state->subreq);