tsocket: add tstream_writev_queue_send/recv()
[samba.git] / lib / tsocket / tsocket_helpers.c
/*
   Unix SMB/CIFS implementation.

   Copyright (C) Stefan Metzmacher 2009

     ** NOTE! The following LGPL license applies to the tevent
     ** library. This does NOT imply that all of Samba is released
     ** under the LGPL

   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
   version 3 of the License, or (at your option) any later version.

   This library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public
   License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
23
#include "replace.h"
#include <limits.h>
#include "system/filesys.h"
#include "tsocket.h"
#include "tsocket_internal.h"
28
/*
 * Per-request state for tdgram_sendto_queue_send()/_recv().
 */
struct tdgram_sendto_queue_state {
	/* these values are owned by the caller, we only keep the pointers */
	struct {
		struct tevent_context *ev;
		struct tdgram_context *dgram;
		const uint8_t *buf;
		size_t len;
		const struct tsocket_address *dst;
	} caller;
	/*
	 * the result of tdgram_sendto_recv(), it is initialized to -1
	 * and handed out by tdgram_sendto_queue_recv()
	 */
	ssize_t ret;
};

/* queue trigger, it starts the tdgram_sendto_send() subrequest */
static void tdgram_sendto_queue_trigger(struct tevent_req *req,
					 void *private_data);
/* completion callback of the tdgram_sendto_send() subrequest */
static void tdgram_sendto_queue_done(struct tevent_req *subreq);
44
45 /**
46  * @brief Queue a dgram blob for sending through the socket
47  * @param[in] mem_ctx   The memory context for the result
48  * @param[in] ev        The event context the operation should work on
49  * @param[in] dgram     The tdgram_context to send the message buffer
50  * @param[in] queue     The existing dgram queue
51  * @param[in] buf       The message buffer
52  * @param[in] len       The message length
53  * @param[in] dst       The destination socket address
54  * @retval              The async request handle
55  *
56  * This function queues a blob for sending to destination through an existing
57  * dgram socket. The async callback is triggered when the whole blob is
58  * delivered to the underlying system socket.
59  *
60  * The caller needs to make sure that all non-scalar input parameters hang
61  * arround for the whole lifetime of the request.
62  */
63 struct tevent_req *tdgram_sendto_queue_send(TALLOC_CTX *mem_ctx,
64                                             struct tevent_context *ev,
65                                             struct tdgram_context *dgram,
66                                             struct tevent_queue *queue,
67                                             const uint8_t *buf,
68                                             size_t len,
69                                             struct tsocket_address *dst)
70 {
71         struct tevent_req *req;
72         struct tdgram_sendto_queue_state *state;
73         bool ok;
74
75         req = tevent_req_create(mem_ctx, &state,
76                                 struct tdgram_sendto_queue_state);
77         if (!req) {
78                 return NULL;
79         }
80
81         state->caller.ev        = ev;
82         state->caller.dgram     = dgram;
83         state->caller.buf       = buf;
84         state->caller.len       = len;
85         state->caller.dst       = dst;
86         state->ret              = -1;
87
88         ok = tevent_queue_add(queue,
89                               ev,
90                               req,
91                               tdgram_sendto_queue_trigger,
92                               NULL);
93         if (!ok) {
94                 tevent_req_nomem(NULL, req);
95                 goto post;
96         }
97
98         return req;
99
100  post:
101         tevent_req_post(req, ev);
102         return req;
103 }
104
105 static void tdgram_sendto_queue_trigger(struct tevent_req *req,
106                                          void *private_data)
107 {
108         struct tdgram_sendto_queue_state *state = tevent_req_data(req,
109                                         struct tdgram_sendto_queue_state);
110         struct tevent_req *subreq;
111
112         subreq = tdgram_sendto_send(state,
113                                     state->caller.ev,
114                                     state->caller.dgram,
115                                     state->caller.buf,
116                                     state->caller.len,
117                                     state->caller.dst);
118         if (tevent_req_nomem(subreq, req)) {
119                 return;
120         }
121         tevent_req_set_callback(subreq, tdgram_sendto_queue_done, req);
122 }
123
124 static void tdgram_sendto_queue_done(struct tevent_req *subreq)
125 {
126         struct tevent_req *req = tevent_req_callback_data(subreq,
127                                  struct tevent_req);
128         struct tdgram_sendto_queue_state *state = tevent_req_data(req,
129                                         struct tdgram_sendto_queue_state);
130         ssize_t ret;
131         int sys_errno;
132
133         ret = tdgram_sendto_recv(subreq, &sys_errno);
134         talloc_free(subreq);
135         if (ret == -1) {
136                 tevent_req_error(req, sys_errno);
137                 return;
138         }
139         state->ret = ret;
140
141         tevent_req_done(req);
142 }
143
144 ssize_t tdgram_sendto_queue_recv(struct tevent_req *req, int *perrno)
145 {
146         struct tdgram_sendto_queue_state *state = tevent_req_data(req,
147                                         struct tdgram_sendto_queue_state);
148         ssize_t ret;
149
150         ret = tsocket_simple_int_recv(req, perrno);
151         if (ret == 0) {
152                 ret = state->ret;
153         }
154
155         tevent_req_received(req);
156         return ret;
157 }
158
/*
 * Per-request state for tstream_readv_pdu_send()/_recv().
 */
