Use the proper boolean constants.
[bbaumbach/samba-autobuild/.git] / source / lib / messages.c
index 8602c2f32dc012b3ca7a5281d74d0e6aaf5da192..5cd575466fae33aeff86432227f2cf126df3e6a5 100644 (file)
@@ -3,10 +3,12 @@
    Samba internal messaging functions
    Copyright (C) Andrew Tridgell 2000
    Copyright (C) 2001 by Martin Pool
+   Copyright (C) 2002 by Jeremy Allison
+   Copyright (C) 2007 by Volker Lendecke
    
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
+   the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 /**
-   @defgroups messages Internal messaging framework
-   @{
-   @file messages.c
-
-   This module is used for internal messaging between Samba daemons. 
+  @defgroup messages Internal messaging framework
+  @{
+  @file messages.c
+  
+  @brief  Module for internal messaging between Samba daemons. 
 
    The idea is that if a part of Samba wants to do communication with
    another Samba process then it will do a message_register() of a
@@ -35,6 +36,9 @@
    use that to reply by message_send_pid().  See ping_message() for a
    simple example.
 
+   @caution Dispatch functions must be able to cope with incoming
+   messages on an *odd* byte boundary.
+
    This system doesn't have any inherent size limitations but is not
    very efficient for large messages or when messages are sent in very
    quick succession.
 */
 
 #include "includes.h"
