lib/tsocket: add tsocket_writev_queue_send/recv()
[ira/wip.git] / lib / tsocket / tsocket_writev.c
1 /*
2    Unix SMB/CIFS implementation.
3
4    Copyright (C) Stefan Metzmacher 2009
5
6      ** NOTE! The following LGPL license applies to the tevent
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
#include "replace.h"
#include <limits.h>
#include "system/network.h"
#include "tsocket.h"
#include "tsocket_internal.h"
28
/*
 * Per-request state used by tsocket_writev_send() and its writeable handler.
 */
struct tsocket_writev_state {
	/* These members are owned by the caller, not by the request. */
	struct {
		struct tsocket_context *sock;
		const struct iovec *vector;
		size_t count;
	} caller;

	/*
	 * Working copy of the caller's vector. The handler advances 'iov'
	 * and decrements 'count' as entries are fully written.
	 */
	struct iovec *iov;
	size_t count;

	/* Sum of the byte counts returned by tsocket_writev() so far. */
	int total_written;
};
41
42 static int tsocket_writev_state_destructor(struct tsocket_writev_state *state)
43 {
44         if (state->caller.sock) {
45                 tsocket_set_writeable_handler(state->caller.sock, NULL, NULL);
46         }
47         ZERO_STRUCT(state->caller);
48
49         return 0;
50 }
51
52 static void tsocket_writev_handler(struct tsocket_context *sock,
53                                    void *private_data);
54
55 struct tevent_req *tsocket_writev_send(struct tsocket_context *sock,
56                                        TALLOC_CTX *mem_ctx,
57                                        const struct iovec *vector,
58                                        size_t count)
59 {
60         struct tevent_req *req;
61         struct tsocket_writev_state *state;
62         int ret;
63         int err;
64         bool dummy;
65         int to_write = 0;
66         size_t i;
67
68         req = tevent_req_create(mem_ctx, &state,
69                                 struct tsocket_writev_state);
70         if (!req) {
71                 return NULL;
72         }
73
74         state->caller.sock      = sock;
75         state->caller.vector    = vector;
76         state->caller.count     = count;
77
78         state->iov              = NULL;
79         state->count            = count;
80         state->total_written    = 0;
81
82         state->iov = talloc_array(state, struct iovec, count);
83         if (tevent_req_nomem(state->iov, req)) {
84                 goto post;
85         }
86         memcpy(state->iov, vector, sizeof(struct iovec) * count);
87
88         for (i=0; i < count; i++) {
89                 int tmp = to_write;
90
91                 tmp += state->iov[i].iov_len;
92
93                 if (tmp < to_write) {
94                         tevent_req_error(req, EMSGSIZE);
95                         goto post;
96                 }
97
98                 to_write = tmp;
99         }
100
101         if (to_write == 0) {
102                 tevent_req_done(req);
103                 goto post;
104         }
105
106         talloc_set_destructor(state, tsocket_writev_state_destructor);
107
108         ret = tsocket_set_writeable_handler(sock,
109                                             tsocket_writev_handler,
110                                             req);
111         err = tsocket_error_from_errno(ret, errno, &dummy);
112         if (tevent_req_error(req, err)) {
113                 goto post;
114         }
115
116         return req;
117
118  post:
119         return tevent_req_post(req, sock->event.ctx);
120 }
121
122 static void tsocket_writev_handler(struct tsocket_context *sock,
123                                    void *private_data)
124 {
125         struct tevent_req *req = talloc_get_type(private_data,
126                                  struct tevent_req);
127         struct tsocket_writev_state *state = tevent_req_data(req,
128                                              struct tsocket_writev_state);
129         int ret;
130         int err;
131         bool retry;
132
133         ret = tsocket_writev(state->caller.sock,
134                              state->iov,
135                              state->count);
136         err = tsocket_error_from_errno(ret, errno, &retry);
137         if (retry) {
138                 /* retry later */
139                 return;
140         }
141         if (tevent_req_error(req, err)) {
142                 return;
143         }
144
145         state->total_written += ret;
146
147         /*
148          * we have not written everything yet, so we need to truncate
149          * the already written bytes from our iov copy
150          */
151         while (ret > 0) {
152                 if (ret < state->iov[0].iov_len) {
153                         uint8_t *base;
154                         base = (uint8_t *)state->iov[0].iov_base;
155                         base += ret;
156                         state->iov[0].iov_base = base;
157                         state->iov[0].iov_len -= ret;
158                         break;
159                 }
160                 ret -= state->iov[0].iov_len;
161                 state->iov += 1;
162                 state->count -= 1;
163         }
164
165         if (state->count > 0) {
166                 /* more to write */
167                 return;
168         }
169
170         tevent_req_done(req);
171 }
172
173 int tsocket_writev_recv(struct tevent_req *req, int *perrno)
174 {
175         struct tsocket_writev_state *state = tevent_req_data(req,
176                                              struct tsocket_writev_state);
177         int ret;
178
179         ret = tsocket_simple_int_recv(req, perrno);
180         if (ret == 0) {
181                 ret = state->total_written;
182         }
183
184         tevent_req_received(req);
185         return ret;
186 }
187
/*
 * Per-request state used by tsocket_writev_queue_send().
 */
