ctdb-common: Remove sock_queue_destructor
[kai/samba-autobuild/.git] / ctdb / common / sock_io.c
1 /*
2    Generic Unix-domain Socket I/O
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/network.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26
27 #include "lib/util/sys_rw.h"
28 #include "lib/util/debug.h"
29 #include "lib/util/blocking.h"
30
31 #include "common/logging.h"
32 #include "common/sock_io.h"
33
/*
 * Remove a stale socket file.
 *
 * Returns true when sockpath was unlinked or did not exist (ENOENT),
 * false when unlink() failed for any other reason.
 */
bool sock_clean(const char *sockpath)
{
        if (unlink(sockpath) == 0) {
                D_WARNING("Removed stale socket %s\n", sockpath);
                return true;
        }

        if (errno == ENOENT) {
                /* Nothing there to clean up */
                return true;
        }

        D_ERR("Failed to remove stale socket %s\n", sockpath);
        return false;
}
48
/*
 * Connect to the Unix-domain stream socket at sockpath.
 *
 * Returns the connected file descriptor, or -1 if sockpath is NULL,
 * does not fit in sun_path, or socket()/connect() fails.  The
 * descriptor is closed again when connect() fails.
 */
int sock_connect(const char *sockpath)
{
        struct sockaddr_un un_addr;
        const struct sockaddr *sa = (const struct sockaddr *)&un_addr;
        size_t copied;
        int sock;

        if (sockpath == NULL) {
                D_ERR("Invalid socket path\n");
                return -1;
        }

        memset(&un_addr, 0, sizeof(un_addr));
        un_addr.sun_family = AF_UNIX;

        /* strlcpy() reports the source length, so >= size means truncation */
        copied = strlcpy(un_addr.sun_path, sockpath, sizeof(un_addr.sun_path));
        if (copied >= sizeof(un_addr.sun_path)) {
                D_ERR("Socket path too long, len=%zu\n", strlen(sockpath));
                return -1;
        }

        sock = socket(AF_UNIX, SOCK_STREAM, 0);
        if (sock == -1) {
                D_ERR("socket() failed, errno=%d\n", errno);
                return -1;
        }

        if (connect(sock, sa, sizeof(un_addr)) == -1) {
                D_ERR("connect() failed, errno=%d\n", errno);
                close(sock);
                return -1;
        }

        return sock;
}
83
84 struct sock_queue {
85         struct tevent_context *ev;
86         sock_queue_callback_fn_t callback;
87         void *private_data;
88         int fd;
89
90         struct tevent_immediate *im;
91         struct tevent_queue *queue;
92         struct tevent_fd *fde;
93         uint8_t *buf;
94         size_t buflen, begin, end;
95 };
96
/* Forward declarations of the static helpers defined later in this file */

static bool sock_queue_set_fd(struct sock_queue *queue, int fd);

static void sock_queue_handler(struct tevent_context *ev,
                               struct tevent_fd *fde,
                               uint16_t flags,
                               void *private_data);

static void sock_queue_process(struct sock_queue *queue);

static void sock_queue_process_event(struct tevent_context *ev,
                                     struct tevent_immediate *im,
                                     void *private_data);
