Changes from APPLIANCE_HEAD:
[ira/wip.git] / source3 / lib / messages.c
index 1cc6700ea9b265de70f57cbae2508c968dacb0b4..94c04cfa8e67c5152b3da19afef4e19dc4e2b9a0 100644 (file)
@@ -71,7 +71,7 @@ a useful function for testing the message system
 void ping_message(int msg_type, pid_t src, void *buf, size_t len)
 {
        DEBUG(1,("INFO: Received PING message from PID %d\n",src));
-       message_send_pid(src, MSG_PONG, buf, len);
+       message_send_pid(src, MSG_PONG, buf, len, True);
 }
 
 /****************************************************************************
@@ -83,7 +83,7 @@ void debuglevel_message(int msg_type, pid_t src, void *buf, size_t len)
        
        DEBUG(1,("INFO: Received REQ_DEBUGLEVEL message from PID %d\n",src));
         level = DEBUGLEVEL;
-       message_send_pid(src, MSG_DEBUGLEVEL, &level, sizeof(int));
+       message_send_pid(src, MSG_DEBUGLEVEL, &level, sizeof(int), True);
 }
 
 /****************************************************************************
@@ -120,9 +120,9 @@ static TDB_DATA message_key_pid(pid_t pid)
        TDB_DATA kbuf;
 
        slprintf(key, sizeof(key), "PID/%d", (int)pid);
-
+       
        kbuf.dptr = (char *)key;
-       kbuf.dsize = sizeof(key);
+       kbuf.dsize = strlen(key)+1;
        return kbuf;
 }
 
@@ -148,13 +148,24 @@ static BOOL message_notify(pid_t pid)
 /****************************************************************************
 send a message to a particular pid
 ****************************************************************************/
-BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len)
+BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len, BOOL duplicates_allowed)
 {
        TDB_DATA kbuf;
        TDB_DATA dbuf;
        struct message_rec rec;
        void *p;
 
+       /*
+        * Do an early check for process exists - saves adding into a tdb
+        * and deleting again if the target is not present. JRA.
+        */
+
+       if (!process_exists(pid)) {
+               DEBUG(2,("message_send_pid: pid %d doesn't exist\n", (int)pid));
+               tdb_delete(tdb, message_key_pid(pid));
+               return False;
+       }
+
        rec.msg_version = MESSAGE_VERSION;
        rec.msg_type = msg_type;
        rec.dest = pid;
@@ -164,7 +175,7 @@ BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len)
        kbuf = message_key_pid(pid);
 
        /* lock the record for the destination */
-       tdb_lockchain(tdb, kbuf);
+       tdb_chainlock(tdb, kbuf);
 
        dbuf = tdb_fetch(tdb, kbuf);
 
@@ -183,6 +194,30 @@ BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len)
                goto ok;
        }
 
+       if (!duplicates_allowed) {
+               char *ptr;
+               struct message_rec prec;
+               
+               for(ptr = (char *)dbuf.dptr; ptr < dbuf.dptr + dbuf.dsize; ) {
+                       /*
+                        * First check if the message header matches, then, if it's a non-zero
+                        * sized message, check if the data matches. If so it's a duplicate and
+                        * we can discard it. JRA.
+                        */
+
+                       if (!memcmp(ptr, &rec, sizeof(rec))) {
+                               if (!len || (len && !memcmp( ptr + sizeof(rec), (char *)buf, len))) {
+                                       DEBUG(10,("message_send_pid: discarding duplicate message.\n"));
+                                       free(dbuf.dptr);
+                                       tdb_chainunlock(tdb, kbuf);
+                                       return True;
+                               }
+                       }
+                       memcpy(&prec, ptr, sizeof(prec));
+                       ptr += sizeof(rec) + prec.len;
+               }
+       }
+
        /* we're adding to an existing entry */
        p = (void *)malloc(dbuf.dsize + len + sizeof(rec));
        if (!p) goto failed;
@@ -198,11 +233,11 @@ BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len)
        free(dbuf.dptr);
 
  ok:
-       tdb_unlockchain(tdb, kbuf);
+       tdb_chainunlock(tdb, kbuf);
        return message_notify(pid);
 
  failed:
-       tdb_unlockchain(tdb, kbuf);
+       tdb_chainunlock(tdb, kbuf);
        return False;
 }
 
@@ -219,7 +254,7 @@ static BOOL message_recv(int *msg_type, pid_t *src, void **buf, size_t *len)
 
        kbuf = message_key_pid(sys_getpid());
 
-       tdb_lockchain(tdb, kbuf);
+       tdb_chainlock(tdb, kbuf);
        
        dbuf = tdb_fetch(tdb, kbuf);
        if (dbuf.dptr == NULL || dbuf.dsize == 0) goto failed;
@@ -244,16 +279,21 @@ static BOOL message_recv(int *msg_type, pid_t *src, void **buf, size_t *len)
        *msg_type = rec.msg_type;
        *src = rec.src;
 
-       memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len));
+       if (dbuf.dsize - (sizeof(rec)+rec.len) > 0)
+               memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len));
        dbuf.dsize -= sizeof(rec)+rec.len;
-       tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
+
+       if (dbuf.dsize == 0)
+               tdb_delete(tdb, kbuf);
+       else
+               tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
 
        free(dbuf.dptr);
-       tdb_unlockchain(tdb, kbuf);
+       tdb_chainunlock(tdb, kbuf);
        return True;
 
  failed:
-       tdb_unlockchain(tdb, kbuf);
+       tdb_chainunlock(tdb, kbuf);
        return False;
 }
 
@@ -323,6 +363,7 @@ static struct {
        int msg_type;
        void *buf;
        size_t len;
+       BOOL duplicates;
 } msg_all;
 
 /****************************************************************************
@@ -334,7 +375,8 @@ static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void
 
        memcpy(&crec, dbuf.dptr, sizeof(crec));
 
-       message_send_pid(crec.pid, msg_all.msg_type, msg_all.buf, msg_all.len);
+       if (crec.cnum != -1) return 0;
+       message_send_pid(crec.pid, msg_all.msg_type, msg_all.buf, msg_all.len, msg_all.duplicates);
        return 0;
 }
 
@@ -343,21 +385,13 @@ this is a useful function for sending messages to all smbd processes.
 It isn't very efficient, but should be OK for the sorts of applications that 
 use it. When we need efficient broadcast we can add it.
 ****************************************************************************/
-BOOL message_send_all(int msg_type, void *buf, size_t len)
+BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type, void *buf, size_t len, BOOL duplicates_allowed)
 {
-       TDB_CONTEXT *the_tdb;
-
-       the_tdb = tdb_open(lock_path("connections.tdb"), 0, 0, O_RDONLY, 0);
-       if (!the_tdb) {
-               DEBUG(2,("Failed to open connections database in message_send_all\n"));
-               return False;
-       }
-
        msg_all.msg_type = msg_type;
        msg_all.buf = buf;
        msg_all.len = len;
+       msg_all.duplicates = duplicates_allowed;
 
-       tdb_traverse(the_tdb, traverse_fn, NULL);
-       tdb_close(the_tdb);
+       tdb_traverse(conn_tdb, traverse_fn, NULL);
        return True;
 }