s3-rpc_client: Make data pointer const in trans_send().
[nivanova/samba-autobuild/.git] / source3 / rpc_client / rpc_transport_tstream.c
1 /*
2  *  Unix SMB/CIFS implementation.
3  *  RPC client transport over tstream
4  *  Copyright (C) Simo Sorce 2010
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 3 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "includes.h"
21 #include "../lib/util/tevent_ntstatus.h"
22 #include "rpc_client/rpc_transport.h"
23 #include "lib/tsocket/tsocket.h"
24 #include "libsmb/cli_np_tstream.h"
25 #include "cli_pipe.h"
26
27 #undef DBGC_CLASS
28 #define DBGC_CLASS DBGC_RPC_CLI
29
/*
 * Private data of an RPC client transport that runs over a tstream.
 * It is stored in rpc_cli_transport->priv by rpc_transport_tstream_init().
 */
struct rpc_tstream_state {
        /* the underlying stream; released by rpc_tstream_disconnect() */
        struct tstream_context *stream;

        /* queue passed to every tstream_readv_pdu_queue_send() call */
        struct tevent_queue *read_queue;

        /* queue passed to every tstream_writev_queue_send() call */
        struct tevent_queue *write_queue;

        /*
         * Per-request timeout.  It is fed to timeval_current_ofs_msec(),
         * i.e. interpreted as milliseconds.
         */
        unsigned int timeout;
};
36
37 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
38 {
39         TALLOC_FREE(s->stream);
40 }
41
42 static bool rpc_tstream_is_connected(void *priv)
43 {
44         struct rpc_tstream_state *transp =
45                 talloc_get_type_abort(priv, struct rpc_tstream_state);
46         ssize_t ret;
47
48         if (!transp->stream) {
49                 return false;
50         }
51
52         if (!tstream_is_cli_np(transp->stream)) {
53                 return true;
54         }
55
56         ret = tstream_pending_bytes(transp->stream);
57         if (ret == -1) {
58                 return false;
59         }
60
61         return true;
62 }
63
64 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
65 {
66         struct rpc_tstream_state *transp =
67                 talloc_get_type_abort(priv, struct rpc_tstream_state);
68         int orig_timeout;
69         bool ok;
70
71         ok = rpc_tstream_is_connected(transp);
72         if (!ok) {
73                 return 0;
74         }
75
76         if (tstream_is_cli_np(transp->stream)) {
77                 transp->timeout = timeout;
78                 return tstream_cli_np_set_timeout(transp->stream, timeout);
79         }
80
81         orig_timeout = transp->timeout;
82
83         transp->timeout = timeout;
84
85         return orig_timeout;
86 }
87
/*
 * Bookkeeping for rpc_tstream_next_vector(), which hands out pieces of a
 * caller supplied buffer to tstream_readv_pdu.
 */
struct rpc_tstream_next_vector_state {
        /* destination buffer */
        uint8_t *buf;

        /* bytes wanted; capped at UINT16_MAX by rpc_tstream_next_vector_init() */
        size_t len;

        /* bytes of buf that were already handed out in vectors */
        off_t ofs;