-
-/* the locking database handle */
-static TDB_CONTEXT *tdb;
-static int received_signal;
-
-/* change the message version with any incompatible changes in the protocol */
-#define MESSAGE_VERSION 1
-
-struct message_rec {
-       int msg_version;
-       int msg_type;
-       pid_t dest;
-       pid_t src;
-       size_t len;
+#include "librpc/gen_ndr/messaging.h"
+#include "librpc/gen_ndr/ndr_messaging.h"
+
+struct messaging_callback {
+       struct messaging_callback *prev, *next;
+       uint32 msg_type;
+       void (*fn)(struct messaging_context *msg, void *private_data, 
+                  uint32_t msg_type, 
+                  struct server_id server_id, DATA_BLOB *data);
+       void *private_data;
 };
 
-/* we have a linked list of dispatch handlers */
-static struct dispatch_fns {
-       struct dispatch_fns *next, *prev;
-       int msg_type;
-       void (*fn)(int msg_type, pid_t pid, void *buf, size_t len);
-} *dispatch_fns;
-
-/****************************************************************************
- Notifications come in as signals.
-****************************************************************************/
-
-static void sig_usr1(void)
-{
-       received_signal = 1;
-       sys_select_signal();
-}
-
 /****************************************************************************
  A useful function for testing the message system.
 ****************************************************************************/
 
-void ping_message(int msg_type, pid_t src, void *buf, size_t len)
-{
-       char *msg = buf ? buf : "none";
-       DEBUG(1,("INFO: Received PING message from PID %u [%s]\n",(unsigned int)src, msg));
-       message_send_pid(src, MSG_PONG, buf, len, True);
-}
-
-/****************************************************************************
- Return current debug level.
-****************************************************************************/
-
-void debuglevel_message(int msg_type, pid_t src, void *buf, size_t len)
-{
-       DEBUG(1,("INFO: Received REQ_DEBUGLEVEL message from PID %u\n",(unsigned int)src));
-       message_send_pid(src, MSG_DEBUGLEVEL, DEBUGLEVEL_CLASS, sizeof(DEBUGLEVEL_CLASS), True);
-}
-
-/****************************************************************************
- Initialise the messaging functions. 
-****************************************************************************/
-
-BOOL message_init(void)
-{
-       if (tdb) return True;
-
-       tdb = tdb_open_log(lock_path("messages.tdb"), 
-                      0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
-                      O_RDWR|O_CREAT,0600);
-
-       if (!tdb) {
-               DEBUG(0,("ERROR: Failed to initialise messages database\n"));
-               return False;
-       }
-
-       CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
-
-       message_register(MSG_PING, ping_message);
-       message_register(MSG_REQ_DEBUGLEVEL, debuglevel_message);
-
-       return True;
-}
-
-/*******************************************************************
- Form a static tdb key from a pid.
-******************************************************************/
-
-static TDB_DATA message_key_pid(pid_t pid)
-{
-       static char key[20];
-       TDB_DATA kbuf;
-
-       slprintf(key, sizeof(key)-1, "PID/%d", (int)pid);
-       
-       kbuf.dptr = (char *)key;
-       kbuf.dsize = strlen(key)+1;
-       return kbuf;
-}
-
-/****************************************************************************
- Notify a process that it has a message. If the process doesn't exist 
- then delete its record in the database.
-****************************************************************************/
-
-static BOOL message_notify(pid_t pid)
-{
-       /* Doing kill with a non-positive pid causes messages to be
-        * sent to places we don't want. */
-       SMB_ASSERT(pid > 0);
-       if (kill(pid, SIGUSR1) == -1) {
-               if (errno == ESRCH) {
-                       DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid));
-                       tdb_delete(tdb, message_key_pid(pid));
-               } else {
-                       DEBUG(2,("message to process %d failed - %s\n", (int)pid, strerror(errno)));
-               }
-               return False;
-       }
-       return True;
-}
-
-/****************************************************************************
- Send a message to a particular pid.
-****************************************************************************/
-
-BOOL message_send_pid(pid_t pid, int msg_type, const void *buf, size_t len,
-                     BOOL duplicates_allowed)
-{
-       TDB_DATA kbuf;
-       TDB_DATA dbuf;
-       struct message_rec rec;
-       void *p;
-
-       rec.msg_version = MESSAGE_VERSION;
-       rec.msg_type = msg_type;
-       rec.dest = pid;
-       rec.src = sys_getpid();
-       rec.len = len;
-
-       /* Doing kill with a non-positive pid causes messages to be
-        * sent to places we don't want. */
-       SMB_ASSERT(pid > 0);
-
-       kbuf = message_key_pid(pid);
-
-       /* lock the record for the destination */
-       tdb_chainlock(tdb, kbuf);
-
-       dbuf = tdb_fetch(tdb, kbuf);
-
-       if (!dbuf.dptr) {
-               /* its a new record */
-               p = (void *)malloc(len + sizeof(rec));
-               if (!p) goto failed;
-
-               memcpy(p, &rec, sizeof(rec));
-               if (len > 0) memcpy((void *)((char*)p+sizeof(rec)), buf, len);
-
-               dbuf.dptr = p;
-               dbuf.dsize = len + sizeof(rec);
-               tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
-               SAFE_FREE(p);
-               goto ok;
-       }
-
-       if (!duplicates_allowed) {
-               char *ptr;
-               struct message_rec prec;
-               
-               for(ptr = (char *)dbuf.dptr; ptr < dbuf.dptr + dbuf.dsize; ) {
-                       /*
-                        * First check if the message header matches, then, if it's a non-zero
-                        * sized message, check if the data matches. If so it's a duplicate and
-                        * we can discard it. JRA.
-                        */
-
-                       if (!memcmp(ptr, &rec, sizeof(rec))) {
-                               if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
-                                       DEBUG(10,("message_send_pid: discarding duplicate message.\n"));
-                                       SAFE_FREE(dbuf.dptr);
-                                       tdb_chainunlock(tdb, kbuf);
-                                       return True;
-                               }
-                       }
-                       memcpy(&prec, ptr, sizeof(prec));
-                       ptr += sizeof(rec) + prec.len;
-               }
-       }
-
-       /* we're adding to an existing entry */
-       p = (void *)malloc(dbuf.dsize + len + sizeof(rec));
-       if (!p) goto failed;
-
-       memcpy(p, dbuf.dptr, dbuf.dsize);
-       memcpy((void *)((char*)p+dbuf.dsize), &rec, sizeof(rec));
-       if (len > 0) memcpy((void *)((char*)p+dbuf.dsize+sizeof(rec)), buf, len);
-
-       SAFE_FREE(dbuf.dptr);
-       dbuf.dptr = p;
-       dbuf.dsize += len + sizeof(rec);
-       tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
-       SAFE_FREE(dbuf.dptr);
-
- ok:
-       tdb_chainunlock(tdb, kbuf);
-       errno = 0;                    /* paranoia */
-       return message_notify(pid);
-
- failed:
-       tdb_chainunlock(tdb, kbuf);
-       errno = 0;                    /* paranoia */
-       return False;
-}
-
-/****************************************************************************
- Retrieve the next message for the current process.
-****************************************************************************/
-
-static BOOL message_recv(int *msg_type, pid_t *src, void **buf, size_t *len)
-{
-       TDB_DATA kbuf;
-       TDB_DATA dbuf;
-       struct message_rec rec;
-
-       kbuf = message_key_pid(sys_getpid());
-
-       tdb_chainlock(tdb, kbuf);
-       
-       dbuf = tdb_fetch(tdb, kbuf);
-       if (dbuf.dptr == NULL || dbuf.dsize == 0) goto failed;
-
-       memcpy(&rec, dbuf.dptr, sizeof(rec));
-
-       if (rec.msg_version != MESSAGE_VERSION) {
-               DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
-               goto failed;
-       }
-
-       if (rec.len > 0) {
-               (*buf) = (void *)malloc(rec.len);
-               if (!(*buf)) goto failed;
-
-               memcpy(*buf, dbuf.dptr+sizeof(rec), rec.len);
-       } else {
-               *buf = NULL;
-       }
-
-       *len = rec.len;
-       *msg_type = rec.msg_type;
-       *src = rec.src;
-
-       if (dbuf.dsize - (sizeof(rec)+rec.len) > 0)
-               memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len));
-       dbuf.dsize -= sizeof(rec)+rec.len;
-
-       if (dbuf.dsize == 0)
-               tdb_delete(tdb, kbuf);
-       else
-               tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
-
-       SAFE_FREE(dbuf.dptr);
-       tdb_chainunlock(tdb, kbuf);
-       return True;
-
- failed:
-       tdb_chainunlock(tdb, kbuf);
-       return False;
-}
-
-/****************************************************************************
- Receive and dispatch any messages pending for this process.
- Notice that all dispatch handlers for a particular msg_type get called,
- so you can register multiple handlers for a message.
-****************************************************************************/
-
-void message_dispatch(void)
+static void ping_message(struct messaging_context *msg_ctx,
+                        void *private_data,
+                        uint32_t msg_type,
+                        struct server_id src,
+                        DATA_BLOB *data)
 {
-       int msg_type;
-       pid_t src;
-       void *buf;
-       size_t len;
-       struct dispatch_fns *dfn;
-       int n_handled;
-
-       if (!received_signal) return;
+       const char *msg = data->data ? (const char *)data->data : "none";
 
-       DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
-
-       received_signal = 0;
-
-       while (message_recv(&msg_type, &src, &buf, &len)) {
-               DEBUG(10,("message_dispatch: received msg_type=%d src_pid=%d\n",
-                         msg_type, (int) src));
-               n_handled = 0;
-               for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
-                       if (dfn->msg_type == msg_type) {
-                               DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
-                               dfn->fn(msg_type, src, buf, len);
-                               n_handled++;
-                       }
-               }
-               if (!n_handled) {
-                       DEBUG(5,("message_dispatch: warning: no handlers registed for "
-                                "msg_type %d in pid%d\n",
-                                msg_type, sys_getpid()));
-               }
-               SAFE_FREE(buf);
-       }
+       DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
+                procid_str_static(&src), msg));
+       messaging_send(msg_ctx, src, MSG_PONG, data);
 }
 
 /****************************************************************************
- Register a dispatch function for a particular message type.
+ Register/replace a dispatch function for a particular message type.
+ JRA changed Dec 13 2006. Only one message handler now permitted per type.
+ *NOTE*: Dispatch functions must be able to cope with incoming
+ messages on an *odd* byte boundary.
 ****************************************************************************/
 