struct tstream_readv_pdu_state {
	/* these values are owned by the caller */
	struct {
		struct tevent_context *ev;
		struct tstream_context *stream;
		tstream_readv_pdu_next_vector_t next_vector_fn;
		void *next_vector_private;
	} caller;

	/*
	 * Each call to the callback resets vector and count.
	 * The callback allocates the vector as a child of our state,
	 * which means we are allowed to modify and free it.
	 *
	 * We call the callback every time we have filled the given
	 * vector and ask for a new vector. We are done when the callback
	 * reports count == 0.
	 */
	struct iovec *vector;
	size_t count;

	/*
	 * the total number of bytes we read,
	 * the return value of the _recv function
	 */
	int total_read;
};

/* asks the caller's callback for the next vector and starts reading it */
static void tstream_readv_pdu_ask_for_next_vector(struct tevent_req *req);
/* completion callback of the tstream_readv_send() subrequest */
static void tstream_readv_pdu_readv_done(struct tevent_req *subreq);
189
190 struct tevent_req *tstream_readv_pdu_send(TALLOC_CTX *mem_ctx,
191                                 struct tevent_context *ev,
192                                 struct tstream_context *stream,
193                                 tstream_readv_pdu_next_vector_t next_vector_fn,
194                                 void *next_vector_private)
195 {
196         struct tevent_req *req;
197         struct tstream_readv_pdu_state *state;
198
199         req = tevent_req_create(mem_ctx, &state,
200                                 struct tstream_readv_pdu_state);
201         if (!req) {
202                 return NULL;
203         }
204
205         state->caller.ev                        = ev;
206         state->caller.stream                    = stream;
207         state->caller.next_vector_fn            = next_vector_fn;
208         state->caller.next_vector_private       = next_vector_private;
209
210         state->vector           = NULL;
211         state->count            = 0;
212         state->total_read       = 0;
213
214         tstream_readv_pdu_ask_for_next_vector(req);
215         if (!tevent_req_is_in_progress(req)) {
216                 goto post;
217         }
218
219         return req;
220
221  post:
222         return tevent_req_post(req, ev);
223 }
224
225 static void tstream_readv_pdu_ask_for_next_vector(struct tevent_req *req)
226 {
227         struct tstream_readv_pdu_state *state = tevent_req_data(req,
228                                             struct tstream_readv_pdu_state);
229         int ret;
230         size_t to_read = 0;
231         size_t i;
232         struct tevent_req *subreq;
233
234         TALLOC_FREE(state->vector);
235         state->count = 0;
236
237         ret = state->caller.next_vector_fn(state->caller.stream,
238                                            state->caller.next_vector_private,
239                                            state, &state->vector, &state->count);
240         if (ret == -1) {
241                 tevent_req_error(req, errno);
242                 return;
243         }
244
245         if (state->count == 0) {
246                 tevent_req_done(req);
247                 return;
248         }
249
250         for (i=0; i < state->count; i++) {
251                 size_t tmp = to_read;
252                 tmp += state->vector[i].iov_len;
253
254                 if (tmp < to_read) {
255                         tevent_req_error(req, EMSGSIZE);
256                         return;
257                 }
258
259                 to_read = tmp;
260         }
261
262         /*
263          * this is invalid the next vector function should have
264          * reported count == 0.
265          */
266         if (to_read == 0) {
267                 tevent_req_error(req, EINVAL);
268                 return;
269         }
270
271         if (state->total_read + to_read < state->total_read) {
272                 tevent_req_error(req, EMSGSIZE);
273                 return;
274         }
275
276         subreq = tstream_readv_send(state,
277                                     state->caller.ev,
278                                     state->caller.stream,
279                                     state->vector,
280                                     state->count);
281         if (tevent_req_nomem(subreq, req)) {
282                 return;
283         }
284         tevent_req_set_callback(subreq, tstream_readv_pdu_readv_done, req);
285 }
286
287 static void tstream_readv_pdu_readv_done(struct tevent_req *subreq)
288 {
289         struct tevent_req *req = tevent_req_callback_data(subreq,
290                                  struct tevent_req);
291         struct tstream_readv_pdu_state *state = tevent_req_data(req,
292                                             struct tstream_readv_pdu_state);
293         int ret;
294         int sys_errno;
295
296         ret = tstream_readv_recv(subreq, &sys_errno);
297         if (ret == -1) {
298                 tevent_req_error(req, sys_errno);
299                 return;
300         }
301
302         state->total_read += ret;
303
304         /* ask the callback for a new vector we should fill */
305         tstream_readv_pdu_ask_for_next_vector(req);
306 }
307
308 int tstream_readv_pdu_recv(struct tevent_req *req, int *perrno)
309 {
310         struct tstream_readv_pdu_state *state = tevent_req_data(req,
311                                             struct tstream_readv_pdu_state);
312         int ret;
313
314         ret = tsocket_simple_int_recv(req, perrno);
315         if (ret == 0) {
316                 ret = state->total_read;
317         }
318
319         tevent_req_received(req);
320         return ret;
321 }
322
/*
 * Per-request state for tstream_writev_queue_send()/_recv().
 */