        /*
         * Bytes the stream had pending beyond what was still missing.
         * In the code visible here it is only written, never read.
         */
        size_t remaining;
};
94
95 static void rpc_tstream_next_vector_init(
96                                 struct rpc_tstream_next_vector_state *s,
97                                 uint8_t *buf, size_t len)
98 {
99         ZERO_STRUCTP(s);
100
101         s->buf = buf;
102         s->len = MIN(len, UINT16_MAX);
103 }
104
105 static int rpc_tstream_next_vector(struct tstream_context *stream,
106                                    void *private_data,
107                                    TALLOC_CTX *mem_ctx,
108                                    struct iovec **_vector,
109                                    size_t *count)
110 {
111         struct rpc_tstream_next_vector_state *state =
112                 (struct rpc_tstream_next_vector_state *)private_data;
113         struct iovec *vector;
114         ssize_t pending;
115         size_t wanted;
116
117         if (state->ofs == state->len) {
118                 *_vector = NULL;
119                 *count = 0;
120                 return 0;
121         }
122
123         pending = tstream_pending_bytes(stream);
124         if (pending == -1) {
125                 return -1;
126         }
127
128         if (pending == 0 && state->ofs != 0) {
129                 /* return a short read */
130                 *_vector = NULL;
131                 *count = 0;
132                 return 0;
133         }
134
135         if (pending == 0) {
136                 /* we want at least one byte and recheck again */
137                 wanted = 1;
138         } else {
139                 size_t missing = state->len - state->ofs;
140                 if (pending > missing) {
141                         /* there's more available */
142                         state->remaining = pending - missing;
143                         wanted = missing;
144                 } else {
145                         /* read what we can get and recheck in the next cycle */
146                         wanted = pending;
147                 }
148         }
149
150         vector = talloc_array(mem_ctx, struct iovec, 1);
151         if (!vector) {
152                 return -1;
153         }
154
155         vector[0].iov_base = state->buf + state->ofs;
156         vector[0].iov_len = wanted;
157
158         state->ofs += wanted;
159
160         *_vector = vector;
161         *count = 1;
162         return 0;
163 }
164
165 struct rpc_tstream_read_state {
166         struct rpc_tstream_state *transp;
167         struct rpc_tstream_next_vector_state next_vector;
168         ssize_t nread;
169 };
170
171 static void rpc_tstream_read_done(struct tevent_req *subreq);
172
173 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
174                                              struct tevent_context *ev,
175                                              uint8_t *data, size_t size,
176                                              void *priv)
177 {
178         struct rpc_tstream_state *transp =
179                 talloc_get_type_abort(priv, struct rpc_tstream_state);
180         struct tevent_req *req, *subreq;
181         struct rpc_tstream_read_state *state;
182         struct timeval endtime;
183
184         req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
185         if (req == NULL) {
186                 return NULL;
187         }
188         if (!rpc_tstream_is_connected(transp)) {
189                 NTSTATUS status = NT_STATUS_CONNECTION_DISCONNECTED;
190                 if (tstream_is_cli_np(transp->stream)) {
191                         status = NT_STATUS_PIPE_DISCONNECTED;
192                 }
193                 tevent_req_nterror(req, status);
194                 return tevent_req_post(req, ev);
195         }
196         state->transp = transp;
197         rpc_tstream_next_vector_init(&state->next_vector, data, size);
198
199         subreq = tstream_readv_pdu_queue_send(state, ev,
200                                               transp->stream,
201                                               transp->read_queue,
202                                               rpc_tstream_next_vector,
203                                               &state->next_vector);
204         if (subreq == NULL) {
205                 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
206                 return tevent_req_post(req, ev);
207         }
208
209         endtime = timeval_current_ofs_msec(transp->timeout);
210         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
211                 goto fail;
212         }
213
214         tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
215         return req;
216  fail:
217         TALLOC_FREE(req);
218         return NULL;
219 }
220
221 static void rpc_tstream_read_done(struct tevent_req *subreq)
222 {
223         struct tevent_req *req =
224                 tevent_req_callback_data(subreq, struct tevent_req);
225         struct rpc_tstream_read_state *state =
226                 tevent_req_data(req, struct rpc_tstream_read_state);
227         int err;
228
229         state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
230         TALLOC_FREE(subreq);
231         if (state->nread < 0) {
232                 rpc_tstream_disconnect(state->transp);
233                 tevent_req_nterror(req, map_nt_error_from_unix(err));
234                 return;
235         }
236         tevent_req_done(req);
237 }
238
239 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
240 {
241         struct rpc_tstream_read_state *state = tevent_req_data(
242                 req, struct rpc_tstream_read_state);
243         NTSTATUS status;
244
245         if (tevent_req_is_nterror(req, &status)) {
246                 return status;
247         }
248         *size = state->nread;
249         return NT_STATUS_OK;
250 }
251
/* State of one rpc_tstream_write_send()/rpc_tstream_write_recv() request. */
struct rpc_tstream_write_state {
        /* event context the request runs on */
        struct tevent_context *ev;

