s3:rpc_client/rpc_transport_tstream.c: add some logic to handle cli_np tstreams
[bbaumbach/samba-autobuild/.git] / source3 / rpc_client / rpc_transport_tstream.c
1 /*
2  *  Unix SMB/CIFS implementation.
3  *  RPC client transport over tstream
4  *  Copyright (C) Simo Sorce 2010
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 3 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "includes.h"
21 #include "lib/tsocket/tsocket.h"
22 #include "libsmb/cli_np_tstream.h"
23
24 #undef DBGC_CLASS
25 #define DBGC_CLASS DBGC_RPC_CLI
26
/*
 * Private data of a tstream based RPC client transport.
 */
struct rpc_tstream_state {
        /* the underlying stream, NULL once rpc_tstream_disconnect() ran */
        struct tstream_context *stream;
        /* queue passed to tstream_readv_pdu_queue_send() */
        struct tevent_queue *read_queue;
        /* queue passed to tstream_writev_queue_send() */
        struct tevent_queue *write_queue;
        /* request timeout, initialised to 10000 ("10 seconds") */
        unsigned int timeout;
};
33
34 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
35 {
36         TALLOC_FREE(s->stream);
37 }
38
39 static bool rpc_tstream_is_connected(void *priv)
40 {
41         struct rpc_tstream_state *transp =
42                 talloc_get_type_abort(priv, struct rpc_tstream_state);
43         ssize_t ret;
44
45         if (!transp->stream) {
46                 return false;
47         }
48
49         if (!tstream_is_cli_np(transp->stream)) {
50                 return true;
51         }
52
53         ret = tstream_pending_bytes(transp->stream);
54         if (ret == -1) {
55                 return false;
56         }
57
58         return true;
59 }
60
61 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
62 {
63         struct rpc_tstream_state *transp =
64                 talloc_get_type_abort(priv, struct rpc_tstream_state);
65         int orig_timeout;
66         bool ok;
67
68         ok = rpc_tstream_is_connected(transp);
69         if (!ok) {
70                 return 0;
71         }
72
73         if (tstream_is_cli_np(transp->stream)) {
74                 transp->timeout = timeout;
75                 return tstream_cli_np_set_timeout(transp->stream, timeout);
76         }
77
78         orig_timeout = transp->timeout;
79
80         transp->timeout = timeout;
81
82         return orig_timeout;
83 }
84
/*
 * Bookkeeping for rpc_tstream_next_vector().
 */