struct tstream_writev_queue_state {
	/* these values are owned by the caller, we only keep the pointers */
	struct {
		struct tevent_context *ev;
		struct tstream_context *stream;
		const struct iovec *vector;
		size_t count;
	} caller;
	/*
	 * the result of tstream_writev_recv(), it is initialized to -1
	 * and handed out by tstream_writev_queue_recv()
	 */
	int ret;
};

/* queue trigger, it starts the tstream_writev_send() subrequest */
static void tstream_writev_queue_trigger(struct tevent_req *req,
					 void *private_data);
/* completion callback of the tstream_writev_send() subrequest */
static void tstream_writev_queue_done(struct tevent_req *subreq);
337
338 /**
339  * @brief Queue a dgram blob for sending through the socket
340  * @param[in] mem_ctx   The memory context for the result
341  * @param[in] ev        The tevent_context to run on
342  * @param[in] stream    The stream to send data through
343  * @param[in] queue     The existing send queue
344  * @param[in] vector    The iovec vector so write
345  * @param[in] count     The size of the vector
346  * @retval              The async request handle
347  *
348  * This function queues a blob for sending to destination through an existing
349  * dgram socket. The async callback is triggered when the whole blob is
350  * delivered to the underlying system socket.
351  *
352  * The caller needs to make sure that all non-scalar input parameters hang
353  * arround for the whole lifetime of the request.
354  */
355 struct tevent_req *tstream_writev_queue_send(TALLOC_CTX *mem_ctx,
356                                              struct tevent_context *ev,
357                                              struct tstream_context *stream,
358                                              struct tevent_queue *queue,
359                                              const struct iovec *vector,
360                                              size_t count)
361 {
362         struct tevent_req *req;
363         struct tstream_writev_queue_state *state;
364         bool ok;
365
366         req = tevent_req_create(mem_ctx, &state,
367                                 struct tstream_writev_queue_state);
368         if (!req) {
369                 return NULL;
370         }
371
372         state->caller.ev        = ev;
373         state->caller.stream    = stream;
374         state->caller.vector    = vector;
375         state->caller.count     = count;
376         state->ret              = -1;
377
378         ok = tevent_queue_add(queue,
379                               ev,
380                               req,
381                               tstream_writev_queue_trigger,
382                               NULL);
383         if (!ok) {
384                 tevent_req_nomem(NULL, req);
385                 goto post;
386         }
387
388         return req;
389
390  post:
391         return tevent_req_post(req, ev);
392 }
393
394 static void tstream_writev_queue_trigger(struct tevent_req *req,
395                                          void *private_data)
396 {
397         struct tstream_writev_queue_state *state = tevent_req_data(req,
398                                         struct tstream_writev_queue_state);
399         struct tevent_req *subreq;
400
401         subreq = tstream_writev_send(state,
402                                      state->caller.ev,
403                                      state->caller.stream,
404                                      state->caller.vector,
405                                      state->caller.count);
406         if (tevent_req_nomem(subreq, req)) {
407                 return;
408         }
409         tevent_req_set_callback(subreq, tstream_writev_queue_done ,req);
410 }
411
412 static void tstream_writev_queue_done(struct tevent_req *subreq)
413 {
414         struct tevent_req *req = tevent_req_callback_data(subreq,
415                                  struct tevent_req);
416         struct tstream_writev_queue_state *state = tevent_req_data(req,
417                                         struct tstream_writev_queue_state);
418         int ret;
419         int sys_errno;
420
421         ret = tstream_writev_recv(subreq, &sys_errno);
422         talloc_free(subreq);
423         if (ret == -1) {
424                 tevent_req_error(req, sys_errno);
425                 return;
426         }
427         state->ret = ret;
428
429         tevent_req_done(req);
430 }
431
432 int tstream_writev_queue_recv(struct tevent_req *req, int *perrno)
433 {
434         struct tstream_writev_queue_state *state = tevent_req_data(req,
435                                         struct tstream_writev_queue_state);
436         int ret;
437
438         ret = tsocket_simple_int_recv(req, perrno);
439         if (ret == 0) {
440                 ret = state->ret;
441         }
442
443         tevent_req_received(req);
444         return ret;
445 }
446