struct tsocket_writev_queue_state {
	/* These members are owned by the caller, not by the request. */
	struct {
		struct tsocket_context *sock;
		const struct iovec *vector;
		size_t count;
	} caller;

	/*
	 * Result of the writev sub-request. It is initialised to -1 and
	 * set once the sub-request has finished successfully.
	 */
	int ret;
};
197
198 static void tsocket_writev_queue_trigger(struct tevent_req *req,
199                                          void *private_data);
200 static void tsocket_writev_queue_done(struct tevent_req *subreq);
201
202 /**
203  * @brief Queue a dgram blob for sending through the socket
204  * @param[in] mem_ctx   The memory context for the result
205  * @param[in] sock      The socket to send data through
206  * @param[in] queue     The existing send queue
207  * @param[in] vector    The iovec vector so write
208  * @param[in] count     The size of the vector
209  * @retval              The async request handle
210  *
211  * This function queues a blob for sending to destination through an existing
212  * dgram socket. The async callback is triggered when the whole blob is
213  * delivered to the underlying system socket.
214  *
215  * The caller needs to make sure that all non-scalar input parameters hang
216  * arround for the whole lifetime of the request.
217  */
218 struct tevent_req *tsocket_writev_queue_send(TALLOC_CTX *mem_ctx,
219                                              struct tsocket_context *sock,
220                                              struct tevent_queue *queue,
221                                              const struct iovec *vector,
222                                              size_t count)
223 {
224         struct tevent_req *req;
225         struct tsocket_writev_queue_state *state;
226         bool ok;
227
228         req = tevent_req_create(mem_ctx, &state,
229                                 struct tsocket_writev_queue_state);
230         if (!req) {
231                 return NULL;
232         }
233
234         state->caller.sock      = sock;
235         state->caller.vector    = vector;
236         state->caller.count     = count;
237         state->ret              = -1;
238
239         ok = tevent_queue_add(queue,
240                               sock->event.ctx,
241                               req,
242                               tsocket_writev_queue_trigger,
243                               NULL);
244         if (!ok) {
245                 tevent_req_nomem(NULL, req);
246                 goto post;
247         }
248
249         return req;
250
251  post:
252         return tevent_req_post(req, sock->event.ctx);
253 }
254
255 static void tsocket_writev_queue_trigger(struct tevent_req *req,
256                                          void *private_data)
257 {
258         struct tsocket_writev_queue_state *state = tevent_req_data(req,
259                                         struct tsocket_writev_queue_state);
260         struct tevent_req *subreq;
261
262         subreq = tsocket_writev_send(state->caller.sock,
263                                      state,
264                                      state->caller.vector,
265                                      state->caller.count);
266         if (tevent_req_nomem(subreq, req)) {
267                 return;
268         }
269         tevent_req_set_callback(subreq, tsocket_writev_queue_done ,req);
270 }
271
272 static void tsocket_writev_queue_done(struct tevent_req *subreq)
273 {
274         struct tevent_req *req = tevent_req_callback_data(subreq,
275                                  struct tevent_req);
276         struct tsocket_writev_queue_state *state = tevent_req_data(req,
277                                         struct tsocket_writev_queue_state);
278         int ret;
279         int sys_errno;
280
281         ret = tsocket_writev_recv(subreq, &sys_errno);
282         talloc_free(subreq);
283         if (ret == -1) {
284                 tevent_req_error(req, sys_errno);
285                 return;
286         }
287         state->ret = ret;
288
289         tevent_req_done(req);
290 }
291
292 int tsocket_writev_queue_recv(struct tevent_req *req, int *perrno)
293 {
294         struct tsocket_writev_queue_state *state = tevent_req_data(req,
295                                         struct tsocket_writev_queue_state);
296         int ret;
297
298         ret = tsocket_simple_int_recv(req, perrno);
299         if (ret == 0) {
300                 ret = state->ret;
301         }
302
303         tevent_req_received(req);
304         return ret;
305 }
306