Fix spelling mistakes
[vlendec/samba-autobuild/.git] / ctdb / common / sock_io.c
1 /*
2    Generic Unix-domain Socket I/O
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/network.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26
27 #include "lib/util/sys_rw.h"
28 #include "lib/util/debug.h"
29 #include "lib/util/blocking.h"
30
31 #include "common/logging.h"
32 #include "common/sock_io.h"
33
/*
 * Remove a stale socket file at sockpath.
 *
 * Returns true if the path was unlinked or did not exist in the first
 * place (ENOENT); returns false if unlink() failed for any other reason.
 */
bool sock_clean(const char *sockpath)
{
        if (unlink(sockpath) == 0) {
                D_WARNING("Removed stale socket %s\n", sockpath);
                return true;
        }

        if (errno == ENOENT) {
                /* Nothing was there, so there is nothing to clean up */
                return true;
        }

        D_ERR("Failed to remove stale socket %s\n", sockpath);
        return false;
}
48
/*
 * Create a stream Unix-domain socket and connect it to sockpath.
 *
 * Returns the connected file descriptor on success.  Returns -1 if
 * sockpath is NULL, if it does not fit into sockaddr_un.sun_path, or if
 * socket() or connect() fails.  The descriptor is closed again when
 * connect() fails.
 */
int sock_connect(const char *sockpath)
{
        struct sockaddr_un un_addr;
        size_t path_len;
        int sock;

        if (sockpath == NULL) {
                D_ERR("Invalid socket path\n");
                return -1;
        }

        memset(&un_addr, 0, sizeof(un_addr));
        un_addr.sun_family = AF_UNIX;

        /* strlcpy() reports the source length, so truncation is detectable */
        path_len = strlcpy(un_addr.sun_path, sockpath,
                           sizeof(un_addr.sun_path));
        if (path_len >= sizeof(un_addr.sun_path)) {
                D_ERR("Socket path too long, len=%zu\n", strlen(sockpath));
                return -1;
        }

        sock = socket(AF_UNIX, SOCK_STREAM, 0);
        if (sock == -1) {
                D_ERR("socket() failed, errno=%d\n", errno);
                return -1;
        }

        if (connect(sock, (struct sockaddr *)&un_addr,
                    sizeof(un_addr)) == -1) {
                D_ERR("connect() failed, errno=%d\n", errno);
                close(sock);
                return -1;
        }

        return sock;
}
83
84 struct sock_queue {
85         struct tevent_context *ev;
86         sock_queue_callback_fn_t callback;
87         void *private_data;
88         int fd;
89
90         struct tevent_immediate *im;
91         struct tevent_queue *queue;
92         struct tevent_fd *fde;
93         uint8_t *buf;
94         size_t buflen, begin, end;
95 };
96
/*
 * struct sock_queue is allocated as a talloc pooled object.
 * SOCK_QUEUE_OBJ_COUNT is the number of talloc headers reserved in the
 * pool and SOCK_QUEUE_POOL_SIZE is the amount of pool memory
 * pre-allocated for the sub-objects queue->im, queue->queue, queue->fde
 * and queue->buf.
 *
 * If the set of sub-objects allocated under struct sock_queue changes,
 * these values need to be adjusted.
 */
#define SOCK_QUEUE_OBJ_COUNT 4
#define SOCK_QUEUE_POOL_SIZE 2048
107
/*
 * Forward declarations of the static helpers defined further down in
 * this file: descriptor registration, the read handler, and the packet
 * processing routine together with its immediate-event trampoline.
 */
static bool sock_queue_set_fd(struct sock_queue *queue, int fd);
static void sock_queue_handler(struct tevent_context *ev,
                               struct tevent_fd *fde, uint16_t flags,
                               void *private_data);
static void sock_queue_process(struct sock_queue *queue);
static void sock_queue_process_event(struct tevent_context *ev,
                                     struct tevent_immediate *im,
                                     void *private_data);
