s3-rpc_client: Move client pipe functions to own header.
[mat/samba.git] / source3 / rpc_client / rpc_transport_tstream.c
1 /*
2  *  Unix SMB/CIFS implementation.
3  *  RPC client transport over tstream
4  *  Copyright (C) Simo Sorce 2010
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 3 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "includes.h"
21 #include "lib/tsocket/tsocket.h"
22 #include "libsmb/cli_np_tstream.h"
23 #include "cli_pipe.h"
24
25 #undef DBGC_CLASS
26 #define DBGC_CLASS DBGC_RPC_CLI
27
/*
 * Private state of a tstream based RPC client transport. An instance is
 * allocated by rpc_transport_tstream_init() and stored in the transport's
 * priv pointer.
 */
struct rpc_tstream_state {
        /* Stream used for all I/O; released by rpc_tstream_disconnect(). */
        struct tstream_context *stream;
        /* Queue passed to tstream_readv_pdu_queue_send() for reads. */
        struct tevent_queue *read_queue;
        /* Queue passed to tstream_writev_queue_send() for writes. */
        struct tevent_queue *write_queue;
        /* I/O timeout in milliseconds (initialised to 10000 = 10 seconds). */
        unsigned int timeout;
};
34
35 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
36 {
37         TALLOC_FREE(s->stream);
38 }
39
40 static bool rpc_tstream_is_connected(void *priv)
41 {
42         struct rpc_tstream_state *transp =
43                 talloc_get_type_abort(priv, struct rpc_tstream_state);
44         ssize_t ret;
45
46         if (!transp->stream) {
47                 return false;
48         }
49
50         if (!tstream_is_cli_np(transp->stream)) {
51                 return true;
52         }
53
54         ret = tstream_pending_bytes(transp->stream);
55         if (ret == -1) {
56                 return false;
57         }
58
59         return true;
60 }
61
62 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
63 {
64         struct rpc_tstream_state *transp =
65                 talloc_get_type_abort(priv, struct rpc_tstream_state);
66         int orig_timeout;
67         bool ok;
68
69         ok = rpc_tstream_is_connected(transp);
70         if (!ok) {
71                 return 0;
72         }
73
74         if (tstream_is_cli_np(transp->stream)) {
75                 transp->timeout = timeout;
76                 return tstream_cli_np_set_timeout(transp->stream, timeout);
77         }
78
79         orig_timeout = transp->timeout;
80
81         transp->timeout = timeout;
82
83         return orig_timeout;
84 }
85
/*
 * Bookkeeping for rpc_tstream_next_vector(), the next-vector callback
 * used by rpc_tstream_read_send().
 */
