2 * Unix SMB/CIFS implementation.
3 * RPC client transport over tstream
4 * Copyright (C) Simo Sorce 2010
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tsocket/tsocket.h"
24 #define DBGC_CLASS DBGC_RPC_CLI
/*
 * Private data of one tstream-backed RPC client transport: the stream
 * and two tevent queues, one named for reads and one for writes.
 * Functions in this file also access a `timeout` member of this struct;
 * its declaration is not among the lines shown here.
 */
26 struct rpc_tstream_state {
27 struct tstream_context *stream;
28 struct tevent_queue *read_queue;
29 struct tevent_queue *write_queue;
/*
 * Tear down the transport's stream by freeing s->stream.
 * NOTE(review): TALLOC_FREE is expected to also reset s->stream to NULL,
 * which is the condition rpc_tstream_is_connected() tests -- confirm
 * against the macro definition.
 */
33 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
35 TALLOC_FREE(s->stream);
/*
 * Connectivity check for the transport.
 *
 * priv is the opaque transport pointer; it is converted to
 * struct rpc_tstream_state with talloc_get_type_abort().
 * The check is based solely on whether transp->stream is set.
 * NOTE(review): the return statements are not visible in this excerpt;
 * presumably false when there is no stream and true otherwise.
 */
38 static bool rpc_tstream_is_connected(void *priv)
40 struct rpc_tstream_state *transp =
41 talloc_get_type_abort(priv, struct rpc_tstream_state);
/* no stream means the transport is not usable */
43 if (!transp->stream) {
/*
 * Replace the transport timeout with `timeout`, remembering the previous
 * value in orig_timeout.
 *
 * priv is the opaque transport pointer (struct rpc_tstream_state).
 * NOTE(review): the handling of a failed connectivity check and the
 * return statement are not visible here; presumably the previous
 * timeout is returned -- confirm.
 */
50 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
52 struct rpc_tstream_state *transp =
53 talloc_get_type_abort(priv, struct rpc_tstream_state);
/* the connection state is checked before the timeout is touched */
57 ok = rpc_tstream_is_connected(transp);
/* save the old value before overwriting it */
62 orig_timeout = transp->timeout;
64 transp->timeout = timeout;
/*
 * Cursor state handed to rpc_tstream_next_vector() as its private data.
 * The functions below use the members buf, len, ofs and remaining; their
 * declarations are not among the lines shown here.
 */
69 struct rpc_tstream_next_vector_state {
/*
 * Prepare the next-vector cursor `s` for a read of up to `len` bytes
 * into `buf`.
 * NOTE(review): only the length assignment is visible in this excerpt;
 * the remaining initialisation (buffer pointer, offsets) is presumably
 * on lines not shown.
 */
76 static void rpc_tstream_next_vector_init(
77 struct rpc_tstream_next_vector_state *s,
78 uint8_t *buf, size_t len)
/* cap the amount requested per read at UINT16_MAX bytes */
83 s->len = MIN(len, UINT16_MAX);
/*
 * Next-vector callback passed to tstream_readv_pdu_queue_send() by
 * rpc_tstream_read_send(). On each call it decides how many more bytes
 * to ask the stream for and hands back a one-element iovec pointing at
 * the unread part of the caller's buffer.
 *
 * private_data is the struct rpc_tstream_next_vector_state cursor.
 * NOTE(review): several lines (the assignments to `wanted`, the output
 * parameter stores and all return statements) are missing from this
 * excerpt, so the exact return values cannot be stated here.
 */
86 static int rpc_tstream_next_vector(struct tstream_context *stream,
89 struct iovec **_vector,
92 struct rpc_tstream_next_vector_state *state =
93 (struct rpc_tstream_next_vector_state *)private_data;
/* the whole requested length has already been handed out */
98 if (state->ofs == state->len) {
/* ask the stream how many bytes are pending */
104 pending = tstream_pending_bytes(stream);
/* nothing more pending but some data was already requested */
109 if (pending == 0 && state->ofs != 0) {
110 /* return a short read */
117 /* we want at least one byte and recheck again */
/* bytes still needed to reach the requested length */
120 size_t missing = state->len - state->ofs;
121 if (pending > missing) {
122 /* there's more available */
/* record the surplus that will stay in the stream after this read */
123 state->remaining = pending - missing;
126 /* read what we can get and recheck in the next cycle */
/* the iovec is allocated on the mem_ctx supplied by the caller */
131 vector = talloc_array(mem_ctx, struct iovec, 1);
/* point at the first unfilled byte of the destination buffer */
136 vector[0].iov_base = state->buf + state->ofs;
137 vector[0].iov_len = wanted;
/* advance the cursor by the amount just requested */
139 state->ofs += wanted;
/*
 * Per-request state of one asynchronous read: the owning transport and
 * the next-vector cursor for the read. rpc_tstream_read_done() also
 * stores the result in an `nread` member whose declaration is not
 * visible in this excerpt.
 */
146 struct rpc_tstream_read_state {
147 struct rpc_tstream_state *transp;
148 struct rpc_tstream_next_vector_state next_vector;
/* completion callback for the readv subrequest, defined below */
152 static void rpc_tstream_read_done(struct tevent_req *subreq);
/*
 * Start an asynchronous read of up to `size` bytes into `data`.
 *
 * Creates a tevent request on mem_ctx, fails it immediately with
 * NT_STATUS_CONNECTION_INVALID if the transport has no stream, and
 * otherwise starts a tstream_readv_pdu_queue_send() subrequest driven
 * by rpc_tstream_next_vector(). The subrequest is given a deadline
 * derived from the transport timeout.
 * NOTE(review): some call arguments, the handling of a failed
 * tevent_req_create()/tevent_req_set_endtime() and the final return are
 * not visible in this excerpt.
 */
154 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
155 struct event_context *ev,
156 uint8_t *data, size_t size,
159 struct rpc_tstream_state *transp =
160 talloc_get_type_abort(priv, struct rpc_tstream_state);
161 struct tevent_req *req, *subreq;
162 struct rpc_tstream_read_state *state;
163 struct timeval endtime;
165 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
/* refuse to read on a transport whose stream is gone */
169 if (!rpc_tstream_is_connected(transp)) {
170 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
171 return tevent_req_post(req, ev);
173 state->transp = transp;
174 rpc_tstream_next_vector_init(&state->next_vector, data, size);
/* the subrequest is allocated on state, so it lives no longer than req */
176 subreq = tstream_readv_pdu_queue_send(state, ev,
179 rpc_tstream_next_vector,
180 &state->next_vector);
181 if (subreq == NULL) {
182 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
183 return tevent_req_post(req, ev);
/*
 * transp->timeout is multiplied by 1000 and passed as the second
 * argument; presumably milliseconds converted to microseconds --
 * confirm against timeval_current_ofs().
 */
186 endtime = timeval_current_ofs(0, transp->timeout * 1000);
187 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
191 tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
/*
 * Completion callback of the readv subrequest started by
 * rpc_tstream_read_send(). Stores the result in state->nread; a negative
 * result disconnects the transport and fails the parent request with the
 * NT status mapped from the unix error, otherwise the parent request is
 * marked done.
 * NOTE(review): the return after the error path and the freeing of
 * subreq are not visible in this excerpt.
 */
198 static void rpc_tstream_read_done(struct tevent_req *subreq)
200 struct tevent_req *req =
201 tevent_req_callback_data(subreq, struct tevent_req);
202 struct rpc_tstream_read_state *state =
203 tevent_req_data(req, struct rpc_tstream_read_state);
/* err receives the unix error code when the read failed */
206 state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
208 if (state->nread < 0) {
/* a failed read invalidates the stream for all later requests */
209 rpc_tstream_disconnect(state->transp);
210 tevent_req_nterror(req, map_nt_error_from_unix(err));
213 tevent_req_done(req);
/*
 * Collect the result of a read request.
 *
 * If the request carries an NT error it is picked up into `status`;
 * otherwise *size receives the number of bytes read as recorded by
 * rpc_tstream_read_done().
 * NOTE(review): the return statements are not visible in this excerpt.
 */
216 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
218 struct rpc_tstream_read_state *state = tevent_req_data(
219 req, struct rpc_tstream_read_state);
222 if (tevent_req_is_nterror(req, &status)) {
225 *size = state->nread;
/*
 * Per-request state of one asynchronous write: the event context and the
 * owning transport. The write functions also use `iov` and `nwritten`
 * members whose declarations are not visible in this excerpt.
 */
229 struct rpc_tstream_write_state {
230 struct event_context *ev;
231 struct rpc_tstream_state *transp;
/* completion callback for the writev subrequest, defined below */
236 static void rpc_tstream_write_done(struct tevent_req *subreq);
/*
 * Start an asynchronous write of `size` bytes from `data`.
 *
 * Creates a tevent request on mem_ctx, fails it immediately with
 * NT_STATUS_CONNECTION_INVALID if the transport has no stream, and
 * otherwise wraps the buffer in a single iovec and starts a
 * tstream_writev_queue_send() subrequest with a deadline derived from
 * the transport timeout.
 * NOTE(review): some call arguments, the error-branch bodies and the
 * final return are not visible in this excerpt.
 */
238 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
239 struct event_context *ev,
240 const uint8_t *data, size_t size,
243 struct rpc_tstream_state *transp =
244 talloc_get_type_abort(priv, struct rpc_tstream_state);
245 struct tevent_req *req, *subreq;
246 struct rpc_tstream_write_state *state;
247 struct timeval endtime;
249 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
/* refuse to write on a transport whose stream is gone */
253 if (!rpc_tstream_is_connected(transp)) {
254 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
255 return tevent_req_post(req, ev);
258 state->transp = transp;
/* iov_base is non-const, so the const qualifier of data is dropped here */
259 state->iov.iov_base = discard_const_p(void *, data);
260 state->iov.iov_len = size;
/* the subrequest is allocated on state, so it lives no longer than req */
262 subreq = tstream_writev_queue_send(state, ev,
266 if (subreq == NULL) {
/*
 * transp->timeout is multiplied by 1000 and passed as the second
 * argument; presumably milliseconds converted to microseconds --
 * confirm against timeval_current_ofs().
 */
270 endtime = timeval_current_ofs(0, transp->timeout * 1000);
271 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
275 tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
/*
 * Completion callback of the writev subrequest started by
 * rpc_tstream_write_send(). Stores the result in state->nwritten; a
 * negative result disconnects the transport and fails the parent request
 * with the NT status mapped from the unix error, otherwise the parent
 * request is marked done.
 * NOTE(review): the return after the error path and the freeing of
 * subreq are not visible in this excerpt.
 */
282 static void rpc_tstream_write_done(struct tevent_req *subreq)
284 struct tevent_req *req =
285 tevent_req_callback_data(subreq, struct tevent_req);
286 struct rpc_tstream_write_state *state =
287 tevent_req_data(req, struct rpc_tstream_write_state);
/* err receives the unix error code when the write failed */
290 state->nwritten = tstream_writev_queue_recv(subreq, &err);
292 if (state->nwritten < 0) {
/* a failed write invalidates the stream for all later requests */
293 rpc_tstream_disconnect(state->transp);
294 tevent_req_nterror(req, map_nt_error_from_unix(err));
297 tevent_req_done(req);
/*
 * Collect the result of a write request.
 *
 * If the request carries an NT error it is picked up into `status`;
 * otherwise *sent receives the number of bytes written as recorded by
 * rpc_tstream_write_done().
 * NOTE(review): the return statements are not visible in this excerpt.
 */
300 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
302 struct rpc_tstream_write_state *state =
303 tevent_req_data(req, struct rpc_tstream_write_state);
306 if (tevent_req_is_nterror(req, &status)) {
309 *sent = state->nwritten;
314 * @brief Initialize a tstream transport facility
315 * NOTE: on success this function takes ownership of *stream via talloc_move(); the read and write queues are created here, not passed in.
317 * @param mem_ctx - memory context used to allocate the transport
318 * @param stream - a ready to use tstream
319 * @param presult - the transport structure
321 * @return - a NT Status error code.
/*
 * Build a struct rpc_cli_transport backed by a tstream.
 * Allocation order: transport -> private state -> read queue -> write
 * queue; each visible failure path returns NT_STATUS_NO_MEMORY.
 * NOTE(review): the cleanup on the failure paths, the store to *presult
 * and the final return are not visible in this excerpt.
 */
323 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
324 struct tstream_context **stream,
325 struct rpc_cli_transport **presult)
327 struct rpc_cli_transport *result;
328 struct rpc_tstream_state *state;
330 result = talloc(mem_ctx, struct rpc_cli_transport);
331 if (result == NULL) {
332 return NT_STATUS_NO_MEMORY;
/* state is a talloc child of result */
334 state = talloc(result, struct rpc_tstream_state);
337 return NT_STATUS_NO_MEMORY;
339 result->priv = state;
/* both queues are talloc children of state */
341 state->read_queue = tevent_queue_create(state, "read_queue");
342 if (state->read_queue == NULL) {
344 return NT_STATUS_NO_MEMORY;
346 state->write_queue = tevent_queue_create(state, "write_queue");
347 if (state->write_queue == NULL) {
349 return NT_STATUS_NO_MEMORY;
/* the stream is moved only after every allocation above has succeeded */
352 state->stream = talloc_move(state, stream);
353 state->timeout = 10000; /* 10 seconds. */
/* no transact hooks are provided; only the separate read and write hooks */
355 result->trans_send = NULL;
356 result->trans_recv = NULL;
357 result->write_send = rpc_tstream_write_send;
358 result->write_recv = rpc_tstream_write_recv;
359 result->read_send = rpc_tstream_read_send;
360 result->read_recv = rpc_tstream_read_recv;
361 result->is_connected = rpc_tstream_is_connected;
362 result->set_timeout = rpc_tstream_set_timeout;