s3:rpc_client/rpc_transport_tstream: timeout should be unsigned int
[kai/samba.git] / source3 / rpc_client / rpc_transport_tstream.c
1 /*
2  *  Unix SMB/CIFS implementation.
3  *  RPC client transport over tstream
4  *  Copyright (C) Simo Sorce 2010
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 3 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "includes.h"
21 #include "lib/tsocket/tsocket.h"
22
23 #undef DBGC_CLASS
24 #define DBGC_CLASS DBGC_RPC_CLI
25
26 struct rpc_tstream_state {
27         struct tstream_context *stream;
28         struct tevent_queue *read_queue;
29         struct tevent_queue *write_queue;
30         unsigned int timeout;
31 };
32
33 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
34 {
35         TALLOC_FREE(s->stream);
36 }
37
38 static bool rpc_tstream_is_connected(void *priv)
39 {
40         struct rpc_tstream_state *transp =
41                 talloc_get_type_abort(priv, struct rpc_tstream_state);
42
43         if (!transp->stream) {
44                 return false;
45         }
46
47         return true;
48 }
49
50 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
51 {
52         struct rpc_tstream_state *transp =
53                 talloc_get_type_abort(priv, struct rpc_tstream_state);
54         int orig_timeout;
55         bool ok;
56
57         ok = rpc_tstream_is_connected(transp);
58         if (!ok) {
59                 return 0;
60         }
61
62         orig_timeout = transp->timeout;
63
64         transp->timeout = timeout;
65
66         return orig_timeout;
67 }
68
69 struct rpc_tstream_next_vector_state {
70         uint8_t *buf;
71         size_t len;
72         off_t ofs;
73         size_t remaining;
74 };
75
76 static void rpc_tstream_next_vector_init(
77                                 struct rpc_tstream_next_vector_state *s,
78                                 uint8_t *buf, size_t len)
79 {
80         ZERO_STRUCTP(s);
81
82         s->buf = buf;
83         s->len = MIN(len, UINT16_MAX);
84 }
85
86 static int rpc_tstream_next_vector(struct tstream_context *stream,
87                                    void *private_data,
88                                    TALLOC_CTX *mem_ctx,
89                                    struct iovec **_vector,
90                                    size_t *count)
91 {
92         struct rpc_tstream_next_vector_state *state =
93                 (struct rpc_tstream_next_vector_state *)private_data;
94         struct iovec *vector;
95         ssize_t pending;
96         size_t wanted;
97
98         if (state->ofs == state->len) {
99                 *_vector = NULL;
100                 *count = 0;
101                 return 0;
102         }
103
104         pending = tstream_pending_bytes(stream);
105         if (pending == -1) {
106                 return -1;
107         }
108
109         if (pending == 0 && state->ofs != 0) {
110                 /* return a short read */
111                 *_vector = NULL;
112                 *count = 0;
113                 return 0;
114         }
115
116         if (pending == 0) {
117                 /* we want at least one byte and recheck again */
118                 wanted = 1;
119         } else {
120                 size_t missing = state->len - state->ofs;
121                 if (pending > missing) {
122                         /* there's more available */
123                         state->remaining = pending - missing;
124                         wanted = missing;
125                 } else {
126                         /* read what we can get and recheck in the next cycle */
127                         wanted = pending;
128                 }
129         }
130
131         vector = talloc_array(mem_ctx, struct iovec, 1);
132         if (!vector) {
133                 return -1;
134         }
135
136         vector[0].iov_base = state->buf + state->ofs;
137         vector[0].iov_len = wanted;
138
139         state->ofs += wanted;
140
141         *_vector = vector;
142         *count = 1;
143         return 0;
144 }
145
146 struct rpc_tstream_read_state {
147         struct rpc_tstream_state *transp;
148         struct rpc_tstream_next_vector_state next_vector;
149         ssize_t nread;
150 };
151
152 static void rpc_tstream_read_done(struct tevent_req *subreq);
153
154 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
155                                              struct event_context *ev,
156                                              uint8_t *data, size_t size,
157                                              void *priv)
158 {
159         struct rpc_tstream_state *transp =
160                 talloc_get_type_abort(priv, struct rpc_tstream_state);
161         struct tevent_req *req, *subreq;
162         struct rpc_tstream_read_state *state;
163         struct timeval endtime;
164
165         req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
166         if (req == NULL) {
167                 return NULL;
168         }
169         if (!rpc_tstream_is_connected(transp)) {
170                 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
171                 return tevent_req_post(req, ev);
172         }
173         state->transp = transp;
174         rpc_tstream_next_vector_init(&state->next_vector, data, size);
175
176         subreq = tstream_readv_pdu_queue_send(state, ev,
177                                               transp->stream,
178                                               transp->read_queue,
179                                               rpc_tstream_next_vector,
180                                               &state->next_vector);
181         if (subreq == NULL) {
182                 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
183                 return tevent_req_post(req, ev);
184         }
185
186         endtime = timeval_current_ofs(0, transp->timeout * 1000);
187         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
188                 goto fail;
189         }
190
191         tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
192         return req;
193  fail:
194         TALLOC_FREE(req);
195         return NULL;
196 }
197
198 static void rpc_tstream_read_done(struct tevent_req *subreq)
199 {
200         struct tevent_req *req =
201                 tevent_req_callback_data(subreq, struct tevent_req);
202         struct rpc_tstream_read_state *state =
203                 tevent_req_data(req, struct rpc_tstream_read_state);
204         int err;
205
206         state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
207         TALLOC_FREE(subreq);
208         if (state->nread < 0) {
209                 rpc_tstream_disconnect(state->transp);
210                 tevent_req_nterror(req, map_nt_error_from_unix(err));
211                 return;
212         }
213         tevent_req_done(req);
214 }
215
216 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
217 {
218         struct rpc_tstream_read_state *state = tevent_req_data(
219                 req, struct rpc_tstream_read_state);
220         NTSTATUS status;
221
222         if (tevent_req_is_nterror(req, &status)) {
223                 return status;
224         }
225         *size = state->nread;
226         return NT_STATUS_OK;
227 }
228
229 struct rpc_tstream_write_state {
230         struct event_context *ev;
231         struct rpc_tstream_state *transp;
232         struct iovec iov;
233         ssize_t nwritten;
234 };
235
236 static void rpc_tstream_write_done(struct tevent_req *subreq);
237
238 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
239                                               struct event_context *ev,
240                                               const uint8_t *data, size_t size,
241                                               void *priv)
242 {
243         struct rpc_tstream_state *transp =
244                 talloc_get_type_abort(priv, struct rpc_tstream_state);
245         struct tevent_req *req, *subreq;
246         struct rpc_tstream_write_state *state;
247         struct timeval endtime;
248
249         req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
250         if (req == NULL) {
251                 return NULL;
252         }
253         if (!rpc_tstream_is_connected(transp)) {
254                 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
255                 return tevent_req_post(req, ev);
256         }
257         state->ev = ev;
258         state->transp = transp;
259         state->iov.iov_base = discard_const_p(void *, data);
260         state->iov.iov_len = size;
261
262         subreq = tstream_writev_queue_send(state, ev,
263                                            transp->stream,
264                                            transp->write_queue,
265                                            &state->iov, 1);
266         if (subreq == NULL) {
267                 goto fail;
268         }
269
270         endtime = timeval_current_ofs(0, transp->timeout * 1000);
271         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
272                 goto fail;
273         }
274
275         tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
276         return req;
277  fail:
278         TALLOC_FREE(req);
279         return NULL;
280 }
281
282 static void rpc_tstream_write_done(struct tevent_req *subreq)
283 {
284         struct tevent_req *req =
285                 tevent_req_callback_data(subreq, struct tevent_req);
286         struct rpc_tstream_write_state *state =
287                 tevent_req_data(req, struct rpc_tstream_write_state);
288         int err;
289
290         state->nwritten = tstream_writev_queue_recv(subreq, &err);
291         TALLOC_FREE(subreq);
292         if (state->nwritten < 0) {
293                 rpc_tstream_disconnect(state->transp);
294                 tevent_req_nterror(req, map_nt_error_from_unix(err));
295                 return;
296         }
297         tevent_req_done(req);
298 }
299
300 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
301 {
302         struct rpc_tstream_write_state *state =
303                 tevent_req_data(req, struct rpc_tstream_write_state);
304         NTSTATUS status;
305
306         if (tevent_req_is_nterror(req, &status)) {
307                 return status;
308         }
309         *sent = state->nwritten;
310         return NT_STATUS_OK;
311 }
312
313 /**
314 * @brief Initialize a tstream transport facility
315 *        NOTE: this function will talloc_steal, the stream and the queues.
316 *
317 * @param mem_ctx        - memory context used to allocate the transport
318 * @param stream         - a ready to use tstream
319 * @param presult        - the transport structure
320 *
321 * @return               - a NT Status error code.
322 */
323 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
324                                 struct tstream_context **stream,
325                                 struct rpc_cli_transport **presult)
326 {
327         struct rpc_cli_transport *result;
328         struct rpc_tstream_state *state;
329
330         result = talloc(mem_ctx, struct rpc_cli_transport);
331         if (result == NULL) {
332                 return NT_STATUS_NO_MEMORY;
333         }
334         state = talloc(result, struct rpc_tstream_state);
335         if (state == NULL) {
336                 TALLOC_FREE(result);
337                 return NT_STATUS_NO_MEMORY;
338         }
339         result->priv = state;
340
341         state->read_queue = tevent_queue_create(state, "read_queue");
342         if (state->read_queue == NULL) {
343                 TALLOC_FREE(result);
344                 return NT_STATUS_NO_MEMORY;
345         }
346         state->write_queue = tevent_queue_create(state, "write_queue");
347         if (state->write_queue == NULL) {
348                 TALLOC_FREE(result);
349                 return NT_STATUS_NO_MEMORY;
350         }
351
352         state->stream = talloc_move(state, stream);
353         state->timeout = 10000; /* 10 seconds. */
354
355         result->trans_send = NULL;
356         result->trans_recv = NULL;
357         result->write_send = rpc_tstream_write_send;
358         result->write_recv = rpc_tstream_write_recv;
359         result->read_send = rpc_tstream_read_send;
360         result->read_recv = rpc_tstream_read_recv;
361         result->is_connected = rpc_tstream_is_connected;
362         result->set_timeout = rpc_tstream_set_timeout;
363
364         *presult = result;
365         return NT_STATUS_OK;
366 }