struct rpc_tstream_next_vector_state {
        /* Destination buffer supplied by the caller of the read. */
        uint8_t *buf;
        /* Number of bytes to read; capped at UINT16_MAX by the init helper. */
        size_t len;
        /* Number of bytes of buf already handed out in iovecs. */
        off_t ofs;
        /*
         * Pending bytes beyond what was still missing, as recorded by
         * rpc_tstream_next_vector(). Not read anywhere in this file.
         */
        size_t remaining;
};
92
93 static void rpc_tstream_next_vector_init(
94                                 struct rpc_tstream_next_vector_state *s,
95                                 uint8_t *buf, size_t len)
96 {
97         ZERO_STRUCTP(s);
98
99         s->buf = buf;
100         s->len = MIN(len, UINT16_MAX);
101 }
102
103 static int rpc_tstream_next_vector(struct tstream_context *stream,
104                                    void *private_data,
105                                    TALLOC_CTX *mem_ctx,
106                                    struct iovec **_vector,
107                                    size_t *count)
108 {
109         struct rpc_tstream_next_vector_state *state =
110                 (struct rpc_tstream_next_vector_state *)private_data;
111         struct iovec *vector;
112         ssize_t pending;
113         size_t wanted;
114
115         if (state->ofs == state->len) {
116                 *_vector = NULL;
117                 *count = 0;
118                 return 0;
119         }
120
121         pending = tstream_pending_bytes(stream);
122         if (pending == -1) {
123                 return -1;
124         }
125
126         if (pending == 0 && state->ofs != 0) {
127                 /* return a short read */
128                 *_vector = NULL;
129                 *count = 0;
130                 return 0;
131         }
132
133         if (pending == 0) {
134                 /* we want at least one byte and recheck again */
135                 wanted = 1;
136         } else {
137                 size_t missing = state->len - state->ofs;
138                 if (pending > missing) {
139                         /* there's more available */
140                         state->remaining = pending - missing;
141                         wanted = missing;
142                 } else {
143                         /* read what we can get and recheck in the next cycle */
144                         wanted = pending;
145                 }
146         }
147
148         vector = talloc_array(mem_ctx, struct iovec, 1);
149         if (!vector) {
150                 return -1;
151         }
152
153         vector[0].iov_base = state->buf + state->ofs;
154         vector[0].iov_len = wanted;
155
156         state->ofs += wanted;
157
158         *_vector = vector;
159         *count = 1;
160         return 0;
161 }
162
/* Per-request state of rpc_tstream_read_send()/rpc_tstream_read_recv(). */
struct rpc_tstream_read_state {
        /* Transport the read runs on; disconnected if the read fails. */
        struct rpc_tstream_state *transp;
        /* Private data of the rpc_tstream_next_vector() callback. */
        struct rpc_tstream_next_vector_state next_vector;
        /* Result of tstream_readv_pdu_queue_recv(), returned by the recv function. */
        ssize_t nread;
};

/* Completion callback of the readv subrequest, defined below. */
static void rpc_tstream_read_done(struct tevent_req *subreq);
170
171 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
172                                              struct event_context *ev,
173                                              uint8_t *data, size_t size,
174                                              void *priv)
175 {
176         struct rpc_tstream_state *transp =
177                 talloc_get_type_abort(priv, struct rpc_tstream_state);
178         struct tevent_req *req, *subreq;
179         struct rpc_tstream_read_state *state;
180         struct timeval endtime;
181
182         req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
183         if (req == NULL) {
184                 return NULL;
185         }
186         if (!rpc_tstream_is_connected(transp)) {
187                 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
188                 return tevent_req_post(req, ev);
189         }
190         state->transp = transp;
191         rpc_tstream_next_vector_init(&state->next_vector, data, size);
192
193         subreq = tstream_readv_pdu_queue_send(state, ev,
194                                               transp->stream,
195                                               transp->read_queue,
196                                               rpc_tstream_next_vector,
197                                               &state->next_vector);
198         if (subreq == NULL) {
199                 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
200                 return tevent_req_post(req, ev);
201         }
202
203         endtime = timeval_current_ofs(0, transp->timeout * 1000);
204         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
205                 goto fail;
206         }
207
208         tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
209         return req;
210  fail:
211         TALLOC_FREE(req);
212         return NULL;
213 }
214
215 static void rpc_tstream_read_done(struct tevent_req *subreq)
216 {
217         struct tevent_req *req =
218                 tevent_req_callback_data(subreq, struct tevent_req);
219         struct rpc_tstream_read_state *state =
220                 tevent_req_data(req, struct rpc_tstream_read_state);
221         int err;
222
223         state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
224         TALLOC_FREE(subreq);
225         if (state->nread < 0) {
226                 rpc_tstream_disconnect(state->transp);
227                 tevent_req_nterror(req, map_nt_error_from_unix(err));
228                 return;
229         }
230         tevent_req_done(req);
231 }
232
233 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
234 {
235         struct rpc_tstream_read_state *state = tevent_req_data(
236                 req, struct rpc_tstream_read_state);
237         NTSTATUS status;
238
239         if (tevent_req_is_nterror(req, &status)) {
240                 return status;
241         }
242         *size = state->nread;
243         return NT_STATUS_OK;
244 }
245
/* Per-request state of rpc_tstream_write_send()/rpc_tstream_write_recv(). */
struct rpc_tstream_write_state {
        /* Event context the request was started on. */
        struct event_context *ev;
        /* Transport the write runs on; disconnected if the write fails. */
        struct rpc_tstream_state *transp;
        /* Describes the caller's data; passed to tstream_writev_queue_send(). */
        struct iovec iov;
        /* Result of tstream_writev_queue_recv(), returned by the recv function. */
        ssize_t nwritten;
};