-void message_register(int msg_type, 
-                     void (*fn)(int msg_type, pid_t pid, void *buf, size_t len))
-{
-       struct dispatch_fns *dfn;
-
-       dfn = (struct dispatch_fns *)malloc(sizeof(*dfn));
-
-       if (dfn != NULL) {
-
-               ZERO_STRUCTPN(dfn);
-
-               dfn->msg_type = msg_type;
-               dfn->fn = fn;
-
-               DLIST_ADD(dispatch_fns, dfn);
-       }
-       else {
-       
-               DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
-       }
-}
-
-/****************************************************************************
- De-register the function for a particular message type.
-****************************************************************************/
-
-void message_deregister(int msg_type)
-{
-       struct dispatch_fns *dfn, *next;
-
-       for (dfn = dispatch_fns; dfn; dfn = next) {
-               next = dfn->next;
-               if (dfn->msg_type == msg_type) {
-                       DLIST_REMOVE(dispatch_fns, dfn);
-                       SAFE_FREE(dfn);
-               }
-       }       
-}
-
 struct msg_all {
+       struct messaging_context *msg_ctx;
        int msg_type;
+       uint32 msg_flag;
        const void *buf;
        size_t len;
-       BOOL duplicates;
-       int             n_sent;
+       int n_sent;
 };
 
 /****************************************************************************
  Send one of the messages for the broadcast.
 ****************************************************************************/
 
