void ping_message(int msg_type, pid_t src, void *buf, size_t len)
{
DEBUG(1,("INFO: Received PING message from PID %d\n",src));
- message_send_pid(src, MSG_PONG, buf, len);
+ message_send_pid(src, MSG_PONG, buf, len, True);
}
/****************************************************************************
DEBUG(1,("INFO: Received REQ_DEBUGLEVEL message from PID %d\n",src));
level = DEBUGLEVEL;
- message_send_pid(src, MSG_DEBUGLEVEL, &level, sizeof(int));
+ message_send_pid(src, MSG_DEBUGLEVEL, &level, sizeof(int), True);
}
/****************************************************************************
TDB_DATA kbuf;
slprintf(key, sizeof(key), "PID/%d", (int)pid);
-
+
kbuf.dptr = (char *)key;
- kbuf.dsize = sizeof(key);
+ kbuf.dsize = strlen(key)+1;
return kbuf;
}
/****************************************************************************
send a message to a particular pid
****************************************************************************/
-BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len)
+BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len, BOOL duplicates_allowed)
{
TDB_DATA kbuf;
TDB_DATA dbuf;
struct message_rec rec;
void *p;
+ /*
+ * Do an early check for process exists - saves adding into a tdb
+ * and deleting again if the target is not present. JRA.
+ */
+
+ if (!process_exists(pid)) {
+ DEBUG(2,("message_send_pid: pid %d doesn't exist\n", (int)pid));
+ tdb_delete(tdb, message_key_pid(pid));
+ return False;
+ }
+
rec.msg_version = MESSAGE_VERSION;
rec.msg_type = msg_type;
rec.dest = pid;
kbuf = message_key_pid(pid);
/* lock the record for the destination */
- tdb_lockchain(tdb, kbuf);
+ tdb_chainlock(tdb, kbuf);
dbuf = tdb_fetch(tdb, kbuf);
goto ok;
}
+ if (!duplicates_allowed) {
+ char *ptr;
+ struct message_rec prec;
+
+ for(ptr = (char *)dbuf.dptr; ptr < dbuf.dptr + dbuf.dsize; ) {
+ /*
+ * First check if the message header matches, then, if it's a non-zero
+ * sized message, check if the data matches. If so it's a duplicate and
+ * we can discard it. JRA.
+ */
+
+ if (!memcmp(ptr, &rec, sizeof(rec))) {
+ if (!len || (len && !memcmp( ptr + sizeof(rec), (char *)buf, len))) {
+ DEBUG(10,("message_send_pid: discarding duplicate message.\n"));
+ free(dbuf.dptr);
+ tdb_chainunlock(tdb, kbuf);
+ return True;
+ }
+ }
+ memcpy(&prec, ptr, sizeof(prec));
+ ptr += sizeof(rec) + prec.len;
+ }
+ }
+
/* we're adding to an existing entry */
p = (void *)malloc(dbuf.dsize + len + sizeof(rec));
if (!p) goto failed;
free(dbuf.dptr);
ok:
- tdb_unlockchain(tdb, kbuf);
+ tdb_chainunlock(tdb, kbuf);
return message_notify(pid);
failed:
- tdb_unlockchain(tdb, kbuf);
+ tdb_chainunlock(tdb, kbuf);
return False;
}
kbuf = message_key_pid(sys_getpid());
- tdb_lockchain(tdb, kbuf);
+ tdb_chainlock(tdb, kbuf);
dbuf = tdb_fetch(tdb, kbuf);
if (dbuf.dptr == NULL || dbuf.dsize == 0) goto failed;
*msg_type = rec.msg_type;
*src = rec.src;
- memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len));
+ if (dbuf.dsize - (sizeof(rec)+rec.len) > 0)
+ memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len));
dbuf.dsize -= sizeof(rec)+rec.len;
- tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
+
+ if (dbuf.dsize == 0)
+ tdb_delete(tdb, kbuf);
+ else
+ tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
free(dbuf.dptr);
- tdb_unlockchain(tdb, kbuf);
+ tdb_chainunlock(tdb, kbuf);
return True;
failed:
- tdb_unlockchain(tdb, kbuf);
+ tdb_chainunlock(tdb, kbuf);
return False;
}
int msg_type;
void *buf;
size_t len;
+ BOOL duplicates;
} msg_all;
/****************************************************************************
memcpy(&crec, dbuf.dptr, sizeof(crec));
- message_send_pid(crec.pid, msg_all.msg_type, msg_all.buf, msg_all.len);
+ if (crec.cnum != -1) return 0;
+ message_send_pid(crec.pid, msg_all.msg_type, msg_all.buf, msg_all.len, msg_all.duplicates);
return 0;
}
It isn't very efficient, but should be OK for the sorts of applications that
use it. When we need efficient broadcast we can add it.
****************************************************************************/
-BOOL message_send_all(int msg_type, void *buf, size_t len)
+BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type, void *buf, size_t len, BOOL duplicates_allowed)
{
- TDB_CONTEXT *the_tdb;
-
- the_tdb = tdb_open(lock_path("connections.tdb"), 0, 0, O_RDONLY, 0);
- if (!the_tdb) {
- DEBUG(2,("Failed to open connections database in message_send_all\n"));
- return False;
- }
-
msg_all.msg_type = msg_type;
msg_all.buf = buf;
msg_all.len = len;
+ msg_all.duplicates = duplicates_allowed;
- tdb_traverse(the_tdb, traverse_fn, NULL);
- tdb_close(the_tdb);
+ tdb_traverse(conn_tdb, traverse_fn, NULL);
return True;
}