/* Completion callback of the writev subrequest, defined below. */
static void rpc_tstream_write_done(struct tevent_req *subreq);
254
255 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
256                                               struct event_context *ev,
257                                               const uint8_t *data, size_t size,
258                                               void *priv)
259 {
260         struct rpc_tstream_state *transp =
261                 talloc_get_type_abort(priv, struct rpc_tstream_state);
262         struct tevent_req *req, *subreq;
263         struct rpc_tstream_write_state *state;
264         struct timeval endtime;
265
266         req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
267         if (req == NULL) {
268                 return NULL;
269         }
270         if (!rpc_tstream_is_connected(transp)) {
271                 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
272                 return tevent_req_post(req, ev);
273         }
274         state->ev = ev;
275         state->transp = transp;
276         state->iov.iov_base = discard_const_p(void *, data);
277         state->iov.iov_len = size;
278
279         subreq = tstream_writev_queue_send(state, ev,
280                                            transp->stream,
281                                            transp->write_queue,
282                                            &state->iov, 1);
283         if (subreq == NULL) {
284                 goto fail;
285         }
286
287         endtime = timeval_current_ofs(0, transp->timeout * 1000);
288         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
289                 goto fail;
290         }
291
292         tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
293         return req;
294  fail:
295         TALLOC_FREE(req);
296         return NULL;
297 }
298
299 static void rpc_tstream_write_done(struct tevent_req *subreq)
300 {
301         struct tevent_req *req =
302                 tevent_req_callback_data(subreq, struct tevent_req);
303         struct rpc_tstream_write_state *state =
304                 tevent_req_data(req, struct rpc_tstream_write_state);
305         int err;
306
307         state->nwritten = tstream_writev_queue_recv(subreq, &err);
308         TALLOC_FREE(subreq);
309         if (state->nwritten < 0) {
310                 rpc_tstream_disconnect(state->transp);
311                 tevent_req_nterror(req, map_nt_error_from_unix(err));
312                 return;
313         }
314         tevent_req_done(req);
315 }
316
317 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
318 {
319         struct rpc_tstream_write_state *state =
320                 tevent_req_data(req, struct rpc_tstream_write_state);
321         NTSTATUS status;
322
323         if (tevent_req_is_nterror(req, &status)) {
324                 return status;
325         }
326         *sent = state->nwritten;
327         return NT_STATUS_OK;
328 }
329
/* Per-request state of rpc_tstream_trans_send()/rpc_tstream_trans_recv(). */
struct rpc_tstream_trans_state {
        /* Event context the request was started on. */
        struct tevent_context *ev;
        /* Transport the transaction runs on; disconnected on I/O failure. */
        struct rpc_tstream_state *transp;
        /* Describes the outgoing request data. */
        struct iovec req;
        /* Size of the reply buffer allocated by rpc_tstream_trans_next_vector(). */
        uint32_t max_rdata_len;
        /* Reply buffer; handed over to the caller by rpc_tstream_trans_recv(). */
        struct iovec rep;
};

/* Completion callbacks of the write and the read subrequest, defined below. */
static void rpc_tstream_trans_writev(struct tevent_req *subreq);
static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq);

/* Next-vector callback supplying the reply buffer, defined below. */
static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
                                         void *private_data,
                                         TALLOC_CTX *mem_ctx,
                                         struct iovec **_vector,
                                         size_t *count);