-static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
+static int traverse_fn(struct db_record *rec,
+                      const struct connections_key *ckey,
+                      const struct connections_data *crec,
+                      void *state)
 {
-       struct connections_data crec;
        struct msg_all *msg_all = (struct msg_all *)state;
+       NTSTATUS status;
 
-       if (dbuf.dsize != sizeof(crec))
+       if (crec->cnum != -1)
                return 0;
 
-       memcpy(&crec, dbuf.dptr, sizeof(crec));
+       /* Don't send if the receiver hasn't registered an interest. */
 
-       if (crec.cnum != -1)
+       if(!(crec->bcast_msg_flags & msg_all->msg_flag))
                return 0;
 
-       /* if the msg send fails because the pid was not found (i.e. smbd died), 
+       /* If the msg send fails because the pid was not found (i.e. smbd died), 
         * the msg has already been deleted from the messages.tdb.*/
-       if (!message_send_pid(crec.pid, msg_all->msg_type,
-                             msg_all->buf, msg_all->len,
-                             msg_all->duplicates)) {
+
+       status = messaging_send_buf(msg_all->msg_ctx,
+                                   crec->pid, msg_all->msg_type,
+                                   (uint8 *)msg_all->buf, msg_all->len);
+
+       if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
                
-               /* if the pid was not found delete the entry from connections.tdb */
-               if (errno == ESRCH) {
-                       DEBUG(2,("pid %u doesn't exist - deleting connections %d [%s]\n",
-                                       (unsigned int)crec.pid, crec.cnum, crec.name));
-                       tdb_delete(the_tdb, kbuf);
-               }
+               /* If the pid was not found delete the entry from connections.tdb */
+
+               DEBUG(2,("pid %s doesn't exist - deleting connections %d [%s]\n",
+                        procid_str_static(&crec->pid), crec->cnum,
+                        crec->servicename));
+
+               rec->delete_rec(rec);
        }
        msg_all->n_sent++;
        return 0;
@@ -443,92 +142,220 @@ static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void
  * @param n_sent Set to the number of messages sent.  This should be
  * equal to the number of processes, but be careful for races.
  *
- * @return True for success.
+ * @retval True for success.
  **/
-BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type,
+bool message_send_all(struct messaging_context *msg_ctx,
+                     int msg_type,
                      const void *buf, size_t len,
-                     BOOL duplicates_allowed,
                      int *n_sent)
 {
        struct msg_all msg_all;
 
        msg_all.msg_type = msg_type;
+       if (msg_type < 1000)
+               msg_all.msg_flag = FLAG_MSG_GENERAL;
+       else if (msg_type > 1000 && msg_type < 2000)
+               msg_all.msg_flag = FLAG_MSG_NMBD;
+       else if (msg_type > 2000 && msg_type < 2100)
+               msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
+       else if (msg_type > 2100 && msg_type < 3000)
+               msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
+       else if (msg_type > 3000 && msg_type < 4000)
+               msg_all.msg_flag = FLAG_MSG_SMBD;
+       else
+               return False;
+
        msg_all.buf = buf;
        msg_all.len = len;
-       msg_all.duplicates = duplicates_allowed;
        msg_all.n_sent = 0;
+       msg_all.msg_ctx = msg_ctx;
 
-       tdb_traverse(conn_tdb, traverse_fn, &msg_all);
+       connections_forall(traverse_fn, &msg_all);
        if (n_sent)
                *n_sent = msg_all.n_sent;
        return True;
 }
 
-static VOLATILE sig_atomic_t gotalarm;
-
-/***************************************************************
- Signal function to tell us we timed out.
-****************************************************************/
+struct event_context *messaging_event_context(struct messaging_context *msg_ctx)
+{
+       return msg_ctx->event_ctx;
+}
 
-static void gotalarm_sig(void)
+struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
+                                        struct server_id server_id, 
+                                        struct event_context *ev)
 {
-       gotalarm = 1;
+       struct messaging_context *ctx;
+       NTSTATUS status;
+
+       if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) {
+               return NULL;
+       }
+
+       ctx->id = server_id;
+       ctx->event_ctx = ev;
+
+       status = messaging_tdb_init(ctx, ctx, &ctx->local);
+
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(0, ("messaging_tdb_init failed: %s\n",
+                         nt_errstr(status)));
+               TALLOC_FREE(ctx);
+               return NULL;
+       }
+
+#ifdef CLUSTER_SUPPORT
+       if (lp_clustering()) {
+               status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
+
+               if (!NT_STATUS_IS_OK(status)) {
+                       DEBUG(0, ("messaging_ctdb_init failed: %s\n",
+                                 nt_errstr(status)));
+                       TALLOC_FREE(ctx);
+                       return NULL;
+               }
+       }
+#endif
+
+       messaging_register(ctx, NULL, MSG_PING, ping_message);
+
+       /* Register some debugging related messages */
+
+       register_msg_pool_usage(ctx);
+       register_dmalloc_msgs(ctx);
+       debug_register_msgs(ctx);
+
+       return ctx;
 }
 
