s3:smbd: add infrastructure to wait for TCP acks
author Stefan Metzmacher <metze@samba.org>
Wed, 3 Jun 2020 08:57:59 +0000 (10:57 +0200)
committer Stefan Metzmacher <metze@samba.org>
Wed, 8 Jul 2020 15:54:40 +0000 (15:54 +0000)
This will be the core of the logic that allows
us to retry break notifications.

When we start the "pending break cycle" we ask for
the current retransmission timeout (rto) on the TCP connection
and remember how many unacked bytes are in the kernel's
send queue. Each time we send bytes into the kernel
we add them to the unacked bytes.
We use a timer using the rto interval in order
to check the amount of unacked bytes again.
The provided send_queue_entry.ack.req will be completed
with tevent_req_done() when everything is completely acked,
tevent_req_nterror(NT_STATUS_IO_TIMEOUT) when
send_queue_entry.ack.timeout has expired or
tevent_req_nterror(connection_error) when the connection
gets disconnected.

It works with support from the FreeBSD and Linux kernels.
For other platforms we just have a fixed rto of 1 second
and pretend all bytes are acked when we recheck after 1 second.
So on those platforms only a connection error can trigger
tevent_req_nterror(); there is no timeout. A follow-up commit will
most likely disable support for multi-channel if we don't have
kernel support.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=11897

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Günther Deschner <gd@samba.org>
source3/librpc/idl/smbXsrv.idl
source3/smbd/globals.h
source3/smbd/smb2_server.c

index 2e8e2bbcd31bcd2c1637dfc359e16dddd24c3b17..1ecc40fcaac253f06336d6bcd578073e00fad2e6 100644 (file)
@@ -142,6 +142,11 @@ interface smbXsrv
                [ignore] struct smbXsrv_connection      *connections;
                boolean8                server_multi_channel_enabled;
                hyper                   next_channel_id;
+
+               /*
+                * A List of pending breaks.
+                */
+               [ignore] struct smbXsrv_pending_break *pending_breaks;
        } smbXsrv_client;
 
        typedef union {
index ec728b7dcf3958550f0324e41d812a9c6cabbbf5..1276ffa0084dd345f057c851d2c91eddcf1bfb23 100644 (file)
@@ -372,6 +372,13 @@ struct smbXsrv_connection {
                } nbt;
        } transport;
 
