r6561: re-did the internal message system based on DGRAM unix domain
authorAndrew Tridgell <tridge@samba.org>
Sun, 1 May 2005 18:49:07 +0000 (18:49 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 18:16:25 +0000 (13:16 -0500)
sockets. This gains us about 40% in messaging speed.
(This used to be commit f244a64ed537447e44229172427b5b6a5c64800c)

source4/lib/messaging/messaging.c

index 8127e7e8fc32f272ea38840c2564e9d6e39d1f99..c95028bea5e0a84c7a50e08e0481656671ccb278 100644 (file)
 /* the number of microseconds to backoff in retrying to send a message */
 #define MESSAGING_BACKOFF 250000
 
+/* maximum message size */
+#define MESSAGING_MAX_SIZE 512
+
 struct messaging_context {
        uint32_t server_id;
        struct socket_context *sock;
-       char *path;
+       const char *path;
        struct dispatch_fn *dispatch;
 
        struct {
@@ -60,7 +63,6 @@ struct dispatch_fn {
 struct messaging_rec {
        struct messaging_context *msg;
        struct socket_context *sock;
-       struct fd_event *fde;
        const char *path;
 
        struct {
@@ -72,8 +74,6 @@ struct messaging_rec {
        } header;
 
        DATA_BLOB data;
-
-       uint32_t ndone;
 };
 
 /*
@@ -112,112 +112,58 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r
                }
        }
 
-       /* we don't free the record itself here as there may
-          be more messages from this client */
-       data_blob_free(&rec->data);
        rec->header.length = 0;
-       rec->ndone = 0;
 }
 
 
 /*
-  handle IO for a single message
+  handle a new incoming connection
 */
 static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde, 
                                   uint16_t flags, void *private)
 {
-       struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec);
-       struct messaging_context *msg = rec->msg;
+       struct messaging_context *msg = talloc_get_type(private, 
+                                                       struct messaging_context);
+       struct messaging_rec *rec;
        NTSTATUS status;
+       uint8_t data[MESSAGING_MAX_SIZE];
+       size_t msize;
 
-       if (rec->ndone < sizeof(rec->header)) {
-               /* receive the header */
-               size_t nread;
-
-               status = socket_recv(rec->sock, 
-                                    rec->ndone + (char *)&rec->header,
-                                    sizeof(rec->header) - rec->ndone, &nread, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       talloc_free(rec);
-                       return;
-               }
-
-               if (nread == 0) {
-                       return;
-               }
-
-               rec->ndone += nread;
-
-               if (rec->ndone == sizeof(rec->header)) {
-                       if (rec->header.version != MESSAGING_VERSION) {
-                               DEBUG(0,("meessage with wrong version %u\n",
-                                        rec->header.version));
-                               talloc_free(rec);
-                       }
-                       rec->data = data_blob_talloc(rec, NULL, rec->header.length);
-                       if (rec->data.length != rec->header.length) {
-                               DEBUG(0,("Unable to allocate message of size %u\n",
-                                        rec->header.length));
-                               talloc_free(rec);
-                       }
-               }
-       }
-
-       if (rec->ndone >= sizeof(rec->header) && 
-           rec->ndone < sizeof(rec->header) + rec->header.length) {
-               /* receive the body, if any */
-               size_t nread;
-
-               status = socket_recv(rec->sock, 
-                                    rec->data.data + (rec->ndone - sizeof(rec->header)),
-                                    sizeof(rec->header) + rec->header.length - rec->ndone, 
-                                    &nread, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       talloc_free(rec);
-                       return;
-               }
-
-               if (nread == 0) {
-                       return;
-               }
-
-               rec->ndone += nread;
+       status = socket_recv(msg->sock, data, sizeof(data), &msize, 0);
+       if (!NT_STATUS_IS_OK(status)) {
+               return;
        }
 
-       if (rec->ndone == sizeof(rec->header) + rec->header.length) {
-               /* we've got the whole message */
-               messaging_dispatch(msg, rec);
+       if (msize < sizeof(rec->header)) {
+               DEBUG(0,("messaging: bad message of size %d\n", msize));
+               return;
        }
-}
-
-/*
-  handle a new incoming connection
-*/
-static void messaging_listen_handler(struct event_context *ev, struct fd_event *fde, 
-                                    uint16_t flags, void *private)
-{
-       struct messaging_context *msg = talloc_get_type(private, 
-                                                       struct messaging_context);
-       struct messaging_rec *rec;
-       NTSTATUS status;
 
        rec = talloc(msg, struct messaging_rec);
        if (rec == NULL) {
                smb_panic("Unable to allocate messaging_rec");
        }
 
-       status = socket_accept(msg->sock, &rec->sock);
-       if (!NT_STATUS_IS_OK(status)) {
-               smb_panic("Unable to accept messaging_rec");
-       }
-       talloc_steal(rec, rec->sock);
-
        rec->msg           = msg;
-       rec->ndone         = 0;
-       rec->header.length = 0;
        rec->path          = msg->path;
-       rec->fde           = event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock), 
-                                         EVENT_FD_READ, messaging_recv_handler, rec);
+       rec->sock          = NULL;
+
+       memcpy(&rec->header, data, sizeof(rec->header));
+       if (msize != sizeof(rec->header) + rec->header.length) {
+               DEBUG(0,("messaging: bad message header size %d should be %d\n", 
+                        rec->header.length, msize - sizeof(rec->header)));
+               talloc_free(rec);
+               return;
+       }
+
+       rec->data = data_blob_talloc(rec, data, rec->header.length);
+       if (rec->data.data == NULL) {
+               talloc_free(rec);
+               return;
+       }
+
+       messaging_dispatch(msg, rec);
+       talloc_free(rec);
 }
 
 /*
@@ -262,49 +208,28 @@ static void messaging_send_handler(struct event_context *ev, struct fd_event *fd
                                   uint16_t flags, void *private)
 {
        struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec);
+       uint8_t data[MESSAGING_MAX_SIZE];
+       DATA_BLOB blob;
+       size_t nsent;
        NTSTATUS status;
 
-       if (rec->ndone < sizeof(rec->header)) {
-               /* send the header */
-               size_t nsent;
-               DATA_BLOB blob;
-
-               blob.data = rec->ndone + (uint8_t *)&rec->header;
-               blob.length = sizeof(rec->header) - rec->ndone;
-
-               status = socket_send(rec->sock, &blob, &nsent, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       talloc_free(rec);
-                       return;
-               }
-
-               if (nsent == 0) {
-                       return;
-               }
-
-               rec->ndone += nsent;
-       }
-
-       if (rec->ndone >= sizeof(rec->header) && 
-           rec->ndone < sizeof(rec->header) + rec->header.length) {
-               /* send the body, if any */
-               DATA_BLOB blob;
-               size_t nsent;
-
-               blob.data = rec->data.data + (rec->ndone - sizeof(rec->header));
-               blob.length = rec->header.length - (rec->ndone - sizeof(rec->header));
+       memcpy(data, &rec->header, sizeof(rec->header));
+       memcpy(data + sizeof(rec->header), rec->data.data, rec->data.length);
 
-               status = socket_send(rec->sock, &blob, &nsent, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       talloc_free(rec);
-                       return;
-               }
+       blob.data = data;
+       blob.length = sizeof(rec->header) + rec->header.length;
 
-               rec->ndone += nsent;
+       status = socket_send(rec->sock, &blob, &nsent, 0);
+       if (NT_STATUS_IS_ERR(status)) {
+               DEBUG(3,("Unable to send message of type %d length %d - %s\n", 
+                        rec->header.msg_type, 
+                        rec->header.length, 
+                        nt_errstr(status)));
+               talloc_free(rec);
+               return;
        }
 
-       if (rec->ndone == sizeof(rec->header) + rec->header.length) {
-               /* we've done the whole message */
+       if (NT_STATUS_IS_OK(status)) {
                talloc_free(rec);
        }
 }
@@ -349,8 +274,8 @@ static void messaging_backoff_handler(struct event_context *ev, struct timed_eve
                return;
        }
 
-       rec->fde = event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
-                               EVENT_FD_WRITE, messaging_send_handler, rec);
+       event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
+                    EVENT_FD_WRITE, messaging_send_handler, rec);
 }
 
 