struct rpc_tstream_next_vector_state {
        /* destination buffer supplied by the caller of the read */
        uint8_t *buf;
        /* number of bytes wanted, capped by rpc_tstream_next_vector_init() */
        size_t len;
        /* number of bytes of buf already handed out as vectors */
        off_t ofs;
        /*
         * pending bytes beyond what was still missing; only written by
         * rpc_tstream_next_vector(), not read in this part of the file
         */
        size_t remaining;
};
91
92 static void rpc_tstream_next_vector_init(
93                                 struct rpc_tstream_next_vector_state *s,
94                                 uint8_t *buf, size_t len)
95 {
96         ZERO_STRUCTP(s);
97
98         s->buf = buf;
99         s->len = MIN(len, UINT16_MAX);
100 }
101
102 static int rpc_tstream_next_vector(struct tstream_context *stream,
103                                    void *private_data,
104                                    TALLOC_CTX *mem_ctx,
105                                    struct iovec **_vector,
106                                    size_t *count)
107 {
108         struct rpc_tstream_next_vector_state *state =
109                 (struct rpc_tstream_next_vector_state *)private_data;
110         struct iovec *vector;
111         ssize_t pending;
112         size_t wanted;
113
114         if (state->ofs == state->len) {
115                 *_vector = NULL;
116                 *count = 0;
117                 return 0;
118         }
119
120         pending = tstream_pending_bytes(stream);
121         if (pending == -1) {
122                 return -1;
123         }
124
125         if (pending == 0 && state->ofs != 0) {
126                 /* return a short read */
127                 *_vector = NULL;
128                 *count = 0;
129                 return 0;
130         }
131
132         if (pending == 0) {
133                 /* we want at least one byte and recheck again */
134                 wanted = 1;
135         } else {
136                 size_t missing = state->len - state->ofs;
137                 if (pending > missing) {
138                         /* there's more available */
139                         state->remaining = pending - missing;
140                         wanted = missing;
141                 } else {
142                         /* read what we can get and recheck in the next cycle */
143                         wanted = pending;
144                 }
145         }
146
147         vector = talloc_array(mem_ctx, struct iovec, 1);
148         if (!vector) {
149                 return -1;
150         }
151
152         vector[0].iov_base = state->buf + state->ofs;
153         vector[0].iov_len = wanted;
154
155         state->ofs += wanted;
156
157         *_vector = vector;
158         *count = 1;
159         return 0;
160 }
161
162 struct rpc_tstream_read_state {
163         struct rpc_tstream_state *transp;
164         struct rpc_tstream_next_vector_state next_vector;
165         ssize_t nread;
166 };
167
168 static void rpc_tstream_read_done(struct tevent_req *subreq);
169
170 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
171                                              struct event_context *ev,
172                                              uint8_t *data, size_t size,
173                                              void *priv)
174 {
175         struct rpc_tstream_state *transp =
176                 talloc_get_type_abort(priv, struct rpc_tstream_state);
177         struct tevent_req *req, *subreq;
178         struct rpc_tstream_read_state *state;
179         struct timeval endtime;
180
181         req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
182         if (req == NULL) {
183                 return NULL;
184         }
185         if (!rpc_tstream_is_connected(transp)) {
186                 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
187                 return tevent_req_post(req, ev);
188         }
189         state->transp = transp;
190         rpc_tstream_next_vector_init(&state->next_vector, data, size);
191
192         subreq = tstream_readv_pdu_queue_send(state, ev,
193                                               transp->stream,
194                                               transp->read_queue,
195                                               rpc_tstream_next_vector,
196                                               &state->next_vector);
197         if (subreq == NULL) {
198                 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
199                 return tevent_req_post(req, ev);
200         }
201
202         endtime = timeval_current_ofs(0, transp->timeout * 1000);
203         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
204                 goto fail;
205         }
206
207         tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
208         return req;
209  fail:
210         TALLOC_FREE(req);
211         return NULL;
212 }
213
214 static void rpc_tstream_read_done(struct tevent_req *subreq)
215 {
216         struct tevent_req *req =
217                 tevent_req_callback_data(subreq, struct tevent_req);
218         struct rpc_tstream_read_state *state =
219                 tevent_req_data(req, struct rpc_tstream_read_state);
220         int err;
221
222         state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
223         TALLOC_FREE(subreq);
224         if (state->nread < 0) {
225                 rpc_tstream_disconnect(state->transp);
226                 tevent_req_nterror(req, map_nt_error_from_unix(err));
227                 return;
228         }
229         tevent_req_done(req);
230 }
231
232 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
233 {
234         struct rpc_tstream_read_state *state = tevent_req_data(
235                 req, struct rpc_tstream_read_state);
236         NTSTATUS status;
237
238         if (tevent_req_is_nterror(req, &status)) {
239                 return status;
240         }
241         *size = state->nread;
242         return NT_STATUS_OK;
243 }
244
/*
 * State of one rpc_tstream_write_send()/rpc_tstream_write_recv() request.
 */
struct rpc_tstream_write_state {
        /* event context the request was started with */
        struct event_context *ev;
        /* the transport the write runs on */
        struct rpc_tstream_state *transp;
        /* the single vector describing the caller's data */
        struct iovec iov;
        /* result of tstream_writev_queue_recv() */
        ssize_t nwritten;
};

