2 * Unix SMB/CIFS implementation.
3 * RPC client transport over tstream
4 * Copyright (C) Simo Sorce 2010
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tsocket/tsocket.h"
22 #include "libsmb/cli_np_tstream.h"
26 #define DBGC_CLASS DBGC_RPC_CLI
/*
 * Private state behind an rpc_cli_transport that runs over a tstream
 * (rpc_transport_tstream_init() stores it in result->priv): the stream
 * itself plus one tevent queue for reads and one for writes.
 * NOTE(review): other functions in this file access transp->timeout, so a
 * timeout member must be declared in this struct as well - it is not
 * visible in this listing; confirm against the full file.
 */
28 struct rpc_tstream_state {
/* underlying stream; rpc_tstream_is_connected() tests it against NULL */
29 struct tstream_context *stream;
/* created with the name "read_queue" at init time */
30 struct tevent_queue *read_queue;
/* created with the name "write_queue" at init time */
31 struct tevent_queue *write_queue;
/*
 * Tear down the transport's stream by handing s->stream to TALLOC_FREE().
 * Used by the completion handlers in this file, e.g. when a queued
 * read or write reports a negative byte count.
 * NOTE(review): presumably TALLOC_FREE() also resets the pointer to NULL,
 * which is what rpc_tstream_is_connected() tests - confirm.
 */
35 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
37 TALLOC_FREE(s->stream);
/*
 * Transport "is_connected" callback.
 *
 * priv is type-checked (aborting on mismatch) as struct rpc_tstream_state.
 * The visible checks are, in order: the stream pointer must be non-NULL,
 * the stream is tested with tstream_is_cli_np(), and finally
 * tstream_pending_bytes() is queried on the stream.
 * The branch bodies and the return statements are not visible in this
 * listing, so the exact result of each check is not documented here.
 */
40 static bool rpc_tstream_is_connected(void *priv)
42 struct rpc_tstream_state *transp =
43 talloc_get_type_abort(priv, struct rpc_tstream_state);
/* no stream at all (e.g. after rpc_tstream_disconnect()) */
46 if (!transp->stream) {
/* streams that are not cli_np get their own branch (body not shown) */
50 if (!tstream_is_cli_np(transp->stream)) {
/* probe the stream; NOTE(review): presumably a negative ret means "gone" */
54 ret = tstream_pending_bytes(transp->stream);
/*
 * Transport "set_timeout" callback: store a new timeout on the transport.
 *
 * The value appears to be in milliseconds: the default assigned at init
 * time is 10000 and is commented there as 10 seconds.
 *
 * For cli_np streams the new value is stored and also forwarded to
 * tstream_cli_np_set_timeout(), whose return value is returned.
 * For other streams the previous value is saved in orig_timeout before
 * being overwritten.
 * NOTE(review): the handling of a failed connectivity check and the final
 * return (presumably orig_timeout) are not visible in this listing.
 */
62 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
64 struct rpc_tstream_state *transp =
65 talloc_get_type_abort(priv, struct rpc_tstream_state);
69 ok = rpc_tstream_is_connected(transp);
74 if (tstream_is_cli_np(transp->stream)) {
75 transp->timeout = timeout;
76 return tstream_cli_np_set_timeout(transp->stream, timeout);
/* non cli_np stream: remember the old value, then install the new one */
79 orig_timeout = transp->timeout;
81 transp->timeout = timeout;
/*
 * Cursor used by rpc_tstream_next_vector() while filling a caller buffer.
 * The member declarations are not visible in this listing; the code in
 * this file uses the members buf, len, ofs and remaining.
 */
86 struct rpc_tstream_next_vector_state {
/*
 * Prepare a next_vector cursor for reading into buf.
 *
 * The requested length is capped at UINT16_MAX, so a single read request
 * never asks for more than that many bytes.
 * NOTE(review): the initialisation of the remaining members (presumably
 * buf and a zeroed offset) is not visible in this listing.
 */
93 static void rpc_tstream_next_vector_init(
94 struct rpc_tstream_next_vector_state *s,
95 uint8_t *buf, size_t len)
100 s->len = MIN(len, UINT16_MAX);
/*
 * next_vector callback for the queued readv_pdu call issued by
 * rpc_tstream_read_send().
 *
 * private_data is a struct rpc_tstream_next_vector_state. Each call hands
 * out one iovec (allocated on mem_ctx) that points at buf + ofs and is
 * "wanted" bytes long, then advances ofs by that amount.
 *
 * How "wanted" is chosen depends on tstream_pending_bytes(): the visible
 * branches cover "buffer already full" (ofs == len), "nothing pending but
 * some data already read" (short read), and "more pending than still
 * missing", in which case the surplus is recorded in state->remaining.
 * The assignments to wanted and the return statements are not visible in
 * this listing.
 */
103 static int rpc_tstream_next_vector(struct tstream_context *stream,
106 struct iovec **_vector,
109 struct rpc_tstream_next_vector_state *state =
110 (struct rpc_tstream_next_vector_state *)private_data;
111 struct iovec *vector;
/* the caller's buffer has been filled completely */
115 if (state->ofs == state->len) {
121 pending = tstream_pending_bytes(stream);
126 if (pending == 0 && state->ofs != 0) {
127 /* return a short read */
134 /* we want at least one byte and recheck again */
/* bytes still needed to fill the caller's buffer */
137 size_t missing = state->len - state->ofs;
138 if (pending > missing) {
139 /* there's more available */
140 state->remaining = pending - missing;
143 /* read what we can get and recheck in the next cycle */
/* a single-element iovec array describing the next chunk to read */
148 vector = talloc_array(mem_ctx, struct iovec, 1);
153 vector[0].iov_base = state->buf + state->ofs;
154 vector[0].iov_len = wanted;
156 state->ofs += wanted;
/*
 * Per-request state of rpc_tstream_read_send(): the transport being read
 * from and the next_vector cursor describing the caller's buffer.
 * NOTE(review): rpc_tstream_read_done() also stores the result in
 * state->nread; that member is not visible in this listing.
 */
163 struct rpc_tstream_read_state {
164 struct rpc_tstream_state *transp;
165 struct rpc_tstream_next_vector_state next_vector;
/* completion handler for the queued readv_pdu subrequest */
169 static void rpc_tstream_read_done(struct tevent_req *subreq);
/*
 * Transport "read_send" callback: start an asynchronous read of up to
 * size bytes into data.
 *
 * priv is type-checked as struct rpc_tstream_state. If the transport is
 * not connected the request fails immediately with
 * NT_STATUS_CONNECTION_INVALID. Otherwise a queued readv_pdu subrequest is
 * started with rpc_tstream_next_vector() as its next_vector callback, an
 * end time derived from transp->timeout is set on it, and
 * rpc_tstream_read_done() is installed as its completion handler.
 *
 * A NULL subrequest is reported as NT_STATUS_NO_MEMORY.
 * NOTE(review): the handling of a failed tevent_req_create() or
 * tevent_req_set_endtime() and the final return are not visible in this
 * listing.
 */
171 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
172 struct event_context *ev,
173 uint8_t *data, size_t size,
176 struct rpc_tstream_state *transp =
177 talloc_get_type_abort(priv, struct rpc_tstream_state);
178 struct tevent_req *req, *subreq;
179 struct rpc_tstream_read_state *state;
180 struct timeval endtime;
182 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
186 if (!rpc_tstream_is_connected(transp)) {
187 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
188 return tevent_req_post(req, ev);
190 state->transp = transp;
/* the cursor caps the request at UINT16_MAX bytes */
191 rpc_tstream_next_vector_init(&state->next_vector, data, size);
193 subreq = tstream_readv_pdu_queue_send(state, ev,
196 rpc_tstream_next_vector,
197 &state->next_vector);
198 if (subreq == NULL) {
199 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
200 return tevent_req_post(req, ev);
/*
 * The timeout is scaled by 1000 and passed as the second argument with a
 * zero first argument (presumably seconds, microseconds).
 * NOTE(review): if timeout is a 32-bit unsigned value, timeout * 1000
 * wraps for timeouts above roughly 4294967; consider splitting the value
 * into whole seconds and a microsecond remainder - confirm.
 */
203 endtime = timeval_current_ofs(0, transp->timeout * 1000);
204 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
208 tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
/*
 * Completion handler for the readv_pdu subrequest of
 * rpc_tstream_read_send().
 *
 * Stores the result of tstream_readv_pdu_queue_recv() in state->nread.
 * A negative result disconnects the transport and fails the request with
 * the NT status mapped from the reported unix error; otherwise the request
 * is marked done.
 */
215 static void rpc_tstream_read_done(struct tevent_req *subreq)
217 struct tevent_req *req =
218 tevent_req_callback_data(subreq, struct tevent_req);
219 struct rpc_tstream_read_state *state =
220 tevent_req_data(req, struct rpc_tstream_read_state);
223 state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
/* read failed: drop the stream so later calls see "not connected" */
225 if (state->nread < 0) {
226 rpc_tstream_disconnect(state->transp);
227 tevent_req_nterror(req, map_nt_error_from_unix(err));
230 tevent_req_done(req);
/*
 * Transport "read_recv" callback: collect the result of a read request.
 *
 * If the request carries an NT error it is picked up via
 * tevent_req_is_nterror(); otherwise *size receives the byte count stored
 * by rpc_tstream_read_done(). The return statements are not visible in
 * this listing.
 */
233 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
235 struct rpc_tstream_read_state *state = tevent_req_data(
236 req, struct rpc_tstream_read_state);
239 if (tevent_req_is_nterror(req, &status)) {
242 *size = state->nread;
/*
 * Per-request state of rpc_tstream_write_send(): the event context and
 * the transport being written to.
 * NOTE(review): the write code also uses state->iov and state->nwritten;
 * those members are not visible in this listing.
 */
246 struct rpc_tstream_write_state {
247 struct event_context *ev;
248 struct rpc_tstream_state *transp;
/* completion handler for the queued writev subrequest */
253 static void rpc_tstream_write_done(struct tevent_req *subreq);
/*
 * Transport "write_send" callback: start an asynchronous write of size
 * bytes from data.
 *
 * priv is type-checked as struct rpc_tstream_state. If the transport is
 * not connected the request fails immediately with
 * NT_STATUS_CONNECTION_INVALID. Otherwise the caller's buffer is described
 * by state->iov, a queued writev subrequest is started, an end time
 * derived from transp->timeout is set on it, and rpc_tstream_write_done()
 * is installed as its completion handler.
 *
 * NOTE(review): the handling of a failed tevent_req_create(), a NULL
 * subrequest or a failed tevent_req_set_endtime(), and the final return,
 * are not visible in this listing.
 */
255 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
256 struct event_context *ev,
257 const uint8_t *data, size_t size,
260 struct rpc_tstream_state *transp =
261 talloc_get_type_abort(priv, struct rpc_tstream_state);
262 struct tevent_req *req, *subreq;
263 struct rpc_tstream_write_state *state;
264 struct timeval endtime;
266 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
270 if (!rpc_tstream_is_connected(transp)) {
271 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
272 return tevent_req_post(req, ev);
275 state->transp = transp;
/*
 * struct iovec has no const base pointer, so the qualifier is dropped.
 * NOTE(review): discard_const_p(type, ptr) conventionally yields "type *",
 * which would make this a void ** converted implicitly to void *;
 * harmless, but discard_const_p(void, data) may be the intended spelling -
 * confirm.
 */
276 state->iov.iov_base = discard_const_p(void *, data);
277 state->iov.iov_len = size;
279 subreq = tstream_writev_queue_send(state, ev,
283 if (subreq == NULL) {
/*
 * NOTE(review): if timeout is a 32-bit unsigned value, timeout * 1000
 * wraps for timeouts above roughly 4294967 - confirm.
 */
287 endtime = timeval_current_ofs(0, transp->timeout * 1000);
288 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
292 tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
/*
 * Completion handler for the writev subrequest of
 * rpc_tstream_write_send().
 *
 * Stores the result of tstream_writev_queue_recv() in state->nwritten.
 * A negative result disconnects the transport and fails the request with
 * the NT status mapped from the reported unix error; otherwise the request
 * is marked done.
 */
299 static void rpc_tstream_write_done(struct tevent_req *subreq)
301 struct tevent_req *req =
302 tevent_req_callback_data(subreq, struct tevent_req);
303 struct rpc_tstream_write_state *state =
304 tevent_req_data(req, struct rpc_tstream_write_state);
307 state->nwritten = tstream_writev_queue_recv(subreq, &err);
/* write failed: drop the stream so later calls see "not connected" */
309 if (state->nwritten < 0) {
310 rpc_tstream_disconnect(state->transp);
311 tevent_req_nterror(req, map_nt_error_from_unix(err));
314 tevent_req_done(req);
/*
 * Transport "write_recv" callback: collect the result of a write request.
 *
 * If the request carries an NT error it is picked up via
 * tevent_req_is_nterror(); otherwise *sent receives the byte count stored
 * by rpc_tstream_write_done(). The return statements are not visible in
 * this listing.
 */
317 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
319 struct rpc_tstream_write_state *state =
320 tevent_req_data(req, struct rpc_tstream_write_state);
323 if (tevent_req_is_nterror(req, &status)) {
326 *sent = state->nwritten;
/*
 * Per-request state of rpc_tstream_trans_send(), a combined
 * write-then-read exchange.
 * NOTE(review): the trans code also uses state->req and state->rep as
 * iovecs (request data and reply buffer); those members are not visible
 * in this listing.
 */
330 struct rpc_tstream_trans_state {
331 struct tevent_context *ev;
332 struct rpc_tstream_state *transp;
/* size of the reply buffer allocated by rpc_tstream_trans_next_vector() */
334 uint32_t max_rdata_len;
/* completion handlers for the write and the read half of the exchange */
338 static void rpc_tstream_trans_writev(struct tevent_req *subreq);
339 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq);
/* next_vector callback used by the read half; defined further down */
341 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
344 struct iovec **_vector,
/*
 * Transport "trans_send" callback: send data_len bytes from data and read
 * a reply of up to max_rdata_len bytes as one exchange.
 * rpc_transport_tstream_init() installs this callback only for cli_np
 * streams.
 *
 * If the transport is not connected the request fails immediately with
 * NT_STATUS_CONNECTION_INVALID. Otherwise a queued writev subrequest
 * (completed by rpc_tstream_trans_writev()) and a queued readv_pdu
 * subrequest (driven by rpc_tstream_trans_next_vector(), completed by
 * rpc_tstream_trans_readv_pdu()) are both started up front and share the
 * same end time. For cli_np streams tstream_cli_np_use_trans() is called
 * between the two.
 *
 * NOTE(review): the handling of a failed tevent_req_create() and the
 * final return are not visible in this listing.
 */
347 static struct tevent_req *rpc_tstream_trans_send(TALLOC_CTX *mem_ctx,
348 struct tevent_context *ev,
349 uint8_t *data, size_t data_len,
350 uint32_t max_rdata_len,
353 struct rpc_tstream_state *transp =
354 talloc_get_type_abort(priv, struct rpc_tstream_state);
355 struct tevent_req *req, *subreq;
356 struct rpc_tstream_trans_state *state;
357 struct timeval endtime;
359 req = tevent_req_create(mem_ctx, &state,
360 struct rpc_tstream_trans_state);
365 if (!rpc_tstream_is_connected(transp)) {
366 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
367 return tevent_req_post(req, ev);
370 state->transp = transp;
371 state->req.iov_len = data_len;
372 state->req.iov_base = discard_const_p(void *, data);
373 state->max_rdata_len = max_rdata_len;
/*
 * One deadline, computed once, is applied to both subrequests below.
 * NOTE(review): if timeout is a 32-bit unsigned value, timeout * 1000
 * wraps for timeouts above roughly 4294967 - confirm.
 */
375 endtime = timeval_current_ofs(0, transp->timeout * 1000);
377 subreq = tstream_writev_queue_send(state, ev,
381 if (tevent_req_nomem(subreq, req)) {
382 return tevent_req_post(req, ev);
/*
 * NOTE(review): on failure the request is posted without an error being
 * set in the visible code; confirm whether tevent_req_set_endtime()
 * itself marks req as failed, otherwise an explicit error is needed.
 */
384 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
385 return tevent_req_post(req, ev);
387 tevent_req_set_callback(subreq, rpc_tstream_trans_writev, req);
389 if (tstream_is_cli_np(transp->stream)) {
390 tstream_cli_np_use_trans(transp->stream);
393 subreq = tstream_readv_pdu_queue_send(state, ev,
396 rpc_tstream_trans_next_vector,
398 if (tevent_req_nomem(subreq, req)) {
399 return tevent_req_post(req, ev);
401 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
402 return tevent_req_post(req, ev);
404 tevent_req_set_callback(subreq, rpc_tstream_trans_readv_pdu, req);
/*
 * Completion handler for the write half of a trans exchange.
 *
 * Collects the writev result; on the failure path (its condition is not
 * visible in this listing) the transport is disconnected and the request
 * fails with the NT status mapped from the reported unix error.
 * The visible code does not mark the request done here; the read half
 * (rpc_tstream_trans_readv_pdu()) does that.
 */
409 static void rpc_tstream_trans_writev(struct tevent_req *subreq)
411 struct tevent_req *req =
412 tevent_req_callback_data(subreq,
414 struct rpc_tstream_trans_state *state =
416 struct rpc_tstream_trans_state);
420 ret = tstream_writev_queue_recv(subreq, &err);
423 rpc_tstream_disconnect(state->transp);
424 tevent_req_nterror(req, map_nt_error_from_unix(err));
/*
 * next_vector callback for the read half of a trans exchange.
 *
 * private_data is type-checked as struct rpc_tstream_trans_state.
 * The reply buffer of max_rdata_len bytes is allocated as a talloc child
 * of the request state and described by state->rep; a one-element iovec
 * array (allocated on mem_ctx) carrying a copy of state->rep is then
 * prepared for the caller.
 *
 * The leading check compares max_rdata_len with rep.iov_len, which holds
 * once the buffer has been set up by an earlier call.
 * NOTE(review): the body of that check and the return statements are not
 * visible in this listing; presumably it ends the PDU - confirm.
 */
429 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
432 struct iovec **_vector,
435 struct rpc_tstream_trans_state *state =
436 talloc_get_type_abort(private_data,
437 struct rpc_tstream_trans_state);
438 struct iovec *vector;
440 if (state->max_rdata_len == state->rep.iov_len) {
/* first pass: allocate the reply buffer, owned by the request state */
446 state->rep.iov_base = talloc_array(state, uint8_t,
447 state->max_rdata_len);
448 if (state->rep.iov_base == NULL) {
451 state->rep.iov_len = state->max_rdata_len;
453 vector = talloc_array(mem_ctx, struct iovec, 1);
458 vector[0] = state->rep;
/*
 * Completion handler for the read half of a trans exchange.
 *
 * Collects the readv_pdu result; on the failure path (its condition is
 * not visible in this listing) the transport is disconnected and the
 * request fails with the NT status mapped from the reported unix error.
 * Otherwise the whole trans request is marked done.
 */
465 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq)
467 struct tevent_req *req =
468 tevent_req_callback_data(subreq,
470 struct rpc_tstream_trans_state *state =
472 struct rpc_tstream_trans_state);
476 ret = tstream_readv_pdu_queue_recv(subreq, &err);
479 rpc_tstream_disconnect(state->transp);
480 tevent_req_nterror(req, map_nt_error_from_unix(err));
484 tevent_req_done(req);
/*
 * Transport "trans_recv" callback: collect the reply of a trans exchange.
 *
 * If the request carries an NT error it is picked up via
 * tevent_req_is_nterror(). Otherwise ownership of the reply buffer is
 * moved from the request state to mem_ctx and returned in *prdata, and
 * *prdata_len receives state->rep.iov_len.
 * The return statements are not visible in this listing.
 */
487 static NTSTATUS rpc_tstream_trans_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
488 uint8_t **prdata, uint32_t *prdata_len)
490 struct rpc_tstream_trans_state *state =
492 struct rpc_tstream_trans_state);
495 if (tevent_req_is_nterror(req, &status)) {
/* hand the buffer to the caller's context */
499 *prdata = (uint8_t *)talloc_move(mem_ctx, &state->rep.iov_base);
500 *prdata_len = state->rep.iov_len;
505 * @brief Initialize a tstream transport facility
506 * NOTE: this function takes ownership of the stream via talloc_move(); the read and write queues are created internally.
508 * @param mem_ctx - memory context used to allocate the transport
509 * @param stream - a ready to use tstream
510 * @param presult - the transport structure
512 * @return - a NT Status error code.
/*
 * Build an rpc_cli_transport on top of an existing tstream.
 *
 * Allocation hierarchy: the transport is allocated on mem_ctx, the private
 * state on the transport, and both tevent queues on the private state.
 * Each failed allocation returns NT_STATUS_NO_MEMORY.
 * The stream is moved onto the private state with talloc_move(), and the
 * timeout starts at 10000 (10 seconds).
 *
 * The trans_send/trans_recv callbacks are provided only when the stream is
 * a cli_np stream and are NULL otherwise; the read, write, is_connected
 * and set_timeout callbacks are always installed.
 *
 * NOTE(review): the cleanup on the failure paths, the assignment to
 * *presult and the success return are not visible in this listing.
 */
514 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
515 struct tstream_context **stream,
516 struct rpc_cli_transport **presult)
518 struct rpc_cli_transport *result;
519 struct rpc_tstream_state *state;
521 result = talloc(mem_ctx, struct rpc_cli_transport);
522 if (result == NULL) {
523 return NT_STATUS_NO_MEMORY;
/* the private state is a talloc child of the transport */
525 state = talloc(result, struct rpc_tstream_state);
528 return NT_STATUS_NO_MEMORY;
530 result->priv = state;
532 state->read_queue = tevent_queue_create(state, "read_queue");
533 if (state->read_queue == NULL) {
535 return NT_STATUS_NO_MEMORY;
537 state->write_queue = tevent_queue_create(state, "write_queue");
538 if (state->write_queue == NULL) {
540 return NT_STATUS_NO_MEMORY;
/* take over the caller's stream */
543 state->stream = talloc_move(state, stream);
544 state->timeout = 10000; /* 10 seconds. */
/* the combined write+read exchange is offered for cli_np streams only */
546 if (tstream_is_cli_np(state->stream)) {
547 result->trans_send = rpc_tstream_trans_send;
548 result->trans_recv = rpc_tstream_trans_recv;
550 result->trans_send = NULL;
551 result->trans_recv = NULL;
553 result->write_send = rpc_tstream_write_send;
554 result->write_recv = rpc_tstream_write_recv;
555 result->read_send = rpc_tstream_read_send;
556 result->read_recv = rpc_tstream_read_recv;
557 result->is_connected = rpc_tstream_is_connected;
558 result->set_timeout = rpc_tstream_set_timeout;
/*
 * Return the cli_state behind an RPC pipe whose transport is a cli_np
 * tstream.
 *
 * p->transport->priv is type-checked (aborting on mismatch) as
 * struct rpc_tstream_state, so this must only be called on pipes whose
 * transport was set up by rpc_transport_tstream_init().
 * The pipe is checked with rpccli_is_connected() and the stream with
 * tstream_is_cli_np() before tstream_cli_np_get_cli_state() is called.
 * NOTE(review): the results of the two failed checks (presumably NULL)
 * are not visible in this listing.
 */
564 struct cli_state *rpc_pipe_np_smb_conn(struct rpc_pipe_client *p)
566 struct rpc_tstream_state *transp =
567 talloc_get_type_abort(p->transport->priv,
568 struct rpc_tstream_state);
571 ok = rpccli_is_connected(p);
576 if (!tstream_is_cli_np(transp->stream)) {
580 return tstream_cli_np_get_cli_state(transp->stream);