@@ -378,9 +303,8 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t
        } else {
                rec->data = data_blob(NULL, 0);
        }
-       rec->ndone = 0;
 
-       status = socket_create("unix", SOCKET_TYPE_STREAM, &rec->sock, 0);
+       status = socket_create("unix", SOCKET_TYPE_DGRAM, &rec->sock, 0);
        if (!NT_STATUS_IS_OK(status)) {
                talloc_free(rec);
                return status;
@@ -403,8 +327,8 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t
                return status;
        }
 
-       rec->fde = event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
-                               EVENT_FD_WRITE, messaging_send_handler, rec);
+       event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
+                    EVENT_FD_WRITE, messaging_send_handler, rec);
 
        return NT_STATUS_OK;
 }
@@ -437,10 +361,12 @@ static int messaging_destructor(void *ptr)
 /*
   create the listening socket and setup the dispatcher
 */
-struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, struct event_context *ev)
+struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, 
+                                        struct event_context *ev)
 {
        struct messaging_context *msg;
        NTSTATUS status;
+       char *path;
 
        msg = talloc(mem_ctx, struct messaging_context);
        if (msg == NULL) {
@@ -448,15 +374,15 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
        }
 
        /* create the messaging directory if needed */
-       msg->path = smbd_tmp_path(msg, "messaging");
-       mkdir(msg->path, 0700);
-       talloc_free(msg->path);
+       path = smbd_tmp_path(msg, "messaging");
+       mkdir(path, 0700);
+       talloc_free(path);
 
+       msg->path = messaging_path(msg, server_id);
        msg->server_id = server_id;
        msg->dispatch = NULL;
-       msg->path = messaging_path(msg, server_id);
 
-       status = socket_create("unix", SOCKET_TYPE_STREAM, &msg->sock, 0);
+       status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
        if (!NT_STATUS_IS_OK(status)) {
                talloc_free(msg);
                return NULL;
@@ -475,7 +401,7 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
 
        msg->event.ev   = talloc_reference(msg, ev);
        msg->event.fde  = event_add_fd(ev, msg, socket_get_fd(msg->sock), 
-                                      EVENT_FD_READ, messaging_listen_handler, msg);
+                                      EVENT_FD_READ, messaging_recv_handler, msg);
 
        talloc_set_destructor(msg, messaging_destructor);