/* completion callback of the writev subrequest */
static void rpc_tstream_write_done(struct tevent_req *subreq);
253
254 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
255                                               struct event_context *ev,
256                                               const uint8_t *data, size_t size,
257                                               void *priv)
258 {
259         struct rpc_tstream_state *transp =
260                 talloc_get_type_abort(priv, struct rpc_tstream_state);
261         struct tevent_req *req, *subreq;
262         struct rpc_tstream_write_state *state;
263         struct timeval endtime;
264
265         req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
266         if (req == NULL) {
267                 return NULL;
268         }
269         if (!rpc_tstream_is_connected(transp)) {
270                 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
271                 return tevent_req_post(req, ev);
272         }
273         state->ev = ev;
274         state->transp = transp;
275         state->iov.iov_base = discard_const_p(void *, data);
276         state->iov.iov_len = size;
277
278         subreq = tstream_writev_queue_send(state, ev,
279                                            transp->stream,
280                                            transp->write_queue,
281                                            &state->iov, 1);
282         if (subreq == NULL) {
283                 goto fail;
284         }
285
286         endtime = timeval_current_ofs(0, transp->timeout * 1000);
287         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
288                 goto fail;
289         }
290
291         tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
292         return req;
293  fail:
294         TALLOC_FREE(req);
295         return NULL;
296 }
297
298 static void rpc_tstream_write_done(struct tevent_req *subreq)
299 {
300         struct tevent_req *req =
301                 tevent_req_callback_data(subreq, struct tevent_req);
302         struct rpc_tstream_write_state *state =
303                 tevent_req_data(req, struct rpc_tstream_write_state);
304         int err;
305
306         state->nwritten = tstream_writev_queue_recv(subreq, &err);
307         TALLOC_FREE(subreq);
308         if (state->nwritten < 0) {
309                 rpc_tstream_disconnect(state->transp);
310                 tevent_req_nterror(req, map_nt_error_from_unix(err));
311                 return;
312         }
313         tevent_req_done(req);
314 }
315
316 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
317 {
318         struct rpc_tstream_write_state *state =
319                 tevent_req_data(req, struct rpc_tstream_write_state);
320         NTSTATUS status;
321
322         if (tevent_req_is_nterror(req, &status)) {
323                 return status;
324         }
325         *sent = state->nwritten;
326         return NT_STATUS_OK;
327 }
328
329 struct rpc_tstream_trans_state {
330         struct tevent_context *ev;
331         struct rpc_tstream_state *transp;
332         struct iovec req;
333         uint32_t max_rdata_len;
334         struct iovec rep;
335 };
336
337 static void rpc_tstream_trans_writev(struct tevent_req *subreq);
338 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq);
339
340 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
341                                          void *private_data,
342                                          TALLOC_CTX *mem_ctx,
343                                          struct iovec **_vector,
344                                          size_t *count);
345
346 static struct tevent_req *rpc_tstream_trans_send(TALLOC_CTX *mem_ctx,
347                                                  struct tevent_context *ev,
348                                                  uint8_t *data, size_t data_len,
349                                                  uint32_t max_rdata_len,
350                                                  void *priv)
351 {
352         struct rpc_tstream_state *transp =
353                 talloc_get_type_abort(priv, struct rpc_tstream_state);
354         struct tevent_req *req, *subreq;
355         struct rpc_tstream_trans_state *state;
356         struct timeval endtime;
357
358         req = tevent_req_create(mem_ctx, &state,
359                                 struct rpc_tstream_trans_state);
360         if (req == NULL) {
361                 return NULL;
362         }
363
364         if (!rpc_tstream_is_connected(transp)) {
365                 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
366                 return tevent_req_post(req, ev);
367         }
368         state->ev = ev;
369         state->transp = transp;
370         state->req.iov_len = data_len;
371         state->req.iov_base = discard_const_p(void *, data);
372         state->max_rdata_len = max_rdata_len;
373
374         endtime = timeval_current_ofs(0, transp->timeout * 1000);
375
376         subreq = tstream_writev_queue_send(state, ev,
377                                            transp->stream,
378                                            transp->write_queue,
379                                            &state->req, 1);
380         if (tevent_req_nomem(subreq, req)) {
381                 return tevent_req_post(req, ev);
382         }
383         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
384                 return tevent_req_post(req, ev);
385         }
386         tevent_req_set_callback(subreq, rpc_tstream_trans_writev, req);
387
388         if (tstream_is_cli_np(transp->stream)) {
389                 tstream_cli_np_use_trans(transp->stream);
390         }
391
392         subreq = tstream_readv_pdu_queue_send(state, ev,
393                                               transp->stream,
394                                               transp->read_queue,
395                                               rpc_tstream_trans_next_vector,
396                                               state);
397         if (tevent_req_nomem(subreq, req)) {
398                 return tevent_req_post(req, ev);
399         }
400         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
401                 return tevent_req_post(req, ev);
402         }
403         tevent_req_set_callback(subreq, rpc_tstream_trans_readv_pdu, req);
404
405         return req;
406 }
407
408 static void rpc_tstream_trans_writev(struct tevent_req *subreq)
409 {
410         struct tevent_req *req =
411                 tevent_req_callback_data(subreq,
412                 struct tevent_req);
413         struct rpc_tstream_trans_state *state =
414                 tevent_req_data(req,
415                 struct rpc_tstream_trans_state);
416         int ret;
417         int err;
418
419         ret = tstream_writev_queue_recv(subreq, &err);
420         TALLOC_FREE(subreq);
421         if (ret == -1) {
422                 rpc_tstream_disconnect(state->transp);
423                 tevent_req_nterror(req, map_nt_error_from_unix(err));
424                 return;
425         }
426 }
427
428 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
429                                          void *private_data,
430                                          TALLOC_CTX *mem_ctx,
431                                          struct iovec **_vector,
432                                          size_t *count)
433 {
434         struct rpc_tstream_trans_state *state =
435                 talloc_get_type_abort(private_data,
436                 struct rpc_tstream_trans_state);
437         struct iovec *vector;
438
439         if (state->max_rdata_len == state->rep.iov_len) {
440                 *_vector = NULL;
441                 *count = 0;
442                 return 0;
443         }
444
445         state->rep.iov_base = talloc_array(state, uint8_t,
446                                            state->max_rdata_len);
447         if (state->rep.iov_base == NULL) {
448                 return -1;
449         }
450         state->rep.iov_len = state->max_rdata_len;
451
452         vector = talloc_array(mem_ctx, struct iovec, 1);
453         if (!vector) {
454                 return -1;
455         }
456
457         vector[0] = state->rep;
458
459         *_vector = vector;
460         *count = 1;
461         return 0;
462 }
463
464 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq)
465 {
466         struct tevent_req *req =
467                 tevent_req_callback_data(subreq,
468                 struct tevent_req);
469         struct rpc_tstream_trans_state *state =
470                 tevent_req_data(req,
471                 struct rpc_tstream_trans_state);
472         int ret;
473         int err;
474
475         ret = tstream_readv_pdu_queue_recv(subreq, &err);
476         TALLOC_FREE(subreq);
477         if (ret == -1) {
478                 rpc_tstream_disconnect(state->transp);
479                 tevent_req_nterror(req, map_nt_error_from_unix(err));
480                 return;
481         }
482
483         tevent_req_done(req);
484 }
485
486 static NTSTATUS rpc_tstream_trans_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
487                                        uint8_t **prdata, uint32_t *prdata_len)
488 {
489         struct rpc_tstream_trans_state *state =
490                 tevent_req_data(req,
491                 struct rpc_tstream_trans_state);
492         NTSTATUS status;
493
494         if (tevent_req_is_nterror(req, &status)) {
495                 return status;
496         }
497
498         *prdata = (uint8_t *)talloc_move(mem_ctx, &state->rep.iov_base);
499         *prdata_len = state->rep.iov_len;
500         return NT_STATUS_OK;
501 }
502
503 /**
504 * @brief Initialize a tstream transport facility
505 *        NOTE: this function will talloc_steal, the stream and the queues.
506 *
507 * @param mem_ctx        - memory context used to allocate the transport
508 * @param stream         - a ready to use tstream
509 * @param presult        - the transport structure
510 *
511 * @return               - a NT Status error code.
512 */
513 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
514                                 struct tstream_context **stream,
515                                 struct rpc_cli_transport **presult)
516 {
517         struct rpc_cli_transport *result;
518         struct rpc_tstream_state *state;
519
520         result = talloc(mem_ctx, struct rpc_cli_transport);
521         if (result == NULL) {
522                 return NT_STATUS_NO_MEMORY;
523         }
524         state = talloc(result, struct rpc_tstream_state);
525         if (state == NULL) {
526                 TALLOC_FREE(result);
527                 return NT_STATUS_NO_MEMORY;
528         }
529         result->priv = state;
530
531         state->read_queue = tevent_queue_create(state, "read_queue");
532         if (state->read_queue == NULL) {
533                 TALLOC_FREE(result);
534                 return NT_STATUS_NO_MEMORY;
535         }
536         state->write_queue = tevent_queue_create(state, "write_queue");
537         if (state->write_queue == NULL) {
538                 TALLOC_FREE(result);
539                 return NT_STATUS_NO_MEMORY;
540         }
541
542         state->stream = talloc_move(state, stream);
543         state->timeout = 10000; /* 10 seconds. */
544
545         if (tstream_is_cli_np(state->stream)) {
546                 result->trans_send = rpc_tstream_trans_send;
547                 result->trans_recv = rpc_tstream_trans_recv;
548         } else {
549                 result->trans_send = NULL;
550                 result->trans_recv = NULL;
551         }
552         result->write_send = rpc_tstream_write_send;
553         result->write_recv = rpc_tstream_write_recv;
554         result->read_send = rpc_tstream_read_send;
555         result->read_recv = rpc_tstream_read_recv;
556         result->is_connected = rpc_tstream_is_connected;
557         result->set_timeout = rpc_tstream_set_timeout;
558
559         *presult = result;
560         return NT_STATUS_OK;
561 }