        /* transport the write runs on; used to disconnect on failure */
        struct rpc_tstream_state *transp;

        /* the single vector handed to tstream_writev_queue_send() */
        struct iovec iov;

        /* result of tstream_writev_queue_recv() */
        ssize_t nwritten;
};
258
259 static void rpc_tstream_write_done(struct tevent_req *subreq);
260
261 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
262                                               struct tevent_context *ev,
263                                               const uint8_t *data, size_t size,
264                                               void *priv)
265 {
266         struct rpc_tstream_state *transp =
267                 talloc_get_type_abort(priv, struct rpc_tstream_state);
268         struct tevent_req *req, *subreq;
269         struct rpc_tstream_write_state *state;
270         struct timeval endtime;
271
272         req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
273         if (req == NULL) {
274                 return NULL;
275         }
276         if (!rpc_tstream_is_connected(transp)) {
277                 NTSTATUS status = NT_STATUS_CONNECTION_DISCONNECTED;
278                 if (tstream_is_cli_np(transp->stream)) {
279                         status = NT_STATUS_PIPE_DISCONNECTED;
280                 }
281                 tevent_req_nterror(req, status);
282                 return tevent_req_post(req, ev);
283         }
284         state->ev = ev;
285         state->transp = transp;
286         state->iov.iov_base = discard_const_p(void *, data);
287         state->iov.iov_len = size;
288
289         subreq = tstream_writev_queue_send(state, ev,
290                                            transp->stream,
291                                            transp->write_queue,
292                                            &state->iov, 1);
293         if (subreq == NULL) {
294                 goto fail;
295         }
296
297         endtime = timeval_current_ofs_msec(transp->timeout);
298         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
299                 goto fail;
300         }
301
302         tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
303         return req;
304  fail:
305         TALLOC_FREE(req);
306         return NULL;
307 }
308
309 static void rpc_tstream_write_done(struct tevent_req *subreq)
310 {
311         struct tevent_req *req =
312                 tevent_req_callback_data(subreq, struct tevent_req);
313         struct rpc_tstream_write_state *state =
314                 tevent_req_data(req, struct rpc_tstream_write_state);
315         int err;
316
317         state->nwritten = tstream_writev_queue_recv(subreq, &err);
318         TALLOC_FREE(subreq);
319         if (state->nwritten < 0) {
320                 rpc_tstream_disconnect(state->transp);
321                 tevent_req_nterror(req, map_nt_error_from_unix(err));
322                 return;
323         }
324         tevent_req_done(req);
325 }
326
327 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
328 {
329         struct rpc_tstream_write_state *state =
330                 tevent_req_data(req, struct rpc_tstream_write_state);
331         NTSTATUS status;
332
333         if (tevent_req_is_nterror(req, &status)) {
334                 return status;
335         }
336         *sent = state->nwritten;
337         return NT_STATUS_OK;
338 }
339
/* State of one rpc_tstream_trans_send()/rpc_tstream_trans_recv() request. */
struct rpc_tstream_trans_state {
        /* event context the request runs on */
        struct tevent_context *ev;

        /* transport the transaction runs on; used to disconnect on failure */
        struct rpc_tstream_state *transp;

        /* outgoing data; points at the caller's buffer */
        struct iovec req;

        /* size of the reply buffer to allocate and fill */
        uint32_t max_rdata_len;

