First part of efficiency fixes for message sending to pid's (cutting down
authorJeremy Allison <jra@samba.org>
Fri, 10 Jan 2003 20:17:02 +0000 (20:17 +0000)
committerJeremy Allison <jra@samba.org>
Fri, 10 Jan 2003 20:17:02 +0000 (20:17 +0000)
the amount of time we hold tdb locks). Gulp down all messages at once rather
than reading/re-writing one at a time. NOTE: All dispatch routines *must*
be able to cope with incoming message on *odd* byte boundaries (all current
handlers do).
Jeremy.
(This used to be commit b752c0340f96669b2b2573cf7d3d10f99580b070)

source3/lib/messages.c

index 8200b2f8c30c1ae3edfc52650344f0c729738b41..555a55569e4f963686c2b83228ccf0d87262f4f6 100644 (file)
@@ -3,6 +3,7 @@
    Samba internal messaging functions
    Copyright (C) Andrew Tridgell 2000
    Copyright (C) 2001 by Martin Pool
+   Copyright (C) 2002 by Jeremy Allison
    
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -35,6 +36,9 @@
    use that to reply by message_send_pid().  See ping_message() for a
    simple example.
 
+   *NOTE*: Dispatch functions must be able to cope with incoming
+   messages on an *odd* byte boundary.
+
    This system doesn't have any inherent size limitations but is not
    very efficient for large messages or when messages are sent in very
    quick succession.
@@ -86,6 +90,16 @@ static void ping_message(int msg_type, pid_t src, void *buf, size_t len)
        message_send_pid(src, MSG_PONG, buf, len, True);
 }
 
+/****************************************************************************
+ Return current debug level.
+****************************************************************************/
+
+void debuglevel_message(int msg_type, pid_t src, void *buf, size_t len)
+{
+       DEBUG(1,("INFO: Received REQ_DEBUGLEVEL message from PID %u\n",(unsigned int)src));
+       message_send_pid(src, MSG_DEBUGLEVEL, DEBUGLEVEL_CLASS, sizeof(DEBUGLEVEL_CLASS), True);
+}
+
 /****************************************************************************
  Initialise the messaging functions. 
 ****************************************************************************/
@@ -106,6 +120,7 @@ BOOL message_init(void)
        CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
 
        message_register(MSG_PING, ping_message);
+       message_register(MSG_REQ_DEBUGLEVEL, debuglevel_message);
 
        return True;
 }