116
117 struct sock_queue *sock_queue_setup(TALLOC_CTX *mem_ctx,
118                                     struct tevent_context *ev,
119                                     int fd,
120                                     sock_queue_callback_fn_t callback,
121                                     void *private_data)
122 {
123         struct sock_queue *queue;
124
125         queue = talloc_pooled_object(mem_ctx, struct sock_queue,
126                                      SOCK_QUEUE_OBJ_COUNT, SOCK_QUEUE_POOL_SIZE);
127         if (queue == NULL) {
128                 return NULL;
129         }
130         memset(queue, 0, sizeof(struct sock_queue));
131
132         queue->ev = ev;
133         queue->callback = callback;
134         queue->private_data = private_data;
135
136         queue->im = tevent_create_immediate(queue);
137         if (queue->im == NULL) {
138                 talloc_free(queue);
139                 return NULL;
140         }
141
142         queue->queue = tevent_queue_create(queue, "out-queue");
143         if (queue->queue == NULL) {
144                 talloc_free(queue);
145                 return NULL;
146         }
147
148         if (! sock_queue_set_fd(queue, fd)) {
149                 talloc_free(queue);
150                 return NULL;
151         }
152
153         return queue;
154 }
155
156 static bool sock_queue_set_fd(struct sock_queue *queue, int fd)
157 {
158         TALLOC_FREE(queue->fde);
159         queue->fd = fd;
160
161         if (fd != -1) {
162                 int ret;
163
164                 ret = set_blocking(fd, false);
165                 if (ret != 0) {
166                         return false;
167                 }
168
169                 queue->fde = tevent_add_fd(queue->ev, queue, fd,
170                                            TEVENT_FD_READ,
171                                            sock_queue_handler, queue);
172                 if (queue->fde == NULL) {
173                         return false;
174                 }
175                 tevent_fd_set_auto_close(queue->fde);
176         }
177
178         return true;
179 }
180
181 static void sock_queue_handler(struct tevent_context *ev,
182                                struct tevent_fd *fde, uint16_t flags,
183                                void *private_data)
184 {
185         struct sock_queue *queue = talloc_get_type_abort(
186                 private_data, struct sock_queue);
187         int ret, num_ready;
188         ssize_t nread;
189
190         ret = ioctl(queue->fd, FIONREAD, &num_ready);
191         if (ret != 0) {
192                 /* Ignore */
193                 return;
194         }
195
196         if (num_ready == 0) {
197                 /* descriptor has been closed */
198                 goto fail;
199         }
200
201         if (num_ready > queue->buflen - queue->end) {
202                 queue->buf = talloc_realloc_size(queue, queue->buf,
203                                                  queue->end + num_ready);
204                 if (queue->buf == NULL) {
205                         goto fail;
206                 }
207                 queue->buflen = queue->end + num_ready;
208         }
209
210         nread = sys_read(queue->fd, queue->buf + queue->end, num_ready);
211         if (nread < 0) {
212                 goto fail;
213         }
214         queue->end += nread;
215
216         sock_queue_process(queue);
217         return;
218
219 fail:
220         queue->callback(NULL, 0, queue->private_data);
221 }
222
223 static void sock_queue_process(struct sock_queue *queue)
224 {
225         uint32_t pkt_size;
226
227         if ((queue->end - queue->begin) < sizeof(uint32_t)) {
228                 /* not enough data */
229                 return;
230         }
231
232         pkt_size = *(uint32_t *)(queue->buf + queue->begin);
233         if (pkt_size == 0) {
234                 D_ERR("Invalid packet of length 0\n");
235                 queue->callback(NULL, 0, queue->private_data);
236                 return;
237         }
238
239         if ((queue->end - queue->begin) < pkt_size) {
240                 /* not enough data */
241                 return;
242         }
243
244         queue->callback(queue->buf + queue->begin, pkt_size,
245                         queue->private_data);
246         queue->begin += pkt_size;
247
248         if (queue->begin < queue->end) {
249                 /* more data to be processed */
250                 tevent_schedule_immediate(queue->im, queue->ev,
251                                           sock_queue_process_event, queue);
252         } else {
253                 TALLOC_FREE(queue->buf);
254                 queue->buflen = 0;
255                 queue->begin = 0;
256                 queue->end = 0;
257         }
258 }
259
260 static void sock_queue_process_event(struct tevent_context *ev,
261                                      struct tevent_immediate *im,
262                                      void *private_data)
263 {
264         struct sock_queue *queue = talloc_get_type_abort(
265                 private_data, struct sock_queue);
266
267         sock_queue_process(queue);
268 }
269
/*
 * Per-request state for one queued write.
 */
struct sock_queue_write_state {
        /* Packet to send; the pointer is stored as given, not copied */
        uint8_t *pkt;
        /* Number of bytes in pkt */
        uint32_t pkt_size;
};

/* Queue trigger that performs the actual write; defined below */
static void sock_queue_trigger(struct tevent_req *req, void *private_data);
276
277 int sock_queue_write(struct sock_queue *queue, uint8_t *buf, size_t buflen)
278 {
279         struct tevent_req *req;
280         struct sock_queue_write_state *state;
281         struct tevent_queue_entry *qentry;
282
283         if (buflen >= INT32_MAX) {
284                 return -1;
285         }
286
287         req = tevent_req_create(queue, &state, struct sock_queue_write_state);
288         if (req == NULL) {
289                 return -1;
290         }
291
292         state->pkt = buf;
293         state->pkt_size = (uint32_t)buflen;
294
295         qentry = tevent_queue_add_entry(queue->queue, queue->ev, req,
296                                         sock_queue_trigger, queue);
297         if (qentry == NULL) {
298                 talloc_free(req);
299                 return -1;
300         }
301
302         return 0;
303 }
304
305 static void sock_queue_trigger(struct tevent_req *req, void *private_data)
306 {
307         struct sock_queue *queue = talloc_get_type_abort(
308                 private_data, struct sock_queue);
309         struct sock_queue_write_state *state = tevent_req_data(
310                 req, struct sock_queue_write_state);
311         size_t offset = 0;
312
313         do {
314                 ssize_t nwritten;
315
316                 nwritten = sys_write(queue->fd, state->pkt + offset,
317                                      state->pkt_size - offset);
318                 if (nwritten < 0) {
319                         queue->callback(NULL, 0, queue->private_data);
320                         return;
321                 }
322                 offset += nwritten;
323
324         } while (offset < state->pkt_size);
325
326         tevent_req_done(req);
327         talloc_free(req);
328 }