+       struct {
+               uint64_t unacked_bytes;
+               uint32_t rto_usecs;
+               struct tevent_req *checker_subreq;
+               struct smbd_smb2_send_queue *queue;
+       } ack;
+
        struct {
                struct {
                        /*
@@ -687,6 +694,12 @@ struct smbd_smb2_send_queue {
        struct iovec *vector;
        int count;
 
+       struct {
+               struct tevent_req *req;
+               struct timeval timeout;
+               uint64_t required_acked_bytes;
+       } ack;
+
        TALLOC_CTX *mem_ctx;
 };
 
index e8820afa796f44d3c08c605c0bf77f81735777dc..31a5a5f868c21c988d8ffd9837910721ad5b93c5 100644 (file)
@@ -20,6 +20,7 @@
 */
 
 #include "includes.h"
+#include "system/network.h"
 #include "smbd/smbd.h"
 #include "smbd/globals.h"
 #include "../libcli/smb/smb_common.h"
 #include "auth.h"
 #include "libcli/smb/smbXcli_base.h"
 
+#if defined(LINUX)
+/* SIOCOUTQ and TIOCOUTQ are the same opcode, so either name works here */
+#define __IOCTL_SEND_QUEUE_SIZE_OPCODE TIOCOUTQ /* ioctl for the send queue size */
+#define __HAVE_TCP_INFO_RTO 1 /* enables the getsockopt(TCP_INFO) rto lookup */
+#elif defined(FREEBSD)
+#define __IOCTL_SEND_QUEUE_SIZE_OPCODE FIONWRITE /* ioctl for the send queue size */
+#define __HAVE_TCP_INFO_RTO 1 /* enables the getsockopt(TCP_INFO) rto lookup */
+#endif /* neither macro is defined here for any other platform */
+
 #include "lib/crypto/gnutls_helpers.h"
 #include <gnutls/gnutls.h>
 #include <gnutls/crypto.h>
@@ -1106,6 +1116,309 @@ static NTSTATUS smbd_smb2_request_setup_out(struct smbd_smb2_request *req)
        return NT_STATUS_OK;
 }
 
+static NTSTATUS smbXsrv_connection_get_rto_usecs(struct smbXsrv_connection *xconn,
+                                                uint32_t *_rto_usecs)
+{
+       /*
+        * Define an Retransmission Timeout
+        * of 1 second, if there's no way for the
+        * kernel to tell us the current value.
+        */
+       uint32_t rto_usecs = 1000000;
+
+#ifdef __HAVE_TCP_INFO_RTO
+       {
+               struct tcp_info info;
+               socklen_t ilen = sizeof(info);
+               int ret;
+
+               ZERO_STRUCT(info);
+               ret = getsockopt(xconn->transport.sock,
+                                IPPROTO_TCP, TCP_INFO,
+                                (void *)&info, &ilen);
+               if (ret != 0) {
+                       int saved_errno = errno;
+                       NTSTATUS status = map_nt_error_from_unix(errno);
+                       DBG_ERR("getsockopt(TCP_INFO) errno[%d/%s] -s %s\n",
+                               saved_errno, strerror(saved_errno),
+                               nt_errstr(status));
+                       return status;
+               }
+
+               DBG_DEBUG("tcpi_rto[%u] tcpi_rtt[%u] tcpi_rttvar[%u]\n",
+                         (unsigned)info.tcpi_rto,
+                         (unsigned)info.tcpi_rtt,
+                         (unsigned)info.tcpi_rttvar);
+               rto_usecs = info.tcpi_rto;
+       }
+#endif /* __HAVE_TCP_INFO_RTO */
+
+       rto_usecs = MAX(rto_usecs,  200000); /* at least 0.2s */
+       rto_usecs = MIN(rto_usecs, 1000000); /* at max   1.0s */
+       *_rto_usecs = rto_usecs;
+       return NT_STATUS_OK;
+}
+
+static NTSTATUS smbXsrv_connection_get_acked_bytes(struct smbXsrv_connection *xconn,
+                                                  uint64_t *_acked_bytes)
+{
+       /*
+        * Unless the kernel has an interface
+        * to reveal the number of un-acked bytes
+        * in the socket send queue, we'll assume
+        * everything is already acked.
+        *
+        * But that would mean that we better don't
+        * pretent to support multi-channel.
+        */
+       uint64_t unacked_bytes = 0;
+
+       *_acked_bytes = 0;
+
+#ifdef __IOCTL_SEND_QUEUE_SIZE_OPCODE
+       {
+               int value = 0;
+               int ret;
+
+               /*
+                * If we have kernel support to get
+                * the number of bytes waiting in
+                * the socket's send queue, we
+                * use that in order to find out
+                * the number of unacked bytes.
+                */
+               ret = ioctl(xconn->transport.sock,
+                           __IOCTL_SEND_QUEUE_SIZE_OPCODE,
+                           &value);
+               if (ret != 0) {
+                       int saved_errno = errno;
+                       NTSTATUS status = map_nt_error_from_unix(saved_errno);
+                       DBG_ERR("Failed to get the SEND_QUEUE_SIZE - "
+                               "errno %d (%s) - %s\n",
+                               saved_errno, strerror(saved_errno),
+                               nt_errstr(status));
+                       return status;
+               }
+
+               if (value < 0) {
+                       DBG_ERR("xconn->ack.unacked_bytes[%llu] value[%d]\n",
+                               (unsigned long long)xconn->ack.unacked_bytes,
+                               value);
+                       return NT_STATUS_INTERNAL_ERROR;
+               }
+               unacked_bytes = value;
+       }
+#endif
+       if (xconn->ack.unacked_bytes == 0) {
+               xconn->ack.unacked_bytes = unacked_bytes;
+               return NT_STATUS_OK;
+       }
+
+       if (xconn->ack.unacked_bytes < unacked_bytes) {
+               DBG_ERR("xconn->ack.unacked_bytes[%llu] unacked_bytes[%llu]\n",
+                       (unsigned long long)xconn->ack.unacked_bytes,
+                       (unsigned long long)unacked_bytes);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       *_acked_bytes = xconn->ack.unacked_bytes - unacked_bytes;
+       xconn->ack.unacked_bytes = unacked_bytes;
+       return NT_STATUS_OK;
+}
+
+/*
+ * Empty a send queue and fail every entry that is waiting for an ack.
+ *
+ * Each entry is unlinked from *queue; entries with an ack.req get that
+ * request completed with the given error status. The entries
+ * themselves are not freed here.
+ *
+ * queue:  head pointer of the list to drain
+ * status: error passed to tevent_req_nterror() for waiting entries
+ */
+static void smbd_smb2_send_queue_ack_fail(struct smbd_smb2_send_queue **queue,
+                                         NTSTATUS status)
+{
+       struct smbd_smb2_send_queue *cur = *queue;
+
+       while (cur != NULL) {
+               /* Fetch the successor before the entry gets unlinked. */
+               struct smbd_smb2_send_queue *next = cur->next;
+
+               DLIST_REMOVE(*queue, cur);
+               if (cur->ack.req != NULL) {
+                       tevent_req_nterror(cur->ack.req, status);
+               }
+
+               cur = next;
+       }
+}
+
+/*
+ * Credit newly acked bytes to the entries of a send queue.
+ *
+ * Entries without an ack.req are ignored. An entry whose
+ * required_acked_bytes is covered by acked_bytes is unlinked from
+ * *queue and its ack.req is completed with tevent_req_done().
+ * Every other waiting entry has acked_bytes subtracted from its
+ * requirement and is then checked against its ack.timeout.
+ *
+ * queue:       head pointer of the list to process
+ * acked_bytes: bytes acknowledged since the last call
+ *
+ * Returns NT_STATUS_IO_TIMEOUT as soon as a still waiting entry
+ * has expired (the walk stops there), NT_STATUS_OK otherwise.
+ */
+static NTSTATUS smbd_smb2_send_queue_ack_bytes(struct smbd_smb2_send_queue **queue,
+                                              uint64_t acked_bytes)
+{
+       struct smbd_smb2_send_queue *cur = *queue;
+
+       while (cur != NULL) {
+               /* Fetch the successor, cur may get unlinked below. */
+               struct smbd_smb2_send_queue *next = cur->next;
+
+               if (cur->ack.req != NULL) {
+                       if (cur->ack.required_acked_bytes > acked_bytes) {
+                               /* Still waiting: note progress, check deadline. */
+                               cur->ack.required_acked_bytes -= acked_bytes;
+                               if (timeval_expired(&cur->ack.timeout)) {
+                                       return NT_STATUS_IO_TIMEOUT;
+                               }
+                       } else {
+                               /* Everything this entry waited for is acked. */
+                               cur->ack.required_acked_bytes = 0;
+                               DLIST_REMOVE(*queue, cur);
+                               tevent_req_done(cur->ack.req);
+                       }
+               }
+
+               cur = next;
+       }
+
+       return NT_STATUS_OK;
+}
+
+/*
+ * Run one ack check for a connection.
+ *
+ * Fetches the number of newly acked bytes and credits them to the
+ * entries in xconn->ack.queue. Afterwards xconn->smb2.send_queue is
+ * walked with 0 acked bytes, so waiting entries there make no
+ * progress and are only checked against their timeout.
+ *
+ * Returns NT_STATUS_OK, or the first error reported by one of the
+ * steps.
+ */
+static NTSTATUS smbd_smb2_check_ack_queue(struct smbXsrv_connection *xconn)
+{
+       uint64_t newly_acked = 0;
+       NTSTATUS status;
+
+       status = smbXsrv_connection_get_acked_bytes(xconn, &newly_acked);
+       if (!NT_STATUS_IS_OK(status)) {
+               return status;
+       }
+
+       status = smbd_smb2_send_queue_ack_bytes(&xconn->ack.queue, newly_acked);
+       if (!NT_STATUS_IS_OK(status)) {
+               return status;
+       }
+
+       /* Last step: its result is the overall result. */
+       return smbd_smb2_send_queue_ack_bytes(&xconn->smb2.send_queue, 0);
+}
+
+/*
+ * Callback of the periodic ack checker wakeup for one connection.
+ *
+ * Consumes the finished wakeup request, runs the ack check and then
+ * arms the next wakeup after xconn->ack.rto_usecs. Any failure
+ * terminates the connection.
+ *
+ * subreq: the finished tevent_wakeup request; its callback data is
+ *         the struct smbXsrv_connection
+ */
+static void smbXsrv_connection_ack_checker(struct tevent_req *subreq)
+{
+       struct smbXsrv_connection *xconn = tevent_req_callback_data(
+               subreq, struct smbXsrv_connection);
+       struct smbXsrv_client *owner = xconn->client;
+       NTSTATUS check_status;
+       bool woke_up;
+
+       /* This wakeup is consumed now, so drop our reference first. */
+       xconn->ack.checker_subreq = NULL;
+
+       woke_up = tevent_wakeup_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (!woke_up) {
+               smbd_server_connection_terminate(xconn,
+                                                "tevent_wakeup_recv() failed");
+               return;
+       }
+
+       check_status = smbd_smb2_check_ack_queue(xconn);
+       if (!NT_STATUS_IS_OK(check_status)) {
+               smbd_server_connection_terminate(xconn,
+                                                nt_errstr(check_status));
+               return;
+       }
+
+       /* Re-arm the timer for the next check. */
+       xconn->ack.checker_subreq = tevent_wakeup_send(
+               xconn,
+               owner->raw_ev_ctx,
+               timeval_current_ofs_usec(xconn->ack.rto_usecs));
+       if (xconn->ack.checker_subreq == NULL) {
+               smbd_server_connection_terminate(xconn,
+                                                "tevent_wakeup_send() failed");
+               return;
+       }
+       tevent_req_set_callback(xconn->ack.checker_subreq,
+                               smbXsrv_connection_ack_checker,
+                               xconn);
+}
+
+/*
+ * Bring the ack checker timers of all connections of a client in
+ * line with the state of client->pending_breaks.
+ *
+ * A 'pending break cycle' starts with the first pending break and
+ * lasts until all pending breaks are finished. This is typically
+ * a very short time, the value of one retransmission timeout.
+ *
+ * - Without pending breaks, every connection's checker timer is removed.
+ * - With pending breaks, each connection that has no checker yet gets
+ *   its rto fetched, its unacked_bytes baseline reset and a timer
+ *   armed; connections with a running checker are left alone.
+ *
+ * Returns NT_STATUS_OK, the error of a failed rto/queue size lookup,
+ * or NT_STATUS_NO_MEMORY if the timer could not be created. On error
+ * the remaining connections are not processed.
+ */
+static NTSTATUS smbXsrv_client_pending_breaks_updated(struct smbXsrv_client *client)
+{
+       struct smbXsrv_connection *conn = NULL;
+
+       if (client->pending_breaks == NULL) {
+               /*
+                * No more pending breaks: the cycle is over, so
+                * remove any pending checker timer.
+                */
+               for (conn = client->connections;
+                    conn != NULL;
+                    conn = conn->next)
+               {
+                       TALLOC_FREE(conn->ack.checker_subreq);
+               }
+               return NT_STATUS_OK;
+       }
+
+       for (conn = client->connections; conn != NULL; conn = conn->next) {
+               uint64_t baseline_delta = 0;
+               NTSTATUS status;
+
+               if (conn->ack.checker_subreq != NULL) {
+                       /* The cycle already runs on this connection. */
+                       continue;
+               }
+
+               /*
+                * Fetch the current retransmission timeout.
+                *
+                * It may change over time, but reading it once
+                * per 'pending break' cycle should be enough.
+                */
+               status = smbXsrv_connection_get_rto_usecs(conn,
+                                                         &conn->ack.rto_usecs);
+               if (!NT_STATUS_IS_OK(status)) {
+                       return status;
+               }
+
+               /*
+                * A new cycle starts from a fresh unacked_bytes
+                * counter: clear it here and let
+                * smbXsrv_connection_get_acked_bytes() set it to the
+                * current size of the kernel send queue.
+                */
+               conn->ack.unacked_bytes = 0;
+               status = smbXsrv_connection_get_acked_bytes(conn,
+                                                           &baseline_delta);
+               if (!NT_STATUS_IS_OK(status)) {
+                       return status;
+               }
+
+               /*
+                * Check for acked bytes after one retransmission
+                * timeout.
+                *
+                * The code that sets up the send_queue.ack.timeout
+                * uses a multiple of the retransmission timeout.
+                */
+               conn->ack.checker_subreq = tevent_wakeup_send(
+                       conn,
+                       client->raw_ev_ctx,
+                       timeval_current_ofs_usec(conn->ack.rto_usecs));
+               if (conn->ack.checker_subreq == NULL) {
+                       return NT_STATUS_NO_MEMORY;
+               }
+               tevent_req_set_callback(conn->ack.checker_subreq,
+                                       smbXsrv_connection_ack_checker,
+                                       conn);
+       }
+
+       return NT_STATUS_OK;
+}
+
 void smbXsrv_connection_disconnect_transport(struct smbXsrv_connection *xconn,
                                             NTSTATUS status)
 {
@@ -1118,6 +1431,9 @@ void smbXsrv_connection_disconnect_transport(struct smbXsrv_connection *xconn,
        if (xconn->transport.sock != -1) {
                xconn->transport.sock = -1;
        }
+       smbd_smb2_send_queue_ack_fail(&xconn->ack.queue, status);
+       smbd_smb2_send_queue_ack_fail(&xconn->smb2.send_queue, status);
+       xconn->smb2.send_queue_len = 0;
        DO_PROFILE_INC(disconnect);
 }
 
@@ -4017,6 +4333,23 @@ NTSTATUS smbd_smb2_process_negprot(struct smbXsrv_connection *xconn,
                return status;
        }
 
+       /*
+        * If a new connection joins the process, when we're
+        * already in a "pending break cycle", we need to
+        * turn on the ack checker on the new connection.
+        */
+       status = smbXsrv_client_pending_breaks_updated(xconn->client);
+       if (!NT_STATUS_IS_OK(status)) {
+               /*
+                * If there's a problem, we disconnect the whole
+                * client with all connections here!
+                *
+                * Instead of just the new connection.
+                */
+               smbd_server_disconnect_client(xconn->client, nt_errstr(status));
+               return status;
+       }
+
        status = smbd_smb2_request_create(xconn, inpdu, size, &req);
        if (!NT_STATUS_IS_OK(status)) {
                smbd_server_connection_terminate(xconn, nt_errstr(status));
@@ -4163,6 +4496,9 @@ static NTSTATUS smbd_smb2_flush_send_queue(struct smbXsrv_connection *xconn)
 
                        xconn->smb2.send_queue_len--;
                        DLIST_REMOVE(xconn->smb2.send_queue, e);
+
+                       size += e->sendfile_body_size;
+
                        /*
                         * This triggers the sendfile path via
                         * the destructor.
@@ -4174,6 +4510,7 @@ static NTSTATUS smbd_smb2_flush_send_queue(struct smbXsrv_connection *xconn)
                                                                        status);
                                return status;
                        }
+                       xconn->ack.unacked_bytes += size;
                        continue;
                }
 
@@ -4200,6 +4537,8 @@ static NTSTATUS smbd_smb2_flush_send_queue(struct smbXsrv_connection *xconn)
                        return status;
                }
 
+               xconn->ack.unacked_bytes += ret;
+
                ok = iov_advance(&e->vector, &e->count, ret);
                if (!ok) {
                        return NT_STATUS_INTERNAL_ERROR;
@@ -4213,7 +4552,14 @@ static NTSTATUS smbd_smb2_flush_send_queue(struct smbXsrv_connection *xconn)
 
                xconn->smb2.send_queue_len--;
                DLIST_REMOVE(xconn->smb2.send_queue, e);
-               talloc_free(e->mem_ctx);
+
+               if (e->ack.req == NULL) {
+                       talloc_free(e->mem_ctx);
+                       continue;
+               }
+
+               e->ack.required_acked_bytes = xconn->ack.unacked_bytes;
+               DLIST_ADD_END(xconn->ack.queue, e);
        }
 
        /*