@@ -133,9 +148,13 @@ static TDB_DATA message_key_pid(pid_t pid)
 
 static BOOL message_notify(pid_t pid)
 {
-       /* Doing kill with a non-positive pid causes messages to be
-        * sent to places we don't want. */
+       /*
+        * Doing kill with a non-positive pid causes messages to be
+        * sent to places we don't want.
+        */
+
        SMB_ASSERT(pid > 0);
+
        if (kill(pid, SIGUSR1) == -1) {
                if (errno == ESRCH) {
                        DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid));
@@ -160,16 +179,19 @@ BOOL message_send_pid(pid_t pid, int msg_type, const void *buf, size_t len,
        struct message_rec rec;
        void *p;
 
+       /*
+        * Doing kill with a non-positive pid causes messages to be
+        * sent to places we don't want.
+        */
+
+       SMB_ASSERT(pid > 0);
+
        rec.msg_version = MESSAGE_VERSION;
        rec.msg_type = msg_type;
        rec.dest = pid;
        rec.src = sys_getpid();
        rec.len = len;
 
-       /* Doing kill with a non-positive pid causes messages to be
-        * sent to places we don't want. */
-       SMB_ASSERT(pid > 0);
-
        kbuf = message_key_pid(pid);
 
        /* lock the record for the destination */
@@ -241,111 +263,136 @@ BOOL message_send_pid(pid_t pid, int msg_type, const void *buf, size_t len,
 
  failed:
        tdb_chainunlock(tdb, kbuf);
+       SAFE_FREE(dbuf.dptr);
        errno = 0;                    /* paranoia */
        return False;
 }
 
 /****************************************************************************
- Retrieve the next message for the current process.
+ Retrieve all messages for the current process.
 ****************************************************************************/
 
-static BOOL message_recv(int *msg_type, pid_t *src, void **buf, size_t *len)
+static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
 {
        TDB_DATA kbuf;
        TDB_DATA dbuf;
-       struct message_rec rec;
+       TDB_DATA null_dbuf;
+
+       ZERO_STRUCT(null_dbuf);
+
+       *msgs_buf = NULL;
+       *total_len = 0;
 
        kbuf = message_key_pid(sys_getpid());
 
        tdb_chainlock(tdb, kbuf);
-       
        dbuf = tdb_fetch(tdb, kbuf);
-       if (dbuf.dptr == NULL || dbuf.dsize == 0)
-               goto failed;
+       /*
+        * Replace with an empty record to keep the allocated
+        * space in the tdb.
+        */
+       tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
+       tdb_chainunlock(tdb, kbuf);
+
+       if (dbuf.dptr == NULL || dbuf.dsize == 0) {
+               SAFE_FREE(dbuf.dptr);
+               return False;
+       }
+
+       *msgs_buf = dbuf.dptr;
+       *total_len = dbuf.dsize;
+
+       return True;
+}
+
+/****************************************************************************
+ Parse out the next message for the current process.
+****************************************************************************/
+
+static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type, pid_t *src, char **buf, size_t *len)
+{
+       struct message_rec rec;
+       char *ret_buf = *buf;
 
-       memcpy(&rec, dbuf.dptr, sizeof(rec));
+       *buf = NULL;
+       *len = 0;
+
+       if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
+               return False;
+
+       memcpy(&rec, ret_buf, sizeof(rec));
+       ret_buf += sizeof(rec);
 
        if (rec.msg_version != MESSAGE_VERSION) {
                DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
-               goto failed;
+               return False;
        }
 
        if (rec.len > 0) {
-               (*buf) = (void *)malloc(rec.len);
-               if (!(*buf))
-                       goto failed;
-
-               memcpy(*buf, dbuf.dptr+sizeof(rec), rec.len);
-       } else {
-               *buf = NULL;
+               if (total_len - (ret_buf - msgs_buf) < rec.len)
+                       return False;
        }
 
        *len = rec.len;
        *msg_type = rec.msg_type;
        *src = rec.src;
+       *buf = ret_buf;
 
-       if (dbuf.dsize - (sizeof(rec)+rec.len) > 0)
-               memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len));
-       dbuf.dsize -= sizeof(rec)+rec.len;
-
-       if (dbuf.dsize == 0)
-               tdb_delete(tdb, kbuf);
-       else
-               tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
-
-       SAFE_FREE(dbuf.dptr);
-       tdb_chainunlock(tdb, kbuf);
        return True;
-
- failed:
-       tdb_chainunlock(tdb, kbuf);
-       SAFE_FREE(dbuf.dptr);
-       return False;
 }
 
 /****************************************************************************
  Receive and dispatch any messages pending for this process.
  Notice that all dispatch handlers for a particular msg_type get called,
  so you can register multiple handlers for a message.
+ *NOTE*: Dispatch functions must be able to cope with incoming
+ messages on an *odd* byte boundary.
 ****************************************************************************/
 
 void message_dispatch(void)
 {
        int msg_type;
        pid_t src;
-       void *buf;
-       size_t len;
+       char *buf;
+       char *msgs_buf;
+       size_t len, total_len;
        struct dispatch_fns *dfn;
        int n_handled;
 
-       if (!received_signal) return;
+       if (!received_signal)
+               return;
 
        DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
 
        received_signal = 0;
 
-       while (message_recv(&msg_type, &src, &buf, &len)) {
-               DEBUG(10,("message_dispatch: received msg_type=%d src_pid=%d\n",
-                         msg_type, (int) src));
+       if (!retrieve_all_messages(&msgs_buf, &total_len))
+               return;
+
+       for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) {
+               DEBUG(10,("message_dispatch: received msg_type=%d src_pid=%u\n",
+                         msg_type, (unsigned int) src));
                n_handled = 0;
                for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
                        if (dfn->msg_type == msg_type) {
                                DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
-                               dfn->fn(msg_type, src, buf, len);
+                               dfn->fn(msg_type, src, len ? (void *)buf : NULL, len);
                                n_handled++;
                        }
                }
                if (!n_handled) {
-                       DEBUG(5,("message_dispatch: warning: no handlers registered for "
-                                "msg_type %d in pid %d\n",
-                                msg_type, sys_getpid()));
+                       DEBUG(5,("message_dispatch: warning: no handlers registed for "
+                                "msg_type %d in pid %u\n",
+                                msg_type, (unsigned int)sys_getpid()));
                }
-               SAFE_FREE(buf);
        }
+       SAFE_FREE(msgs_buf);
 }
 
 /****************************************************************************
  Register a dispatch function for a particular message type.
+ *NOTE*: Dispatch functions must be able to cope with incoming
+ messages on an *odd* byte boundary.
 ****************************************************************************/
 
 void message_register(int msg_type,