-/**
- * Lock the messaging tdb based on a string - this is used as a primitive
- * form of mutex between smbd instances. 
- *
- * @param name A string identifying the name of the mutex.
+/*
+ * re-init after a fork
  */
-
-BOOL message_named_mutex(char *name, unsigned int timeout)
+NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
 {
-       TDB_DATA key;
-       int ret;
+#ifdef CLUSTER_SUPPORT
 
-       if (!message_init())
-               return False;
+       TALLOC_FREE(msg_ctx->remote);
+
+       if (lp_clustering()) {
+               NTSTATUS status;
 
-       key.dptr = name;
-       key.dsize = strlen(name)+1;
+               status = messaging_ctdbd_init(msg_ctx, msg_ctx,
+                                             &msg_ctx->remote);
 
-       if (timeout) {
-               gotalarm = 0;
-               CatchSignal(SIGALRM, SIGNAL_CAST gotalarm_sig);
-               alarm(timeout);
+               if (!NT_STATUS_IS_OK(status)) {
+                       DEBUG(0, ("messaging_ctdb_init failed: %s\n",
+                                 nt_errstr(status)));
+                       return status;
+               }
        }
 
-       ret = tdb_chainlock(tdb, key);
+#endif
 
-       if (timeout) {
-               alarm(0);
-               CatchSignal(SIGALRM, SIGNAL_CAST SIG_IGN);
-               if (gotalarm)
-                       return False;
+       return NT_STATUS_OK;
+}
+
+
+/*
+ * Register a dispatch function for a particular message type. Allow multiple
+ * registrants
+*/
+NTSTATUS messaging_register(struct messaging_context *msg_ctx,
+                           void *private_data,
+                           uint32_t msg_type,
+                           void (*fn)(struct messaging_context *msg,
+                                      void *private_data, 
+                                      uint32_t msg_type, 
+                                      struct server_id server_id,
+                                      DATA_BLOB *data))
+{
+       struct messaging_callback *cb;
+
+       /*
+        * Only one callback per type
+        */
+
+       for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
+               if (cb->msg_type == msg_type) {
+                       cb->fn = fn;
+                       cb->private_data = private_data;
+                       return NT_STATUS_OK;
+               }
        }
 
-       if (ret == 0)
-               DEBUG(10,("message_named_mutex: got mutex for %s\n", name ));
+       if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
+               return NT_STATUS_NO_MEMORY;
+       }
+
+       cb->msg_type = msg_type;
+       cb->fn = fn;
+       cb->private_data = private_data;
 
-       return (ret == 0);
+       DLIST_ADD(msg_ctx->callbacks, cb);
+       return NT_STATUS_OK;
 }
 
