2 Generic Unix-domain Socket I/O
4 Copyright (C) Amitay Isaacs 2016
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/filesys.h"
22 #include "system/network.h"
27 #include "lib/util/sys_rw.h"
28 #include "lib/util/debug.h"
29 #include "lib/util/blocking.h"
31 #include "common/logging.h"
32 #include "common/sock_io.h"
/*
 * sock_clean - remove a leftover socket file at sockpath via unlink().
 *
 * A warning is logged when a stale socket was removed. A failure is only
 * logged as an error when errno is something other than ENOENT.
 *
 * NOTE(review): the local declarations, the first condition and the
 * return statements are not visible in this excerpt; presumably the
 * function returns false on the error branch and true otherwise -
 * confirm against the full file.
 */
34 bool sock_clean(const char *sockpath)
38 ret = unlink(sockpath);
40 D_WARNING("Removed stale socket %s\n", sockpath);
/* ENOENT means there was nothing to remove, so it is not reported */
41 } else if (errno != ENOENT) {
42 D_ERR("Failed to remove stale socket %s\n", sockpath);
/*
 * sock_connect - create an AF_UNIX stream socket and connect it to the
 * socket at sockpath.
 *
 * A NULL path and a path that does not fit in sun_path are both rejected
 * with an error log before any socket is created.
 *
 * NOTE(review): the failure checks after socket()/connect() and all return
 * statements are not visible in this excerpt; presumably the connected
 * descriptor is returned on success and -1 on failure - confirm.
 */
49 int sock_connect(const char *sockpath)
51 struct sockaddr_un addr;
55 if (sockpath == NULL) {
56 D_ERR("Invalid socket path\n");
/* start from a fully zeroed address so unused bytes of sun_path are 0 */
60 memset(&addr, 0, sizeof(addr));
61 addr.sun_family = AF_UNIX;
/* a result >= the buffer size is treated as "path too long" */
62 len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
63 if (len >= sizeof(addr.sun_path)) {
64 D_ERR("Socket path too long, len=%zu\n", strlen(sockpath));
68 fd = socket(AF_UNIX, SOCK_STREAM, 0);
70 D_ERR("socket() failed, errno=%d\n", errno);
74 ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
76 D_ERR("connect() failed, errno=%d\n", errno);
/*
 * NOTE(review): these look like members of struct sock_queue (they are
 * accessed as queue->ev, queue->callback, ... elsewhere in this file);
 * the struct's opening line and some members (e.g. fd, buf, private_data)
 * are not visible in this excerpt.
 */
/* event context used when registering fd, immediate and queue events */
85 struct tevent_context *ev;
/*
 * Called as callback(data, length, private_data) for each complete packet;
 * also called with (NULL, 0, private_data), e.g. after an invalid
 * zero-length packet has been logged.
 */
86 sock_queue_callback_fn_t callback;
/* immediate event used to schedule processing of further buffered data */
90 struct tevent_immediate *im;
/* tevent queue ("out-queue") that write requests are added to */
91 struct tevent_queue *queue;
/* fd event returned by tevent_add_fd() for the socket descriptor */
92 struct tevent_fd *fde;
/*
 * buflen: allocated size of buf; begin/end: offsets delimiting the
 * not-yet-processed data inside buf.
 */
94 size_t buflen, begin, end;
/*
 * Forward declarations of the static helpers defined later in this file.
 * NOTE(review): the trailing parameter lines of sock_queue_handler and
 * sock_queue_process_event are not visible in this excerpt.
 */
97 static bool sock_queue_set_fd(struct sock_queue *queue, int fd);
98 static void sock_queue_handler(struct tevent_context *ev,
99 struct tevent_fd *fde, uint16_t flags,
101 static void sock_queue_process(struct sock_queue *queue);
102 static void sock_queue_process_event(struct tevent_context *ev,
103 struct tevent_immediate *im,
/*
 * sock_queue_setup - allocate and initialise a sock_queue.
 *
 * The queue is zero-allocated on mem_ctx and records the caller's callback
 * and private_data. It gets an immediate event and an "out-queue" tevent
 * queue, both created with the new queue as their first (context) argument,
 * and then has the descriptor attached through sock_queue_set_fd().
 *
 * NOTE(review): part of the parameter list, the failure handling for each
 * step and the return statements are not visible in this excerpt;
 * presumably NULL is returned on any failure - confirm.
 */
106 struct sock_queue *sock_queue_setup(TALLOC_CTX *mem_ctx,
107 struct tevent_context *ev,
109 sock_queue_callback_fn_t callback,
112 struct sock_queue *queue;
114 queue = talloc_zero(mem_ctx, struct sock_queue);
120 queue->callback = callback;
121 queue->private_data = private_data;
/* used by sock_queue_process() to defer handling of further packets */
123 queue->im = tevent_create_immediate(queue);
124 if (queue->im == NULL) {
/* outgoing writes are serialised through this tevent queue */
129 queue->queue = tevent_queue_create(queue, "out-queue");
130 if (queue->queue == NULL) {
135 if (! sock_queue_set_fd(queue, fd)) {
/*
 * sock_queue_set_fd - attach descriptor fd to the queue.
 *
 * Any existing fd event is freed first. The descriptor is switched to
 * non-blocking mode and registered with the event context, with
 * sock_queue_handler as the handler and the queue as its private data.
 *
 * NOTE(review): the event flags passed to tevent_add_fd(), the error
 * checks and the return statements are not visible in this excerpt.
 */
143 static bool sock_queue_set_fd(struct sock_queue *queue, int fd)
/* drop the previous registration, if any, before installing a new one */
145 TALLOC_FREE(queue->fde);
151 ret = set_blocking(fd, false);
156 queue->fde = tevent_add_fd(queue->ev, queue, fd,
158 sock_queue_handler, queue);
159 if (queue->fde == NULL) {
/* per the tevent API, the descriptor is closed when the fd event is freed */
162 tevent_fd_set_auto_close(queue->fde);
/*
 * sock_queue_handler - fd event handler for the queue's descriptor.
 *
 * Asks the kernel how many bytes are ready (FIONREAD), grows the receive
 * buffer when the free space after queue->end is too small, appends the
 * data at queue->end and then hands over to sock_queue_process().
 *
 * NOTE(review): local declarations, error checks, the update of queue->end
 * and the label guarding the final callback are not visible in this
 * excerpt; the callback(NULL, 0, ...) call is presumably the shared
 * failure/close path - confirm.
 */
168 static void sock_queue_handler(struct tevent_context *ev,
169 struct tevent_fd *fde, uint16_t flags,
172 struct sock_queue *queue = talloc_get_type_abort(
173 private_data, struct sock_queue);
177 ret = ioctl(queue->fd, FIONREAD, &num_ready);
183 if (num_ready == 0) {
184 /* descriptor has been closed */
/* grow the buffer only when the space after 'end' cannot hold the data */
188 if (num_ready > queue->buflen - queue->end) {
/*
 * NOTE(review): the result is assigned directly to queue->buf, so a
 * failed reallocation overwrites the previous buffer pointer with NULL.
 */
189 queue->buf = talloc_realloc_size(queue, queue->buf,
190 queue->end + num_ready);
191 if (queue->buf == NULL) {
194 queue->buflen = queue->end + num_ready;
/* append the new bytes directly after the data already buffered */
197 nread = sys_read(queue->fd, queue->buf + queue->end, num_ready);
203 sock_queue_process(queue);
207 queue->callback(NULL, 0, queue->private_data);
/*
 * sock_queue_process - deliver at most one complete packet from the
 * receive buffer to the callback.
 *
 * A packet starts with a uint32_t length read from queue->buf +
 * queue->begin. The same length is passed to the callback together with
 * the packet start and is used to advance queue->begin, so it covers the
 * whole packet including the length field itself.
 *
 * NOTE(review): local declarations, the zero-length condition, the early
 * returns and the else branch around TALLOC_FREE are not visible in this
 * excerpt.
 */
210 static void sock_queue_process(struct sock_queue *queue)
214 if ((queue->end - queue->begin) < sizeof(uint32_t)) {
215 /* not enough data */
/*
 * NOTE(review): the length is read through a pointer cast, which relies on
 * queue->buf + queue->begin being suitably aligned for uint32_t; copying
 * the four bytes with memcpy() would avoid that assumption.
 */
219 pkt_size = *(uint32_t *)(queue->buf + queue->begin);
221 D_ERR("Invalid packet of length 0\n");
222 queue->callback(NULL, 0, queue->private_data);
226 if ((queue->end - queue->begin) < pkt_size) {
227 /* not enough data */
231 queue->callback(queue->buf + queue->begin, pkt_size,
232 queue->private_data);
233 queue->begin += pkt_size;
/*
 * Remaining bytes are handled from an immediate event instead of a loop
 * here, so each packet is processed in its own event-loop step.
 */
235 if (queue->begin < queue->end) {
236 /* more data to be processed */
237 tevent_schedule_immediate(queue->im, queue->ev,
238 sock_queue_process_event, queue);
/* presumably reached only when the buffer is fully consumed - confirm */
240 TALLOC_FREE(queue->buf);
/*
 * sock_queue_process_event - immediate event handler scheduled by
 * sock_queue_process(); recovers the sock_queue from private_data
 * (aborting if the talloc type does not match) and processes the next
 * buffered packet.
 * NOTE(review): the last parameter line is not visible in this excerpt.
 */
247 static void sock_queue_process_event(struct tevent_context *ev,
248 struct tevent_immediate *im,
251 struct sock_queue *queue = talloc_get_type_abort(
252 private_data, struct sock_queue);
254 sock_queue_process(queue);
/*
 * Per-request state for sock_queue_write().
 * NOTE(review): the members are not visible in this excerpt; the code in
 * this file accesses state->pkt and state->pkt_size.
 */
257 struct sock_queue_write_state {
/* Forward declaration: trigger passed to tevent_queue_add_entry() below */
262 static void sock_queue_trigger(struct tevent_req *req, void *private_data);
/*
 * sock_queue_write - queue buflen bytes from buf for writing.
 *
 * Buffers of INT32_MAX bytes or more are rejected. Otherwise a tevent
 * request carrying a sock_queue_write_state is created on the queue and
 * added to queue->queue, with sock_queue_trigger as the function that
 * performs the write.
 *
 * NOTE(review): the assignment of the packet pointer, the failure checks
 * and the return statements are not visible in this excerpt; the return
 * value is presumably 0 on success and an errno-style code otherwise -
 * confirm.
 */
264 int sock_queue_write(struct sock_queue *queue, uint8_t *buf, size_t buflen)
266 struct tevent_req *req;
267 struct sock_queue_write_state *state;
/* the length is stored as uint32_t below, so bound it first */
270 if (buflen >= INT32_MAX) {
274 req = tevent_req_create(queue, &state, struct sock_queue_write_state);
280 state->pkt_size = (uint32_t)buflen;
282 status = tevent_queue_add_entry(queue->queue, queue->ev, req,
283 sock_queue_trigger, queue);
/*
 * sock_queue_trigger - tevent queue trigger that writes one queued packet
 * to the queue's descriptor.
 *
 * Writing continues from state->pkt + offset until offset reaches
 * state->pkt_size, after which the request is marked done.
 *
 * NOTE(review): the loop head, the check on the sys_write() result and the
 * update of offset are not visible in this excerpt; the
 * callback(NULL, 0, ...) call is presumably the write-failure path -
 * confirm.
 */
292 static void sock_queue_trigger(struct tevent_req *req, void *private_data)
294 struct sock_queue *queue = talloc_get_type_abort(
295 private_data, struct sock_queue);
296 struct sock_queue_write_state *state = tevent_req_data(
297 req, struct sock_queue_write_state);
/* write whatever part of the packet has not been sent yet */
303 nwritten = sys_write(queue->fd, state->pkt + offset,
304 state->pkt_size - offset);
306 queue->callback(NULL, 0, queue->private_data);
311 } while (offset < state->pkt_size);
313 tevent_req_done(req);