        /* reply buffer, allocated by rpc_tstream_trans_next_vector() */
        struct iovec rep;
};
347
348 static void rpc_tstream_trans_writev(struct tevent_req *subreq);
349 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq);
350
351 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
352                                          void *private_data,
353                                          TALLOC_CTX *mem_ctx,
354                                          struct iovec **_vector,
355                                          size_t *count);
356
357 static struct tevent_req *rpc_tstream_trans_send(TALLOC_CTX *mem_ctx,
358                                                  struct tevent_context *ev,
359                                                  const uint8_t *data, size_t data_len,
360                                                  uint32_t max_rdata_len,
361                                                  void *priv)
362 {
363         struct rpc_tstream_state *transp =
364                 talloc_get_type_abort(priv, struct rpc_tstream_state);
365         struct tevent_req *req, *subreq;
366         struct rpc_tstream_trans_state *state;
367         struct timeval endtime;
368         bool use_trans = false;
369
370         req = tevent_req_create(mem_ctx, &state,
371                                 struct rpc_tstream_trans_state);
372         if (req == NULL) {
373                 return NULL;
374         }
375
376         if (!rpc_tstream_is_connected(transp)) {
377                 NTSTATUS status = NT_STATUS_CONNECTION_DISCONNECTED;
378                 if (tstream_is_cli_np(transp->stream)) {
379                         status = NT_STATUS_PIPE_DISCONNECTED;
380                 }
381                 tevent_req_nterror(req, status);
382                 return tevent_req_post(req, ev);
383         }
384         state->ev = ev;
385         state->transp = transp;
386         state->req.iov_len = data_len;
387         state->req.iov_base = discard_const_p(void *, data);
388         state->max_rdata_len = max_rdata_len;
389
390         endtime = timeval_current_ofs_msec(transp->timeout);
391
392         if (tstream_is_cli_np(transp->stream)) {
393                 use_trans = true;
394         }
395         if (tevent_queue_length(transp->write_queue) > 0) {
396                 use_trans = false;
397         }
398         if (tevent_queue_length(transp->read_queue) > 0) {
399                 use_trans = false;
400         }
401
402         if (use_trans) {
403                 tstream_cli_np_use_trans(transp->stream);
404         }
405
406         subreq = tstream_writev_queue_send(state, ev,
407                                            transp->stream,
408                                            transp->write_queue,
409                                            &state->req, 1);
410         if (tevent_req_nomem(subreq, req)) {
411                 return tevent_req_post(req, ev);
412         }
413         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
414                 return tevent_req_post(req, ev);
415         }
416         tevent_req_set_callback(subreq, rpc_tstream_trans_writev, req);
417
418         subreq = tstream_readv_pdu_queue_send(state, ev,
419                                               transp->stream,
420                                               transp->read_queue,
421                                               rpc_tstream_trans_next_vector,
422                                               state);
423         if (tevent_req_nomem(subreq, req)) {
424                 return tevent_req_post(req, ev);
425         }
426         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
427                 return tevent_req_post(req, ev);
428         }
429         tevent_req_set_callback(subreq, rpc_tstream_trans_readv_pdu, req);
430
431         return req;
432 }
433
434 static void rpc_tstream_trans_writev(struct tevent_req *subreq)
435 {
436         struct tevent_req *req =
437                 tevent_req_callback_data(subreq,
438                 struct tevent_req);
439         struct rpc_tstream_trans_state *state =
440                 tevent_req_data(req,
441                 struct rpc_tstream_trans_state);
442         int ret;
443         int err;
444
445         ret = tstream_writev_queue_recv(subreq, &err);
446         TALLOC_FREE(subreq);
447         if (ret == -1) {
448                 rpc_tstream_disconnect(state->transp);
449                 tevent_req_nterror(req, map_nt_error_from_unix(err));
450                 return;
451         }
452 }
453
454 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
455                                          void *private_data,
456                                          TALLOC_CTX *mem_ctx,
457                                          struct iovec **_vector,
458                                          size_t *count)
459 {
460         struct rpc_tstream_trans_state *state =
461                 talloc_get_type_abort(private_data,
462                 struct rpc_tstream_trans_state);
463         struct iovec *vector;
464
465         if (state->max_rdata_len == state->rep.iov_len) {
466                 *_vector = NULL;
467                 *count = 0;
468                 return 0;
469         }
470
471         state->rep.iov_base = talloc_array(state, uint8_t,
472                                            state->max_rdata_len);
473         if (state->rep.iov_base == NULL) {
474                 return -1;
475         }
476         state->rep.iov_len = state->max_rdata_len;
477
478         vector = talloc_array(mem_ctx, struct iovec, 1);
479         if (!vector) {
480                 return -1;
481         }
482
483         vector[0] = state->rep;
484
485         *_vector = vector;
486         *count = 1;
487         return 0;
488 }
489
490 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq)
491 {
492         struct tevent_req *req =
493                 tevent_req_callback_data(subreq,
494                 struct tevent_req);
495         struct rpc_tstream_trans_state *state =
496                 tevent_req_data(req,
497                 struct rpc_tstream_trans_state);
498         int ret;
499         int err;
500
501         ret = tstream_readv_pdu_queue_recv(subreq, &err);
502         TALLOC_FREE(subreq);
503         if (ret == -1) {
504                 rpc_tstream_disconnect(state->transp);
505                 tevent_req_nterror(req, map_nt_error_from_unix(err));
506                 return;
507         }
508
509         tevent_req_done(req);
510 }
511
512 static NTSTATUS rpc_tstream_trans_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
513                                        uint8_t **prdata, uint32_t *prdata_len)
514 {
515         struct rpc_tstream_trans_state *state =
516                 tevent_req_data(req,
517                 struct rpc_tstream_trans_state);
518         NTSTATUS status;
519
520         if (tevent_req_is_nterror(req, &status)) {
521                 return status;
522         }
523
524         *prdata = (uint8_t *)talloc_move(mem_ctx, &state->rep.iov_base);
525         *prdata_len = state->rep.iov_len;
526         return NT_STATUS_OK;
527 }
528
529 /**
530 * @brief Initialize a tstream transport facility
531 *        NOTE: this function will talloc_steal, the stream and the queues.
532 *
533 * @param mem_ctx        - memory context used to allocate the transport
534 * @param stream         - a ready to use tstream
535 * @param presult        - the transport structure
536 *
537 * @return               - a NT Status error code.
538 */
539 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
540                                 struct tstream_context **stream,
541                                 struct rpc_cli_transport **presult)
542 {
543         struct rpc_cli_transport *result;
544         struct rpc_tstream_state *state;
545
546         result = talloc(mem_ctx, struct rpc_cli_transport);
547         if (result == NULL) {
548                 return NT_STATUS_NO_MEMORY;
549         }
550         state = talloc(result, struct rpc_tstream_state);
551         if (state == NULL) {
552                 TALLOC_FREE(result);
553                 return NT_STATUS_NO_MEMORY;
554         }
555         result->priv = state;
556
557         state->read_queue = tevent_queue_create(state, "read_queue");
558         if (state->read_queue == NULL) {
559                 TALLOC_FREE(result);
560                 return NT_STATUS_NO_MEMORY;
561         }
562         state->write_queue = tevent_queue_create(state, "write_queue");
563         if (state->write_queue == NULL) {
564                 TALLOC_FREE(result);
565                 return NT_STATUS_NO_MEMORY;
566         }
567
568         state->stream = talloc_move(state, stream);
569         state->timeout = 10000; /* 10 seconds. */
570
571         if (tstream_is_cli_np(state->stream)) {
572                 result->trans_send = rpc_tstream_trans_send;
573                 result->trans_recv = rpc_tstream_trans_recv;
574         } else {
575                 result->trans_send = NULL;
576                 result->trans_recv = NULL;
577         }
578         result->write_send = rpc_tstream_write_send;
579         result->write_recv = rpc_tstream_write_recv;
580         result->read_send = rpc_tstream_read_send;
581         result->read_recv = rpc_tstream_read_recv;
582         result->is_connected = rpc_tstream_is_connected;
583         result->set_timeout = rpc_tstream_set_timeout;
584
585         *presult = result;
586         return NT_STATUS_OK;
587 }