-/**
- * Unlock a named mutex.
- *
- * @param name A string identifying the name of the mutex.
- */
+/*
+  De-register the function for a particular message type.
+*/
+void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
+                         void *private_data)
+{
+       struct messaging_callback *cb, *next;
+
+       for (cb = ctx->callbacks; cb; cb = next) {
+               next = cb->next;
+               if ((cb->msg_type == msg_type)
+                   && (cb->private_data == private_data)) {
+                       DLIST_REMOVE(ctx->callbacks, cb);
+                       TALLOC_FREE(cb);
+               }
+       }
+}
 
-void message_named_mutex_release(char *name)
+/*
+  Send a message to a particular server
+*/
+NTSTATUS messaging_send(struct messaging_context *msg_ctx,
+                       struct server_id server, uint32_t msg_type,
+                       const DATA_BLOB *data)
 {
-       TDB_DATA key;
+#ifdef CLUSTER_SUPPORT
+       if (!procid_is_local(&server)) {
+               return msg_ctx->remote->send_fn(msg_ctx, server,
+                                               msg_type, data,
+                                               msg_ctx->remote);
+       }
+#endif
+       return msg_ctx->local->send_fn(msg_ctx, server, msg_type, data,
+                                      msg_ctx->local);
+}
 
-       key.dptr = name;
-       key.dsize = strlen(name)+1;
+NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
+                           struct server_id server, uint32_t msg_type,
+                           const uint8 *buf, size_t len)
+{
+       DATA_BLOB blob = data_blob_const(buf, len);
+       return messaging_send(msg_ctx, server, msg_type, &blob);
+}
 
-       tdb_chainunlock(tdb, key);
-       DEBUG(10,("message_named_mutex: released mutex for %s\n", name ));
+/*
+  Dispatch one messsaging_rec
+*/
+void messaging_dispatch_rec(struct messaging_context *msg_ctx,
+                           struct messaging_rec *rec)
+{
+       struct messaging_callback *cb, *next;
+
+       for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
+               next = cb->next;
+               if (cb->msg_type == rec->msg_type) {
+                       cb->fn(msg_ctx, cb->private_data, rec->msg_type,
+                              rec->src, &rec->buf);
+                       return;
+               }
+       }
+       return;
 }
 
 /** @} **/