#include "../lib/util/tevent_ntstatus.h"
#include "async_smb.h"
#include "trans2.h"
+#include "../libcli/smb/smbXcli_base.h"
/****************************************************************************
Calculate the recommended read buffer size
if (cli->server_posix_capabilities & CIFS_UNIX_LARGE_READ_CAP) {
useable_space = 0xFFFFFF - data_offset;
- if (client_is_signing_on(cli)) {
+ if (smb1cli_conn_signing_is_active(cli->conn)) {
return min_space;
}
- if (cli_state_encryption_on(cli)) {
+ if (smb1cli_conn_encryption_on(cli->conn)) {
return min_space;
}
return useable_space;
- } else if (cli_state_capabilities(cli) & CAP_LARGE_READX) {
+ } else if (smb1cli_conn_capabilities(cli->conn) & CAP_LARGE_READX) {
/*
* Note: CAP_LARGE_READX also works with signing
*/
if (cli->server_posix_capabilities & CIFS_UNIX_LARGE_WRITE_CAP) {
useable_space = 0xFFFFFF - data_offset;
- } else if (cli_state_capabilities(cli) & CAP_LARGE_WRITEX) {
+ } else if (smb1cli_conn_capabilities(cli->conn) & CAP_LARGE_WRITEX) {
useable_space = 0x1FFFF - data_offset;
} else {
return min_space;
return min_space;
}
- if (client_is_signing_on(cli)) {
+ if (smb1cli_conn_signing_is_active(cli->conn)) {
return min_space;
}
- if (cli_state_encryption_on(cli)) {
+ if (smb1cli_conn_encryption_on(cli->conn)) {
return min_space;
}
static void cli_read_andx_done(struct tevent_req *subreq);
struct tevent_req *cli_read_andx_create(TALLOC_CTX *mem_ctx,
- struct event_context *ev,
+ struct tevent_context *ev,
struct cli_state *cli, uint16_t fnum,
off_t offset, size_t size,
struct tevent_req **psmbreq)
struct cli_read_andx_state *state;
uint8_t wct = 10;
- if (size > cli_read_max_bufsize(cli)) {
- DEBUG(0, ("cli_read_andx_send got size=%d, can only handle "
- "size=%d\n", (int)size,
- (int)cli_read_max_bufsize(cli)));
- return NULL;
- }
-
req = tevent_req_create(mem_ctx, &state, struct cli_read_andx_state);
if (req == NULL) {
return NULL;
SSVAL(state->vwv + 8, 0, 0);
SSVAL(state->vwv + 9, 0, 0);
- if (cli_state_capabilities(cli) & CAP_LARGE_FILES) {
+ if (smb1cli_conn_capabilities(cli->conn) & CAP_LARGE_FILES) {
SIVAL(state->vwv + 10, 0,
(((uint64_t)offset)>>32) & 0xffffffff);
wct = 12;
}
struct tevent_req *cli_read_andx_send(TALLOC_CTX *mem_ctx,
- struct event_context *ev,
+ struct tevent_context *ev,
struct cli_state *cli, uint16_t fnum,
off_t offset, size_t size)
{
return NULL;
}
- status = cli_smb_req_send(subreq);
+ status = smb1cli_req_chain_submit(&subreq, 1);
if (tevent_req_nterror(req, status)) {
return tevent_req_post(req, ev);
}
state->buf = discard_const_p(uint8_t, smb_base(inbuf)) + SVAL(vwv+6, 0);
- if (trans_oob(smb_len(inbuf), SVAL(vwv+6, 0), state->received)
+ if (trans_oob(smb_len_tcp(inbuf), SVAL(vwv+6, 0), state->received)
|| ((state->received != 0) && (state->buf < bytes))) {
DEBUG(5, ("server returned invalid read&x data offset\n"));
tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
static void cli_readall_done(struct tevent_req *subreq);
static struct tevent_req *cli_readall_send(TALLOC_CTX *mem_ctx,
- struct event_context *ev,
+ struct tevent_context *ev,
struct cli_state *cli,
uint16_t fnum,
off_t offset, size_t size)
return NT_STATUS_OK;
}
-struct cli_pull_subreq {
- struct tevent_req *req;
- ssize_t received;
- uint8_t *buf;
-};
-
-/*
- * Parallel read support.
- *
- * cli_pull sends as many read&x requests as the server would allow via
- * max_mux at a time. When replies flow back in, the data is written into
- * the callback function "sink" in the right order.
- */
+struct cli_pull_chunk;
struct cli_pull_state {
- struct tevent_req *req;
-
- struct event_context *ev;
+ struct tevent_context *ev;
struct cli_state *cli;
uint16_t fnum;
off_t start_offset;
- SMB_OFF_T size;
+ off_t size;
NTSTATUS (*sink)(char *buf, size_t n, void *priv);
void *priv;
size_t chunk_size;
+ off_t next_offset;
+ off_t remaining;
/*
- * Outstanding requests
- */
- uint16_t max_reqs;
- int num_reqs;
- struct cli_pull_subreq *reqs;
-
- /*
- * For how many bytes did we send requests already?
- */
- SMB_OFF_T requested;
-
- /*
- * Next request index to push into "sink". This walks around the "req"
- * array, taking care that the requests are pushed to "sink" in the
- * right order. If necessary (i.e. replies don't come in in the right
- * order), replies are held back in "reqs".
+ * How many bytes did we push into "sink"?
*/
- int top_req;
+ off_t pushed;
/*
- * How many bytes did we push into "sink"?
+ * Outstanding requests
+ *
+ * The maximum is 256:
+ * - which would be a window of 256 MByte
+ * for SMB2 with multi-credit
+ * or smb1 unix extentions.
*/
-
- SMB_OFF_T pushed;
+ uint16_t max_chunks;
+ uint16_t num_chunks;
+ uint16_t num_waiting;
+ struct cli_pull_chunk *chunks;
};
-static char *cli_pull_print(struct tevent_req *req, TALLOC_CTX *mem_ctx)
-{
- struct cli_pull_state *state = tevent_req_data(
- req, struct cli_pull_state);
- char *result;
-
- result = tevent_req_default_print(req, mem_ctx);
- if (result == NULL) {
- return NULL;
- }
-
- return talloc_asprintf_append_buffer(
- result, "num_reqs=%d, top_req=%d",
- state->num_reqs, state->top_req);
-}
+struct cli_pull_chunk {
+ struct cli_pull_chunk *prev, *next;
+ struct tevent_req *req;/* This is the main request! Not the subreq */
+ struct tevent_req *subreq;
+ off_t ofs;
+ uint8_t *buf;
+ size_t total_size;
+ size_t tmp_size;
+ bool done;
+};
-static void cli_pull_read_done(struct tevent_req *read_req);
+static void cli_pull_setup_chunks(struct tevent_req *req);
+static void cli_pull_chunk_ship(struct cli_pull_chunk *chunk);
+static void cli_pull_chunk_done(struct tevent_req *subreq);
/*
- * Prepare an async pull request
+ * Parallel read support.
+ *
+ * cli_pull sends as many read&x requests as the server would allow via
+ * max_mux at a time. When replies flow back in, the data is written into
+ * the callback function "sink" in the right order.
*/
struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx,
- struct event_context *ev,
+ struct tevent_context *ev,
struct cli_state *cli,
uint16_t fnum, off_t start_offset,
- SMB_OFF_T size, size_t window_size,
+ off_t size, size_t window_size,
NTSTATUS (*sink)(char *buf, size_t n,
void *priv),
void *priv)
{
struct tevent_req *req;
struct cli_pull_state *state;
- int i;
size_t page_size = 1024;
+ uint64_t tmp64;
req = tevent_req_create(mem_ctx, &state, struct cli_pull_state);
if (req == NULL) {
return NULL;
}
- tevent_req_set_print_fn(req, cli_pull_print);
- state->req = req;
-
state->cli = cli;
state->ev = ev;
state->fnum = fnum;
state->size = size;
state->sink = sink;
state->priv = priv;
-
- state->pushed = 0;
- state->top_req = 0;
+ state->next_offset = start_offset;
+ state->remaining = size;
if (size == 0) {
tevent_req_done(req);
state->chunk_size &= ~(page_size - 1);
}
- state->max_reqs = cli_state_max_requests(cli);
+ if (window_size == 0) {
+ /*
+ * We use 16 MByte as default window size.
+ */
+ window_size = 16 * 1024 * 1024;
+ }
+
+ tmp64 = window_size/state->chunk_size;
+ if ((window_size % state->chunk_size) > 0) {
+ tmp64 += 1;
+ }
+ tmp64 = MAX(tmp64, 1);
+ tmp64 = MIN(tmp64, 256);
+ state->max_chunks = tmp64;
+
+ /*
+ * We defer the callback because of the complex
+ * substate/subfunction logic
+ */
+ tevent_req_defer_callback(req, ev);
+
+ cli_pull_setup_chunks(req);
+ if (!tevent_req_is_in_progress(req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ return req;
+}
- state->num_reqs = MAX(window_size/state->chunk_size, 1);
- state->num_reqs = MIN(state->num_reqs, state->max_reqs);
+static void cli_pull_setup_chunks(struct tevent_req *req)
+{
+ struct cli_pull_state *state =
+ tevent_req_data(req,
+ struct cli_pull_state);
+ struct cli_pull_chunk *chunk, *next = NULL;
+ size_t i;
- state->reqs = talloc_zero_array(state, struct cli_pull_subreq,
- state->num_reqs);
- if (state->reqs == NULL) {
- goto failed;
+ for (chunk = state->chunks; chunk; chunk = next) {
+ /*
+ * Note that chunk might be removed from this call.
+ */
+ next = chunk->next;
+ cli_pull_chunk_ship(chunk);
+ if (!tevent_req_is_in_progress(req)) {
+ return;
+ }
}
- state->requested = 0;
+ for (i = state->num_chunks; i < state->max_chunks; i++) {
- for (i=0; i<state->num_reqs; i++) {
- struct cli_pull_subreq *subreq = &state->reqs[i];
- SMB_OFF_T size_left;
- size_t request_thistime;
+ if (state->num_waiting > 0) {
+ return;
+ }
- if (state->requested >= size) {
- state->num_reqs = i;
+ if (state->remaining == 0) {
break;
}
- size_left = size - state->requested;
- request_thistime = MIN(size_left, state->chunk_size);
+ chunk = talloc_zero(state, struct cli_pull_chunk);
+ if (tevent_req_nomem(chunk, req)) {
+ return;
+ }
+ chunk->req = req;
+ chunk->ofs = state->next_offset;
+ chunk->total_size = MIN(state->remaining, state->chunk_size);
+ state->next_offset += chunk->total_size;
+ state->remaining -= chunk->total_size;
+
+ DLIST_ADD_END(state->chunks, chunk, NULL);
+ state->num_chunks++;
+ state->num_waiting++;
+
+ cli_pull_chunk_ship(chunk);
+ if (!tevent_req_is_in_progress(req)) {
+ return;
+ }
+ }
- subreq->req = cli_readall_send(
- state->reqs, ev, cli, fnum,
- state->start_offset + state->requested,
- request_thistime);
+ if (state->remaining > 0) {
+ return;
+ }
- if (subreq->req == NULL) {
- goto failed;
- }
- tevent_req_set_callback(subreq->req, cli_pull_read_done, req);
- state->requested += request_thistime;
+ if (state->num_chunks > 0) {
+ return;
}
- return req;
-failed:
- TALLOC_FREE(req);
- return NULL;
+ tevent_req_done(req);
}
-/*
- * Handle incoming read replies, push the data into sink and send out new
- * requests if necessary.
- */
-
-static void cli_pull_read_done(struct tevent_req *subreq)
+static void cli_pull_chunk_ship(struct cli_pull_chunk *chunk)
{
- struct tevent_req *req = tevent_req_callback_data(
- subreq, struct tevent_req);
- struct cli_pull_state *state = tevent_req_data(
- req, struct cli_pull_state);
- struct cli_pull_subreq *pull_subreq = NULL;
- NTSTATUS status;
- int i;
+ struct tevent_req *req = chunk->req;
+ struct cli_pull_state *state =
+ tevent_req_data(req,
+ struct cli_pull_state);
+ bool ok;
+ off_t ofs;
+ size_t size;
- for (i = 0; i < state->num_reqs; i++) {
- pull_subreq = &state->reqs[i];
- if (subreq == pull_subreq->req) {
- break;
+ if (chunk->done) {
+ NTSTATUS status;
+
+ if (chunk != state->chunks) {
+ /*
+ * this chunk is not the
+ * first one in the list.
+ *
+ * which means we should not
+ * push it into the sink yet.
+ */
+ return;
+ }
+
+ if (chunk->tmp_size == 0) {
+ /*
+ * we git a short read, we're done
+ */
+ tevent_req_done(req);
+ return;
+ }
+
+ status = state->sink((char *)chunk->buf,
+ chunk->tmp_size,
+ state->priv);
+ if (tevent_req_nterror(req, status)) {
+ return;
+ }
+ state->pushed += chunk->tmp_size;
+
+ if (chunk->tmp_size < chunk->total_size) {
+ /*
+ * we git a short read, we're done
+ */
+ tevent_req_done(req);
+ return;
}
+
+ DLIST_REMOVE(state->chunks, chunk);
+ SMB_ASSERT(state->num_chunks > 0);
+ state->num_chunks--;
+ TALLOC_FREE(chunk);
+
+ return;
}
- if (i == state->num_reqs) {
- /* Huh -- received something we did not send?? */
- tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
+
+ if (chunk->subreq != NULL) {
return;
}
- status = cli_readall_recv(subreq, &pull_subreq->received,
- &pull_subreq->buf);
- if (!NT_STATUS_IS_OK(status)) {
- tevent_req_nterror(state->req, status);
+ SMB_ASSERT(state->num_waiting > 0);
+
+ ofs = chunk->ofs + chunk->tmp_size;
+ size = chunk->total_size - chunk->tmp_size;
+
+ ok = smb1cli_conn_req_possible(state->cli->conn);
+ if (!ok) {
return;
}
- /*
- * This loop is the one to take care of out-of-order replies. All
- * pending requests are in state->reqs, state->reqs[top_req] is the
- * one that is to be pushed next. If however a request later than
- * top_req is replied to, then we can't push yet. If top_req is
- * replied to at a later point then, we need to push all the finished
- * requests.
- */
+ chunk->subreq = cli_read_andx_send(chunk,
+ state->ev,
+ state->cli,
+ state->fnum,
+ ofs,
+ size);
+ if (tevent_req_nomem(chunk->subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(chunk->subreq,
+ cli_pull_chunk_done,
+ chunk);
- while (state->reqs[state->top_req].req != NULL) {
- struct cli_pull_subreq *top_subreq;
+ state->num_waiting--;
+ return;
+}
- DEBUG(11, ("cli_pull_read_done: top_req = %d\n",
- state->top_req));
+static void cli_pull_chunk_done(struct tevent_req *subreq)
+{
+ struct cli_pull_chunk *chunk =
+ tevent_req_callback_data(subreq,
+ struct cli_pull_chunk);
+ struct tevent_req *req = chunk->req;
+ struct cli_pull_state *state =
+ tevent_req_data(req,
+ struct cli_pull_state);
+ NTSTATUS status;
+ size_t expected = chunk->total_size - chunk->tmp_size;
+ ssize_t received;
+ uint8_t *buf = NULL;
- top_subreq = &state->reqs[state->top_req];
+ chunk->subreq = NULL;
- if (tevent_req_is_in_progress(top_subreq->req)) {
- DEBUG(11, ("cli_pull_read_done: top request not yet "
- "done\n"));
- return;
- }
+ status = cli_read_andx_recv(subreq, &received, &buf);
+ if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) {
+ received = 0;
+ status = NT_STATUS_OK;
+ }
+ if (tevent_req_nterror(req, status)) {
+ return;
+ }
- DEBUG(10, ("cli_pull_read_done: Pushing %d bytes, %d already "
- "pushed\n", (int)top_subreq->received,
- (int)state->pushed));
+ if (received > expected) {
+ tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+ return;
+ }
- status = state->sink((char *)top_subreq->buf,
- top_subreq->received, state->priv);
- if (tevent_req_nterror(state->req, status)) {
+ if (received == 0) {
+ /*
+ * We got EOF we're done
+ */
+ chunk->done = true;
+ cli_pull_setup_chunks(req);
+ return;
+ }
+
+ if (received == chunk->total_size) {
+ /*
+ * We got it in the first run.
+ *
+ * We don't call TALLOC_FREE(subreq)
+ * here and keep the returned buffer.
+ */
+ chunk->buf = buf;
+ } else if (chunk->buf == NULL) {
+ chunk->buf = talloc_array(chunk, uint8_t, chunk->total_size);
+ if (tevent_req_nomem(chunk->buf, req)) {
return;
}
- state->pushed += top_subreq->received;
-
- TALLOC_FREE(state->reqs[state->top_req].req);
-
- if (state->requested < state->size) {
- struct tevent_req *new_req;
- SMB_OFF_T size_left;
- size_t request_thistime;
-
- size_left = state->size - state->requested;
- request_thistime = MIN(size_left, state->chunk_size);
-
- DEBUG(10, ("cli_pull_read_done: Requesting %d bytes "
- "at %d, position %d\n",
- (int)request_thistime,
- (int)(state->start_offset
- + state->requested),
- state->top_req));
-
- new_req = cli_readall_send(
- state->reqs, state->ev, state->cli,
- state->fnum,
- state->start_offset + state->requested,
- request_thistime);
-
- if (tevent_req_nomem(new_req, state->req)) {
- return;
- }
- tevent_req_set_callback(new_req, cli_pull_read_done,
- req);
-
- state->reqs[state->top_req].req = new_req;
- state->requested += request_thistime;
- }
+ }
- state->top_req = (state->top_req+1) % state->num_reqs;
+ if (received != chunk->total_size) {
+ uint8_t *p = chunk->buf + chunk->tmp_size;
+ memcpy(p, buf, received);
+ TALLOC_FREE(subreq);
}
- tevent_req_done(req);
+ chunk->tmp_size += received;
+
+ if (chunk->tmp_size == chunk->total_size) {
+ chunk->done = true;
+ } else {
+ state->num_waiting++;
+ }
+
+ cli_pull_setup_chunks(req);
}
-NTSTATUS cli_pull_recv(struct tevent_req *req, SMB_OFF_T *received)
+NTSTATUS cli_pull_recv(struct tevent_req *req, off_t *received)
{
struct cli_pull_state *state = tevent_req_data(
req, struct cli_pull_state);
NTSTATUS status;
if (tevent_req_is_nterror(req, &status)) {
+ tevent_req_received(req);
return status;
}
*received = state->pushed;
+ tevent_req_received(req);
return NT_STATUS_OK;
}
NTSTATUS cli_pull(struct cli_state *cli, uint16_t fnum,
- off_t start_offset, SMB_OFF_T size, size_t window_size,
+ off_t start_offset, off_t size, size_t window_size,
NTSTATUS (*sink)(char *buf, size_t n, void *priv),
- void *priv, SMB_OFF_T *received)
+ void *priv, off_t *received)
{
TALLOC_CTX *frame = talloc_stackframe();
- struct event_context *ev;
+ struct tevent_context *ev;
struct tevent_req *req;
NTSTATUS status = NT_STATUS_OK;
- if (cli_has_async_calls(cli)) {
+ if (smbXcli_conn_has_async_calls(cli->conn)) {
/*
* Can't use sync call while an async call is in flight
*/
goto fail;
}
- ev = event_context_init(frame);
+ ev = samba_tevent_context_init(frame);
if (ev == NULL) {
status = NT_STATUS_NO_MEMORY;
goto fail;
size_t *nread)
{
NTSTATUS status;
- SMB_OFF_T ret;
+ off_t ret;
status = cli_pull(cli, fnum, offset, size, size,
cli_read_sink, &buf, &ret);
static void cli_write_andx_done(struct tevent_req *subreq);
struct tevent_req *cli_write_andx_create(TALLOC_CTX *mem_ctx,
- struct event_context *ev,
+ struct tevent_context *ev,
struct cli_state *cli, uint16_t fnum,
uint16_t mode, const uint8_t *buf,
off_t offset, size_t size,
{
struct tevent_req *req, *subreq;
struct cli_write_andx_state *state;
- bool bigoffset = ((cli_state_capabilities(cli) & CAP_LARGE_FILES) != 0);
+ bool bigoffset = ((smb1cli_conn_capabilities(cli->conn) & CAP_LARGE_FILES) != 0);
uint8_t wct = bigoffset ? 14 : 12;
size_t max_write = cli_write_max_bufsize(cli, mode, wct);
uint16_t *vwv;
SSVAL(vwv+10, 0, state->size);
SSVAL(vwv+11, 0,
- cli_smb_wct_ofs(reqs_before, num_reqs_before)
+ smb1cli_req_wct_ofs(reqs_before, num_reqs_before)
+ 1 /* the wct field */
+ wct * 2 /* vwv */
+ 2 /* num_bytes field */
state->iov[0].iov_base = (void *)&state->pad;
state->iov[0].iov_len = 1;
state->iov[1].iov_base = discard_const_p(void, buf);
- state->iov[1].iov_len = size;
+ state->iov[1].iov_len = state->size;
subreq = cli_smb_req_create(state, ev, cli, SMBwriteX, 0, wct, vwv,
2, state->iov);
}
struct tevent_req *cli_write_andx_send(TALLOC_CTX *mem_ctx,
- struct event_context *ev,
+ struct tevent_context *ev,
struct cli_state *cli, uint16_t fnum,
uint16_t mode, const uint8_t *buf,
off_t offset, size_t size)
return NULL;
}
- status = cli_smb_req_send(subreq);
+ status = smb1cli_req_chain_submit(&subreq, 1);
if (tevent_req_nterror(req, status)) {
return tevent_req_post(req, ev);
}
req, struct cli_write_andx_state);
uint8_t wct;
uint16_t *vwv;
- uint8_t *inbuf;
NTSTATUS status;
- status = cli_smb_recv(subreq, state, &inbuf, 6, &wct, &vwv,
+ status = cli_smb_recv(subreq, state, NULL, 6, &wct, &vwv,
NULL, NULL);
TALLOC_FREE(subreq);
if (NT_STATUS_IS_ERR(status)) {
}
struct cli_writeall_state {
- struct event_context *ev;
+ struct tevent_context *ev;
struct cli_state *cli;
uint16_t fnum;
uint16_t mode;
static void cli_writeall_written(struct tevent_req *req);
static struct tevent_req *cli_writeall_send(TALLOC_CTX *mem_ctx,
- struct event_context *ev,
+ struct tevent_context *ev,
struct cli_state *cli,
uint16_t fnum,
uint16_t mode,
size_t *pwritten)
{
TALLOC_CTX *frame = talloc_stackframe();
- struct event_context *ev;
+ struct tevent_context *ev;
struct tevent_req *req;
NTSTATUS status = NT_STATUS_NO_MEMORY;
- if (cli_has_async_calls(cli)) {
+ if (smbXcli_conn_has_async_calls(cli->conn)) {
/*
* Can't use sync call while an async call is in flight
*/
status = NT_STATUS_INVALID_PARAMETER;
goto fail;
}
- ev = event_context_init(frame);
+ ev = samba_tevent_context_init(frame);
if (ev == NULL) {
goto fail;
}
return status;
}
-struct cli_push_write_state {
- struct tevent_req *req;/* This is the main request! Not the subreq */
- uint32_t idx;
- off_t ofs;
- uint8_t *buf;
- size_t size;
-};
+struct cli_push_chunk;
struct cli_push_state {
- struct event_context *ev;
+ struct tevent_context *ev;
struct cli_state *cli;
uint16_t fnum;
uint16_t mode;
off_t start_offset;
- size_t window_size;
size_t (*source)(uint8_t *buf, size_t n, void *priv);
void *priv;
/*
* Outstanding requests
+ *
+ * The maximum is 256:
+ * - which would be a window of 256 MByte
+ * for SMB2 with multi-credit
+ * or smb1 unix extentions.
*/
- uint32_t pending;
- uint16_t max_reqs;
- uint32_t num_reqs;
- struct cli_push_write_state **reqs;
+ uint16_t max_chunks;
+ uint16_t num_chunks;
+ uint16_t num_waiting;
+ struct cli_push_chunk *chunks;
};
-static void cli_push_written(struct tevent_req *req);
-
-static bool cli_push_write_setup(struct tevent_req *req,
- struct cli_push_state *state,
- uint32_t idx)
-{
- struct cli_push_write_state *substate;
+struct cli_push_chunk {
+ struct cli_push_chunk *prev, *next;
+ struct tevent_req *req;/* This is the main request! Not the subreq */
struct tevent_req *subreq;
+ off_t ofs;
+ uint8_t *buf;
+ size_t total_size;
+ size_t tmp_size;
+ bool done;
+};
- substate = talloc(state->reqs, struct cli_push_write_state);
- if (!substate) {
- return false;
- }
- substate->req = req;
- substate->idx = idx;
- substate->ofs = state->next_offset;
- substate->buf = talloc_array(substate, uint8_t, state->chunk_size);
- if (!substate->buf) {
- talloc_free(substate);
- return false;
- }
- substate->size = state->source(substate->buf,
- state->chunk_size,
- state->priv);
- if (substate->size == 0) {
- state->eof = true;
- /* nothing to send */
- talloc_free(substate);
- return true;
- }
-
- subreq = cli_writeall_send(substate,
- state->ev, state->cli,
- state->fnum, state->mode,
- substate->buf,
- substate->ofs,
- substate->size);
- if (!subreq) {
- talloc_free(substate);
- return false;
- }
- tevent_req_set_callback(subreq, cli_push_written, substate);
-
- state->reqs[idx] = substate;
- state->pending += 1;
- state->next_offset += substate->size;
-
- return true;
-}
+static void cli_push_setup_chunks(struct tevent_req *req);
+static void cli_push_chunk_ship(struct cli_push_chunk *chunk);
+static void cli_push_chunk_done(struct tevent_req *subreq);
-struct tevent_req *cli_push_send(TALLOC_CTX *mem_ctx, struct event_context *ev,
+struct tevent_req *cli_push_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
struct cli_state *cli,
uint16_t fnum, uint16_t mode,
off_t start_offset, size_t window_size,
{
struct tevent_req *req;
struct cli_push_state *state;
- uint32_t i;
size_t page_size = 1024;
+ uint64_t tmp64;
req = tevent_req_create(mem_ctx, &state, struct cli_push_state);
if (req == NULL) {
state->mode = mode;
state->source = source;
state->priv = priv;
- state->eof = false;
- state->pending = 0;
state->next_offset = start_offset;
state->chunk_size = cli_write_max_bufsize(cli, mode, 14);
state->chunk_size &= ~(page_size - 1);
}
- state->max_reqs = cli_state_max_requests(cli);
-
if (window_size == 0) {
- window_size = state->max_reqs * state->chunk_size;
+ /*
+ * We use 16 MByte as default window size.
+ */
+ window_size = 16 * 1024 * 1024;
}
- state->num_reqs = window_size/state->chunk_size;
+
+ tmp64 = window_size/state->chunk_size;
if ((window_size % state->chunk_size) > 0) {
- state->num_reqs += 1;
+ tmp64 += 1;
}
- state->num_reqs = MIN(state->num_reqs, state->max_reqs);
- state->num_reqs = MAX(state->num_reqs, 1);
+ tmp64 = MAX(tmp64, 1);
+ tmp64 = MIN(tmp64, 256);
+ state->max_chunks = tmp64;
+
+ /*
+ * We defer the callback because of the complex
+ * substate/subfunction logic
+ */
+ tevent_req_defer_callback(req, ev);
- state->reqs = talloc_zero_array(state, struct cli_push_write_state *,
- state->num_reqs);
- if (state->reqs == NULL) {
- goto failed;
+ cli_push_setup_chunks(req);
+ if (!tevent_req_is_in_progress(req)) {
+ return tevent_req_post(req, ev);
}
- for (i=0; i<state->num_reqs; i++) {
- if (!cli_push_write_setup(req, state, i)) {
- goto failed;
+ return req;
+}
+
+static void cli_push_setup_chunks(struct tevent_req *req)
+{
+ struct cli_push_state *state =
+ tevent_req_data(req,
+ struct cli_push_state);
+ struct cli_push_chunk *chunk, *next = NULL;
+ size_t i;
+
+ for (chunk = state->chunks; chunk; chunk = next) {
+ /*
+ * Note that chunk might be removed from this call.
+ */
+ next = chunk->next;
+ cli_push_chunk_ship(chunk);
+ if (!tevent_req_is_in_progress(req)) {
+ return;
+ }
+ }
+
+ for (i = state->num_chunks; i < state->max_chunks; i++) {
+
+ if (state->num_waiting > 0) {
+ return;
}
if (state->eof) {
break;
}
+
+ chunk = talloc_zero(state, struct cli_push_chunk);
+ if (tevent_req_nomem(chunk, req)) {
+ return;
+ }
+ chunk->req = req;
+ chunk->ofs = state->next_offset;
+ chunk->buf = talloc_array(chunk,
+ uint8_t,
+ state->chunk_size);
+ if (tevent_req_nomem(chunk->buf, req)) {
+ return;
+ }
+ chunk->total_size = state->source(chunk->buf,
+ state->chunk_size,
+ state->priv);
+ if (chunk->total_size == 0) {
+ /* nothing to send */
+ talloc_free(chunk);
+ state->eof = true;
+ break;
+ }
+ state->next_offset += chunk->total_size;
+
+ DLIST_ADD_END(state->chunks, chunk, NULL);
+ state->num_chunks++;
+ state->num_waiting++;
+
+ cli_push_chunk_ship(chunk);
+ if (!tevent_req_is_in_progress(req)) {
+ return;
+ }
}
- if (state->pending == 0) {
- tevent_req_done(req);
- return tevent_req_post(req, ev);
+ if (!state->eof) {
+ return;
}
- return req;
+ if (state->num_chunks > 0) {
+ return;
+ }
+
+ tevent_req_done(req);
+}
+
+static void cli_push_chunk_ship(struct cli_push_chunk *chunk)
+{
+ struct tevent_req *req = chunk->req;
+ struct cli_push_state *state =
+ tevent_req_data(req,
+ struct cli_push_state);
+ bool ok;
+ const uint8_t *buf;
+ off_t ofs;
+ size_t size;
- failed:
- tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
- return tevent_req_post(req, ev);
+ if (chunk->done) {
+ DLIST_REMOVE(state->chunks, chunk);
+ SMB_ASSERT(state->num_chunks > 0);
+ state->num_chunks--;
+ TALLOC_FREE(chunk);
+
+ return;
+ }
+
+ if (chunk->subreq != NULL) {
+ return;
+ }
+
+ SMB_ASSERT(state->num_waiting > 0);
+
+ buf = chunk->buf + chunk->tmp_size;
+ ofs = chunk->ofs + chunk->tmp_size;
+ size = chunk->total_size - chunk->tmp_size;
+
+ ok = smb1cli_conn_req_possible(state->cli->conn);
+ if (!ok) {
+ return;
+ }
+
+ chunk->subreq = cli_write_andx_send(chunk,
+ state->ev,
+ state->cli,
+ state->fnum,
+ state->mode,
+ buf,
+ ofs,
+ size);
+ if (tevent_req_nomem(chunk->subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(chunk->subreq,
+ cli_push_chunk_done,
+ chunk);
+
+ state->num_waiting--;
+ return;
}
-static void cli_push_written(struct tevent_req *subreq)
+static void cli_push_chunk_done(struct tevent_req *subreq)
{
- struct cli_push_write_state *substate = tevent_req_callback_data(
- subreq, struct cli_push_write_state);
- struct tevent_req *req = substate->req;
- struct cli_push_state *state = tevent_req_data(
- req, struct cli_push_state);
+ struct cli_push_chunk *chunk =
+ tevent_req_callback_data(subreq,
+ struct cli_push_chunk);
+ struct tevent_req *req = chunk->req;
+ struct cli_push_state *state =
+ tevent_req_data(req,
+ struct cli_push_state);
NTSTATUS status;
- uint32_t idx = substate->idx;
+ size_t expected = chunk->total_size - chunk->tmp_size;
+ size_t written;
- state->reqs[idx] = NULL;
- state->pending -= 1;
+ chunk->subreq = NULL;
- status = cli_writeall_recv(subreq, NULL);
+ status = cli_write_andx_recv(subreq, &written);
TALLOC_FREE(subreq);
- TALLOC_FREE(substate);
if (tevent_req_nterror(req, status)) {
return;
}
- if (!state->eof) {
- if (!cli_push_write_setup(req, state, idx)) {
- tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
- return;
- }
+ if (written > expected) {
+ tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+ return;
}
- if (state->pending == 0) {
- tevent_req_done(req);
+ if (written == 0) {
+ tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
return;
}
+
+ chunk->tmp_size += written;
+
+ if (chunk->tmp_size == chunk->total_size) {
+ chunk->done = true;
+ } else {
+ state->num_waiting++;
+ }
+
+ cli_push_setup_chunks(req);
}
NTSTATUS cli_push_recv(struct tevent_req *req)
void *priv)
{
TALLOC_CTX *frame = talloc_stackframe();
- struct event_context *ev;
+ struct tevent_context *ev;
struct tevent_req *req;
NTSTATUS status = NT_STATUS_OK;
- if (cli_has_async_calls(cli)) {
+ if (smbXcli_conn_has_async_calls(cli->conn)) {
/*
* Can't use sync call while an async call is in flight
*/
goto fail;
}
- ev = event_context_init(frame);
+ ev = samba_tevent_context_init(frame);
if (ev == NULL) {
status = NT_STATUS_NO_MEMORY;
goto fail;