346
347 static struct tevent_req *rpc_tstream_trans_send(TALLOC_CTX *mem_ctx,
348                                                  struct tevent_context *ev,
349                                                  uint8_t *data, size_t data_len,
350                                                  uint32_t max_rdata_len,
351                                                  void *priv)
352 {
353         struct rpc_tstream_state *transp =
354                 talloc_get_type_abort(priv, struct rpc_tstream_state);
355         struct tevent_req *req, *subreq;
356         struct rpc_tstream_trans_state *state;
357         struct timeval endtime;
358
359         req = tevent_req_create(mem_ctx, &state,
360                                 struct rpc_tstream_trans_state);
361         if (req == NULL) {
362                 return NULL;
363         }
364
365         if (!rpc_tstream_is_connected(transp)) {
366                 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
367                 return tevent_req_post(req, ev);
368         }
369         state->ev = ev;
370         state->transp = transp;
371         state->req.iov_len = data_len;
372         state->req.iov_base = discard_const_p(void *, data);
373         state->max_rdata_len = max_rdata_len;
374
375         endtime = timeval_current_ofs(0, transp->timeout * 1000);
376
377         subreq = tstream_writev_queue_send(state, ev,
378                                            transp->stream,
379                                            transp->write_queue,
380                                            &state->req, 1);
381         if (tevent_req_nomem(subreq, req)) {
382                 return tevent_req_post(req, ev);
383         }
384         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
385                 return tevent_req_post(req, ev);
386         }
387         tevent_req_set_callback(subreq, rpc_tstream_trans_writev, req);
388
389         if (tstream_is_cli_np(transp->stream)) {
390                 tstream_cli_np_use_trans(transp->stream);
391         }
392
393         subreq = tstream_readv_pdu_queue_send(state, ev,
394                                               transp->stream,
395                                               transp->read_queue,
396                                               rpc_tstream_trans_next_vector,
397                                               state);
398         if (tevent_req_nomem(subreq, req)) {
399                 return tevent_req_post(req, ev);
400         }
401         if (!tevent_req_set_endtime(subreq, ev, endtime)) {
402                 return tevent_req_post(req, ev);
403         }
404         tevent_req_set_callback(subreq, rpc_tstream_trans_readv_pdu, req);
405
406         return req;
407 }
408
409 static void rpc_tstream_trans_writev(struct tevent_req *subreq)
410 {
411         struct tevent_req *req =
412                 tevent_req_callback_data(subreq,
413                 struct tevent_req);
414         struct rpc_tstream_trans_state *state =
415                 tevent_req_data(req,
416                 struct rpc_tstream_trans_state);
417         int ret;
418         int err;
419
420         ret = tstream_writev_queue_recv(subreq, &err);
421         TALLOC_FREE(subreq);
422         if (ret == -1) {
423                 rpc_tstream_disconnect(state->transp);
424                 tevent_req_nterror(req, map_nt_error_from_unix(err));
425                 return;
426         }
427 }
428
429 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
430                                          void *private_data,
431                                          TALLOC_CTX *mem_ctx,
432                                          struct iovec **_vector,
433                                          size_t *count)
434 {
435         struct rpc_tstream_trans_state *state =
436                 talloc_get_type_abort(private_data,
437                 struct rpc_tstream_trans_state);
438         struct iovec *vector;
439
440         if (state->max_rdata_len == state->rep.iov_len) {
441                 *_vector = NULL;
442                 *count = 0;
443                 return 0;
444         }
445
446         state->rep.iov_base = talloc_array(state, uint8_t,
447                                            state->max_rdata_len);
448         if (state->rep.iov_base == NULL) {
449                 return -1;
450         }
451         state->rep.iov_len = state->max_rdata_len;
452
453         vector = talloc_array(mem_ctx, struct iovec, 1);
454         if (!vector) {
455                 return -1;
456         }
457
458         vector[0] = state->rep;
459
460         *_vector = vector;
461         *count = 1;
462         return 0;
463 }
464
465 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq)
466 {
467         struct tevent_req *req =
468                 tevent_req_callback_data(subreq,
469                 struct tevent_req);
470         struct rpc_tstream_trans_state *state =
471                 tevent_req_data(req,
472                 struct rpc_tstream_trans_state);
473         int ret;
474         int err;
475
476         ret = tstream_readv_pdu_queue_recv(subreq, &err);
477         TALLOC_FREE(subreq);
478         if (ret == -1) {
479                 rpc_tstream_disconnect(state->transp);
480                 tevent_req_nterror(req, map_nt_error_from_unix(err));
481                 return;
482         }
483
484         tevent_req_done(req);
485 }
486
487 static NTSTATUS rpc_tstream_trans_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
488                                        uint8_t **prdata, uint32_t *prdata_len)
489 {
490         struct rpc_tstream_trans_state *state =
491                 tevent_req_data(req,
492                 struct rpc_tstream_trans_state);
493         NTSTATUS status;
494
495         if (tevent_req_is_nterror(req, &status)) {
496                 return status;
497         }
498
499         *prdata = (uint8_t *)talloc_move(mem_ctx, &state->rep.iov_base);
500         *prdata_len = state->rep.iov_len;
501         return NT_STATUS_OK;
502 }
503
504 /**
505 * @brief Initialize a tstream transport facility
506 *        NOTE: this function will talloc_steal, the stream and the queues.
507 *
508 * @param mem_ctx        - memory context used to allocate the transport
509 * @param stream         - a ready to use tstream
510 * @param presult        - the transport structure
511 *
512 * @return               - a NT Status error code.
513 */
514 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
515                                 struct tstream_context **stream,
516                                 struct rpc_cli_transport **presult)
517 {
518         struct rpc_cli_transport *result;
519         struct rpc_tstream_state *state;
520
521         result = talloc(mem_ctx, struct rpc_cli_transport);
522         if (result == NULL) {
523                 return NT_STATUS_NO_MEMORY;
524         }
525         state = talloc(result, struct rpc_tstream_state);
526         if (state == NULL) {
527                 TALLOC_FREE(result);
528                 return NT_STATUS_NO_MEMORY;
529         }
530         result->priv = state;
531
532         state->read_queue = tevent_queue_create(state, "read_queue");
533         if (state->read_queue == NULL) {
534                 TALLOC_FREE(result);
535                 return NT_STATUS_NO_MEMORY;
536         }
537         state->write_queue = tevent_queue_create(state, "write_queue");
538         if (state->write_queue == NULL) {
539                 TALLOC_FREE(result);
540                 return NT_STATUS_NO_MEMORY;
541         }
542
543         state->stream = talloc_move(state, stream);
544         state->timeout = 10000; /* 10 seconds. */
545
546         if (tstream_is_cli_np(state->stream)) {
547                 result->trans_send = rpc_tstream_trans_send;
548                 result->trans_recv = rpc_tstream_trans_recv;
549         } else {
550                 result->trans_send = NULL;
551                 result->trans_recv = NULL;
552         }
553         result->write_send = rpc_tstream_write_send;
554         result->write_recv = rpc_tstream_write_recv;
555         result->read_send = rpc_tstream_read_send;
556         result->read_recv = rpc_tstream_read_recv;
557         result->is_connected = rpc_tstream_is_connected;
558         result->set_timeout = rpc_tstream_set_timeout;
559
560         *presult = result;
561         return NT_STATUS_OK;
562 }
563
564 struct cli_state *rpc_pipe_np_smb_conn(struct rpc_pipe_client *p)
565 {
566         struct rpc_tstream_state *transp =
567                 talloc_get_type_abort(p->transport->priv,
568                 struct rpc_tstream_state);
569         bool ok;
570
571         ok = rpccli_is_connected(p);
572         if (!ok) {
573                 return NULL;
574         }
575
576         if (!tstream_is_cli_np(transp->stream)) {
577                 return NULL;
578         }
579
580         return tstream_cli_np_get_cli_state(transp->stream);
581 }