From: Volker Lendecke Date: Thu, 28 Feb 2008 14:21:33 +0000 (+0100) Subject: Add async cli_pull support X-Git-Url: http://git.samba.org/?a=commitdiff_plain;h=62445788350b2f174537d877d721d92147f93ec7;p=abartlet%2Fsamba.git%2F.git Add async cli_pull support This is the big (and potentially controversial) one. It took a phone call to explain to metze what is going on inside cli_pull_read_done, but I would really like everybody to understand this function. It is a very good and reasonably complex example of async programming. If we want more asynchronism in s3, this is what we will have to deal with :-) Make use of it in the smbclient "get" command. Volker (This used to be commit 76f9b360ee1d973630d82d401eeddce858189301) --- diff --git a/source3/client/client.c b/source3/client/client.c index f7ed33ad8ab..c97f6223a78 100644 --- a/source3/client/client.c +++ b/source3/client/client.c @@ -964,22 +964,30 @@ static int cmd_echo(void) Get a file from rname to lname ****************************************************************************/ +static NTSTATUS writefile_sink(char *buf, size_t n, void *priv) +{ + int *pfd = (int *)priv; + if (writefile(*pfd, buf, n) == -1) { + return map_nt_error_from_unix(errno); + } + return NT_STATUS_OK; +} + static int do_get(const char *rname, const char *lname_in, bool reget) { TALLOC_CTX *ctx = talloc_tos(); int handle = 0, fnum; bool newhandle = false; - char *data = NULL; struct timeval tp_start; - int read_size = io_bufsize; uint16 attr; SMB_OFF_T size; off_t start = 0; - off_t nread = 0; + ssize_t nread = 0; int rc = 0; struct cli_state *targetcli = NULL; char *targetname = NULL; char *lname = NULL; + NTSTATUS status; lname = talloc_strdup(ctx, lname_in); if (!lname) { @@ -1038,36 +1046,15 @@ static int do_get(const char *rname, const char *lname_in, bool reget) DEBUG(1,("getting file %s of size %.0f as %s ", rname, (double)size, lname)); - if(!(data = (char *)SMB_MALLOC(read_size))) { - d_printf("malloc fail for size %d\n", read_size); + status = cli_pull(targetcli, fnum, start, size, 1024*1024, + writefile_sink, (void *)&handle, &nread); + if (!NT_STATUS_IS_OK(status)) { + d_fprintf(stderr, "parallel_read returned %s\n", + nt_errstr(status)); cli_close(targetcli, fnum); return 1; } - while (1) { - int n = cli_read(targetcli, fnum, data, nread + start, read_size); - - if (n <= 0) - break; - - if (writefile(handle,data, n) != n) { - d_printf("Error writing local file\n"); - rc = 1; - break; - } - - nread += n; - } - - if (nread + start < size) { - DEBUG (0, ("Short read when getting file %s. Only got %ld bytes.\n", - rname, (long)nread)); - - rc = 1; - } - - SAFE_FREE(data); - if (!cli_close(targetcli, fnum)) { d_printf("Error %s closing remote file\n",cli_errstr(cli)); rc = 1; diff --git a/source3/libsmb/clireadwrite.c b/source3/libsmb/clireadwrite.c index af13ed8f733..5aee8f18bdb 100644 --- a/source3/libsmb/clireadwrite.c +++ b/source3/libsmb/clireadwrite.c @@ -19,6 +19,431 @@ #include "includes.h" +/**************************************************************************** + Calculate the recommended read buffer size +****************************************************************************/ +static size_t cli_read_max_bufsize(struct cli_state *cli) +{ + if (!client_is_signing_on(cli) && !cli_encryption_on(cli) == false + && (cli->posix_capabilities & CIFS_UNIX_LARGE_READ_CAP)) { + return CLI_SAMBA_MAX_POSIX_LARGE_READX_SIZE; + } + if (cli->capabilities & CAP_LARGE_READX) { + return cli->is_samba + ? CLI_SAMBA_MAX_LARGE_READX_SIZE + : CLI_WINDOWS_MAX_LARGE_READX_SIZE; + } + return (cli->max_xmit - (smb_size+32)) & ~1023; +} + +/* + * Send a read&x request + */ + +struct async_req *cli_read_andx_send(TALLOC_CTX *mem_ctx, + struct cli_state *cli, int fnum, + off_t offset, size_t size) +{ + struct async_req *result; + struct cli_request *req; + bool bigoffset = False; + char *enc_buf; + + if (size > cli_read_max_bufsize(cli)) { + DEBUG(0, ("cli_read_andx_send got size=%d, can only handle " + "size=%d\n", (int)size, + (int)cli_read_max_bufsize(cli))); + return NULL; + } + + result = cli_request_new(mem_ctx, cli->event_ctx, cli, 12, 0, &req); + if (result == NULL) { + DEBUG(0, ("cli_request_new failed\n")); + return NULL; + } + + req = cli_request_get(result); + + req->data.read.ofs = offset; + req->data.read.size = size; + req->data.read.received = 0; + req->data.read.rcvbuf = NULL; + + if ((SMB_BIG_UINT)offset >> 32) + bigoffset = True; + + cli_set_message(req->outbuf, bigoffset ? 12 : 10, 0, False); + + SCVAL(req->outbuf,smb_com,SMBreadX); + SSVAL(req->outbuf,smb_tid,cli->cnum); + cli_setup_packet_buf(cli, req->outbuf); + + SCVAL(req->outbuf,smb_vwv0,0xFF); + SCVAL(req->outbuf,smb_vwv0+1,0); + SSVAL(req->outbuf,smb_vwv1,0); + SSVAL(req->outbuf,smb_vwv2,fnum); + SIVAL(req->outbuf,smb_vwv3,offset); + SSVAL(req->outbuf,smb_vwv5,size); + SSVAL(req->outbuf,smb_vwv6,size); + SSVAL(req->outbuf,smb_vwv7,(size >> 16)); + SSVAL(req->outbuf,smb_vwv8,0); + SSVAL(req->outbuf,smb_vwv9,0); + SSVAL(req->outbuf,smb_mid,req->mid); + + if (bigoffset) { + SIVAL(req->outbuf, smb_vwv10, + (((SMB_BIG_UINT)offset)>>32) & 0xffffffff); + } + + cli_calculate_sign_mac(cli, req->outbuf); + + event_fd_set_writeable(cli->fd_event); + + if (cli_encryption_on(cli)) { + NTSTATUS status; + status = cli_encrypt_message(cli, req->outbuf, &enc_buf); + if (!NT_STATUS_IS_OK(status)) { + DEBUG(0, ("Error in encrypting client message. " + "Error %s\n", nt_errstr(status))); + TALLOC_FREE(req); + return NULL; + } + req->outbuf = enc_buf; + req->enc_state = cli->trans_enc_state; + } + + return result; +} + +/* + * Pull the data out of a finished async read_and_x request. rcvbuf is + * talloced from the request, so better make sure that you copy it away before + * you talloc_free(req). "rcvbuf" is NOT a talloc_ctx of its own, so do not + * talloc_move it! + */ + +NTSTATUS cli_read_andx_recv(struct async_req *req, ssize_t *received, + uint8_t **rcvbuf) +{ + struct cli_request *cli_req = cli_request_get(req); + NTSTATUS status; + size_t size; + + SMB_ASSERT(req->state >= ASYNC_REQ_DONE); + if (req->state == ASYNC_REQ_ERROR) { + return req->status; + } + + status = cli_pull_error(cli_req->inbuf); + + if (!NT_STATUS_IS_OK(status)) { + return status; + } + + /* size is the number of bytes the server returned. + * Might be zero. */ + size = SVAL(cli_req->inbuf, smb_vwv5); + size |= (((unsigned int)(SVAL(cli_req->inbuf, smb_vwv7))) << 16); + + if (size > cli_req->data.read.size) { + DEBUG(5,("server returned more than we wanted!\n")); + return NT_STATUS_UNEXPECTED_IO_ERROR; + } + + if (size < 0) { + DEBUG(5,("read return < 0!\n")); + return NT_STATUS_UNEXPECTED_IO_ERROR; + } + + *rcvbuf = (uint8_t *) + (smb_base(cli_req->inbuf) + SVAL(cli_req->inbuf, smb_vwv6)); + *received = size; + return NT_STATUS_OK; +} + +/* + * Parallel read support. + * + * cli_pull sends as many read&x requests as the server would allow via + * max_mux at a time. When replies flow back in, the data is written into + * the callback function "sink" in the right order. + */ + +struct cli_pull_state { + struct async_req *req; + + struct cli_state *cli; + uint16_t fnum; + off_t start_offset; + size_t size; + + NTSTATUS (*sink)(char *buf, size_t n, void *priv); + void *priv; + + size_t chunk_size; + + /* + * Outstanding requests + */ + int num_reqs; + struct async_req **reqs; + + /* + * For how many bytes did we send requests already? + */ + off_t requested; + + /* + * Next request index to push into "sink". This walks around the "req" + * array, taking care that the requests are pushed to "sink" in the + * right order. If necessary (i.e. replies don't come in in the right + * order), replies are held back in "reqs". + */ + int top_req; + + /* + * How many bytes did we push into "sink"? + */ + + off_t pushed; +}; + +static char *cli_pull_print(TALLOC_CTX *mem_ctx, struct async_req *req) +{ + struct cli_pull_state *state = talloc_get_type_abort( + req->private_data, struct cli_pull_state); + char *result; + + result = async_req_print(mem_ctx, req); + if (result == NULL) { + return NULL; + } + + return talloc_asprintf_append_buffer( + result, "num_reqs=%d, top_req=%d", + state->num_reqs, state->top_req); +} + +static void cli_pull_read_done(struct async_req *read_req); + +/* + * Prepare an async pull request + */ + +struct async_req *cli_pull_send(TALLOC_CTX *mem_ctx, struct cli_state *cli, + uint16_t fnum, off_t start_offset, + size_t size, size_t window_size, + NTSTATUS (*sink)(char *buf, size_t n, + void *priv), + void *priv) +{ + struct async_req *result; + struct cli_pull_state *state; + int i; + + result = async_req_new(mem_ctx, cli->event_ctx); + if (result == NULL) { + goto failed; + } + state = talloc(result, struct cli_pull_state); + if (state == NULL) { + goto failed; + } + result->private_data = state; + result->print = cli_pull_print; + state->req = result; + + state->cli = cli; + state->fnum = fnum; + state->start_offset = start_offset; + state->size = size; + state->sink = sink; + state->priv = priv; + + state->pushed = 0; + state->top_req = 0; + state->chunk_size = cli_read_max_bufsize(cli); + + state->num_reqs = MAX(window_size/state->chunk_size, 1); + state->num_reqs = MIN(state->num_reqs, cli->max_mux); + + state->reqs = TALLOC_ZERO_ARRAY(state, struct async_req *, + state->num_reqs); + if (state->reqs == NULL) { + goto failed; + } + + state->requested = 0; + + for (i=0; inum_reqs; i++) { + size_t size_left, request_thistime; + + if (state->requested >= size) { + state->num_reqs = i; + break; + } + + size_left = size - state->requested; + request_thistime = MIN(size_left, state->chunk_size); + + state->reqs[i] = cli_read_andx_send( + state->reqs, cli, fnum, + state->start_offset + state->requested, + request_thistime); + + if (state->reqs[i] == NULL) { + goto failed; + } + + state->reqs[i]->async.fn = cli_pull_read_done; + state->reqs[i]->async.priv = result; + + state->requested += request_thistime; + } + return result; + +failed: + TALLOC_FREE(result); + return NULL; +} + +/* + * Handle incoming read replies, push the data into sink and send out new + * requests if necessary. + */ + +static void cli_pull_read_done(struct async_req *read_req) +{ + struct async_req *pull_req = talloc_get_type_abort( + read_req->async.priv, struct async_req); + struct cli_pull_state *state = talloc_get_type_abort( + pull_req->private_data, struct cli_pull_state); + struct cli_request *read_state = cli_request_get(read_req); + NTSTATUS status; + + status = cli_read_andx_recv(read_req, &read_state->data.read.received, + &read_state->data.read.rcvbuf); + if (!NT_STATUS_IS_OK(status)) { + async_req_error(state->req, status); + return; + } + + /* + * This loop is the one to take care of out-of-order replies. All + * pending requests are in state->reqs, state->reqs[top_req] is the + * one that is to be pushed next. If however a request later than + * top_req is replied to, then we can't push yet. If top_req is + * replied to at a later point then, we need to push all the finished + * requests. + */ + + while (state->reqs[state->top_req] != NULL) { + struct cli_request *top_read; + + DEBUG(11, ("cli_pull_read_done: top_req = %d\n", + state->top_req)); + + if (state->reqs[state->top_req]->state < ASYNC_REQ_DONE) { + DEBUG(11, ("cli_pull_read_done: top request not yet " + "done\n")); + return; + } + + top_read = cli_request_get(state->reqs[state->top_req]); + + DEBUG(10, ("cli_pull_read_done: Pushing %d bytes, %d already " + "pushed\n", (int)top_read->data.read.received, + (int)state->pushed)); + + status = state->sink((char *)top_read->data.read.rcvbuf, + top_read->data.read.received, + state->priv); + if (!NT_STATUS_IS_OK(status)) { + async_req_error(state->req, status); + return; + } + state->pushed += top_read->data.read.received; + + TALLOC_FREE(state->reqs[state->top_req]); + + if (state->requested < state->size) { + struct async_req *new_req; + size_t size_left, request_thistime; + + size_left = state->size - state->requested; + request_thistime = MIN(size_left, state->chunk_size); + + DEBUG(10, ("cli_pull_read_done: Requesting %d bytes " + "at %d, position %d\n", + (int)request_thistime, + (int)(state->start_offset + + state->requested), + state->top_req)); + + new_req = cli_read_andx_send( + state->reqs, state->cli, state->fnum, + state->start_offset + state->requested, + request_thistime); + + if (async_req_nomem(new_req, state->req)) { + return; + } + + new_req->async.fn = cli_pull_read_done; + new_req->async.priv = pull_req; + + state->reqs[state->top_req] = new_req; + state->requested += request_thistime; + } + + state->top_req = (state->top_req+1) % state->num_reqs; + } + + async_req_done(pull_req); +} + +NTSTATUS cli_pull_recv(struct async_req *req, ssize_t *received) +{ + struct cli_pull_state *state = talloc_get_type_abort( + req->private_data, struct cli_pull_state); + + SMB_ASSERT(req->state >= ASYNC_REQ_DONE); + if (req->state == ASYNC_REQ_ERROR) { + return req->status; + } + *received = state->pushed; + return NT_STATUS_OK; +} + +NTSTATUS cli_pull(struct cli_state *cli, uint16_t fnum, + off_t start_offset, size_t size, size_t window_size, + NTSTATUS (*sink)(char *buf, size_t n, void *priv), + void *priv, ssize_t *received) +{ + TALLOC_CTX *frame = talloc_stackframe(); + struct async_req *req; + NTSTATUS result = NT_STATUS_NO_MEMORY; + + if (cli_tmp_event_ctx(frame, cli) == NULL) { + goto nomem; + } + + req = cli_pull_send(frame, cli, fnum, start_offset, size, window_size, + sink, priv); + if (req == NULL) { + goto nomem; + } + + while (req->state < ASYNC_REQ_DONE) { + event_loop_once(cli->event_ctx); + } + + result = cli_pull_recv(req, received); + nomem: + TALLOC_FREE(frame); + return result; +} + /**************************************************************************** Issue a single SMBread and don't wait for a reply. ****************************************************************************/