105
106 struct sock_queue *sock_queue_setup(TALLOC_CTX *mem_ctx,
107                                     struct tevent_context *ev,
108                                     int fd,
109                                     sock_queue_callback_fn_t callback,
110                                     void *private_data)
111 {
112         struct sock_queue *queue;
113
114         queue = talloc_zero(mem_ctx, struct sock_queue);
115         if (queue == NULL) {
116                 return NULL;
117         }
118
119         queue->ev = ev;
120         queue->callback = callback;
121         queue->private_data = private_data;
122
123         queue->im = tevent_create_immediate(queue);
124         if (queue->im == NULL) {
125                 talloc_free(queue);
126                 return NULL;
127         }
128
129         queue->queue = tevent_queue_create(queue, "out-queue");
130         if (queue->queue == NULL) {
131                 talloc_free(queue);
132                 return NULL;
133         }
134
135         if (! sock_queue_set_fd(queue, fd)) {
136                 talloc_free(queue);
137                 return NULL;
138         }
139
140         return queue;
141 }
142
143 static bool sock_queue_set_fd(struct sock_queue *queue, int fd)
144 {
145         TALLOC_FREE(queue->fde);
146         queue->fd = fd;
147
148         if (fd != -1) {
149                 int ret;
150
151                 ret = set_blocking(fd, false);
152                 if (ret != 0) {
153                         return false;
154                 }
155
156                 queue->fde = tevent_add_fd(queue->ev, queue, fd,
157                                            TEVENT_FD_READ,
158                                            sock_queue_handler, queue);
159                 if (queue->fde == NULL) {
160                         return false;
161                 }
162                 tevent_fd_set_auto_close(queue->fde);
163         }
164
165         return true;
166 }
167
168 static void sock_queue_handler(struct tevent_context *ev,
169                                struct tevent_fd *fde, uint16_t flags,
170                                void *private_data)
171 {
172         struct sock_queue *queue = talloc_get_type_abort(
173                 private_data, struct sock_queue);
174         int ret, num_ready;
175         ssize_t nread;
176
177         ret = ioctl(queue->fd, FIONREAD, &num_ready);
178         if (ret != 0) {
179                 /* Ignore */
180                 return;
181         }
182
183         if (num_ready == 0) {
184                 /* descriptor has been closed */
185                 goto fail;
186         }
187
188         if (num_ready > queue->buflen - queue->end) {
189                 queue->buf = talloc_realloc_size(queue, queue->buf,
190                                                  queue->end + num_ready);
191                 if (queue->buf == NULL) {
192                         goto fail;
193                 }
194                 queue->buflen = queue->end + num_ready;
195         }
196
197         nread = sys_read(queue->fd, queue->buf + queue->end, num_ready);
198         if (nread < 0) {
199                 goto fail;
200         }
201         queue->end += nread;
202
203         sock_queue_process(queue);
204         return;
205
206 fail:
207         queue->callback(NULL, 0, queue->private_data);
208 }
209
210 static void sock_queue_process(struct sock_queue *queue)
211 {
212         uint32_t pkt_size;
213
214         if ((queue->end - queue->begin) < sizeof(uint32_t)) {
215                 /* not enough data */
216                 return;
217         }
218
219         pkt_size = *(uint32_t *)(queue->buf + queue->begin);
220         if (pkt_size == 0) {
221                 D_ERR("Invalid packet of length 0\n");
222                 queue->callback(NULL, 0, queue->private_data);
223                 return;
224         }
225
226         if ((queue->end - queue->begin) < pkt_size) {
227                 /* not enough data */
228                 return;
229         }
230
231         queue->callback(queue->buf + queue->begin, pkt_size,
232                         queue->private_data);
233         queue->begin += pkt_size;
234
235         if (queue->begin < queue->end) {
236                 /* more data to be processed */
237                 tevent_schedule_immediate(queue->im, queue->ev,
238                                           sock_queue_process_event, queue);
239         } else {
240                 TALLOC_FREE(queue->buf);
241                 queue->buflen = 0;
242                 queue->begin = 0;
243                 queue->end = 0;
244         }
245 }
246
247 static void sock_queue_process_event(struct tevent_context *ev,
248                                      struct tevent_immediate *im,
249                                      void *private_data)
250 {
251         struct sock_queue *queue = talloc_get_type_abort(
252                 private_data, struct sock_queue);
253
254         sock_queue_process(queue);
255 }
256
/*
 * Per-request state for one queued write.
 */
struct sock_queue_write_state {
        /* Caller-supplied packet data; the pointer is stored, not copied */
        uint8_t *pkt;
        /* Number of bytes to write from pkt */
        uint32_t pkt_size;
};

/* Queue trigger that performs the actual write; defined below */
static void sock_queue_trigger(struct tevent_req *req,
                               void *private_data);
263
264 int sock_queue_write(struct sock_queue *queue, uint8_t *buf, size_t buflen)
265 {
266         struct tevent_req *req;
267         struct sock_queue_write_state *state;
268         bool status;
269
270         if (buflen >= INT32_MAX) {
271                 return -1;
272         }
273
274         req = tevent_req_create(queue, &state, struct sock_queue_write_state);
275         if (req == NULL) {
276                 return -1;
277         }
278
279         state->pkt = buf;
280         state->pkt_size = (uint32_t)buflen;
281
282         status = tevent_queue_add_entry(queue->queue, queue->ev, req,
283                                         sock_queue_trigger, queue);
284         if (! status) {
285                 talloc_free(req);
286                 return -1;
287         }
288
289         return 0;
290 }
291
292 static void sock_queue_trigger(struct tevent_req *req, void *private_data)
293 {
294         struct sock_queue *queue = talloc_get_type_abort(
295                 private_data, struct sock_queue);
296         struct sock_queue_write_state *state = tevent_req_data(
297                 req, struct sock_queue_write_state);
298         size_t offset = 0;
299
300         do {
301                 ssize_t nwritten;
302
303                 nwritten = sys_write(queue->fd, state->pkt + offset,
304                                      state->pkt_size - offset);
305                 if (nwritten < 0) {
306                         queue->callback(NULL, 0, queue->private_data);
307                         return;
308                 }
309                 offset += nwritten;
310
311         } while (offset < state->pkt_size);
312
313         tevent_req_done(req);
314         talloc_free(req);
315 }