X-Git-Url: http://git.samba.org/samba.git/?a=blobdiff_plain;f=source3%2Flib%2Fmessages.c;h=058bbc99b0bcce105609c18af40a633096a95ad0;hb=54abd2aa66069e6baf7769c496f46d9dba18db39;hp=36a23e28ab91547bffa2b444ca65b6f7cfbcfbe0;hpb=1bb92383451e5db820c565af3d4176c7a48f4dd4;p=vlendec%2Fsamba-autobuild%2F.git diff --git a/source3/lib/messages.c b/source3/lib/messages.c index 36a23e28ab9..058bbc99b0b 100644 --- a/source3/lib/messages.c +++ b/source3/lib/messages.c @@ -3,6 +3,7 @@ Samba internal messaging functions Copyright (C) Andrew Tridgell 2000 Copyright (C) 2001 by Martin Pool + Copyright (C) 2002 by Jeremy Allison This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -20,11 +21,11 @@ */ /** - @defgroups messages Internal messaging framework - @{ - @file messages.c - - This module is used for internal messaging between Samba daemons. + @defgroup messages Internal messaging framework + @{ + @file messages.c + + @brief Module for internal messaging between Samba daemons. The idea is that if a part of Samba wants to do communication with another Samba process then it will do a message_register() of a @@ -35,6 +36,9 @@ use that to reply by message_send_pid(). See ping_message() for a simple example. + @caution Dispatch functions must be able to cope with incoming + messages on an *odd* byte boundary. + This system doesn't have any inherent size limitations but is not very efficient for large messages or when messages are sent in very quick succession. @@ -53,8 +57,8 @@ static int received_signal; struct message_rec { int msg_version; int msg_type; - pid_t dest; - pid_t src; + struct process_id dest; + struct process_id src; size_t len; }; @@ -62,7 +66,7 @@ struct message_rec { static struct dispatch_fns { struct dispatch_fns *next, *prev; int msg_type; - void (*fn)(int msg_type, pid_t pid, void *buf, size_t len); + void (*fn)(int msg_type, struct process_id pid, void *buf, size_t len); } *dispatch_fns; /**************************************************************************** @@ -72,17 +76,19 @@ static struct dispatch_fns { static void sig_usr1(void) { received_signal = 1; - sys_select_signal(); + sys_select_signal(SIGUSR1); } /**************************************************************************** A useful function for testing the message system. ****************************************************************************/ -static void ping_message(int msg_type, pid_t src, void *buf, size_t len) +static void ping_message(int msg_type, struct process_id src, + void *buf, size_t len) { - char *msg = buf ? buf : "none"; - DEBUG(1,("INFO: Received PING message from PID %u [%s]\n",(unsigned int)src, msg)); + const char *msg = buf ? buf : "none"; + DEBUG(1,("INFO: Received PING message from PID %s [%s]\n", + procid_str_static(&src), msg)); message_send_pid(src, MSG_PONG, buf, len, True); } @@ -107,6 +113,11 @@ BOOL message_init(void) message_register(MSG_PING, ping_message); + /* Register some debugging related messages */ + + register_msg_pool_usage(); + register_dmalloc_msgs(); + return True; } @@ -114,12 +125,12 @@ BOOL message_init(void) Form a static tdb key from a pid. ******************************************************************/ -static TDB_DATA message_key_pid(pid_t pid) +static TDB_DATA message_key_pid(struct process_id pid) { static char key[20]; TDB_DATA kbuf; - slprintf(key, sizeof(key)-1, "PID/%d", (int)pid); + slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid)); kbuf.dptr = (char *)key; kbuf.dsize = strlen(key)+1; @@ -131,15 +142,20 @@ static TDB_DATA message_key_pid(pid_t pid) then delete its record in the database. ****************************************************************************/ -static BOOL message_notify(pid_t pid) +static BOOL message_notify(struct process_id procid) { - /* Doing kill with a non-positive pid causes messages to be - * sent to places we don't want. */ + pid_t pid = procid.pid; + /* + * Doing kill with a non-positive pid causes messages to be + * sent to places we don't want. + */ + SMB_ASSERT(pid > 0); + if (kill(pid, SIGUSR1) == -1) { if (errno == ESRCH) { DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid)); - tdb_delete(tdb, message_key_pid(pid)); + tdb_delete(tdb, message_key_pid(procid)); } else { DEBUG(2,("message to process %d failed - %s\n", (int)pid, strerror(errno))); } @@ -152,208 +168,315 @@ static BOOL message_notify(pid_t pid) Send a message to a particular pid. ****************************************************************************/ -BOOL message_send_pid(pid_t pid, int msg_type, const void *buf, size_t len, - BOOL duplicates_allowed) +static BOOL message_send_pid_internal(struct process_id pid, int msg_type, + const void *buf, size_t len, + BOOL duplicates_allowed, + unsigned int timeout) { TDB_DATA kbuf; TDB_DATA dbuf; + TDB_DATA old_dbuf; struct message_rec rec; - void *p; + char *ptr; + struct message_rec prec; + + /* + * Doing kill with a non-positive pid causes messages to be + * sent to places we don't want. + */ + + SMB_ASSERT(procid_to_pid(&pid) > 0); rec.msg_version = MESSAGE_VERSION; rec.msg_type = msg_type; rec.dest = pid; - rec.src = sys_getpid(); + rec.src = procid_self(); rec.len = len; - /* Doing kill with a non-positive pid causes messages to be - * sent to places we don't want. */ - SMB_ASSERT(pid > 0); - kbuf = message_key_pid(pid); + dbuf.dptr = (void *)SMB_MALLOC(len + sizeof(rec)); + if (!dbuf.dptr) + return False; + + memcpy(dbuf.dptr, &rec, sizeof(rec)); + if (len > 0) + memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len); + + dbuf.dsize = len + sizeof(rec); + + if (duplicates_allowed) { + + /* If duplicates are allowed we can just append the message and return. */ + + /* lock the record for the destination */ + if (timeout) { + if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) { + DEBUG(0,("message_send_pid_internal: failed to get chainlock with timeout %ul.\n", timeout)); + return False; + } + } else { + if (tdb_chainlock(tdb, kbuf) == -1) { + DEBUG(0,("message_send_pid_internal: failed to get chainlock.\n")); + return False; + } + } + tdb_append(tdb, kbuf, dbuf); + tdb_chainunlock(tdb, kbuf); + + SAFE_FREE(dbuf.dptr); + errno = 0; /* paranoia */ + return message_notify(pid); + } + /* lock the record for the destination */ - tdb_chainlock(tdb, kbuf); + if (timeout) { + if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) { + DEBUG(0,("message_send_pid_internal: failed to get chainlock with timeout %ul.\n", timeout)); + return False; + } + } else { + if (tdb_chainlock(tdb, kbuf) == -1) { + DEBUG(0,("message_send_pid_internal: failed to get chainlock.\n")); + return False; + } + } - dbuf = tdb_fetch(tdb, kbuf); + old_dbuf = tdb_fetch(tdb, kbuf); - if (!dbuf.dptr) { + if (!old_dbuf.dptr) { /* its a new record */ - p = (void *)malloc(len + sizeof(rec)); - if (!p) - goto failed; - memcpy(p, &rec, sizeof(rec)); - if (len > 0) - memcpy((void *)((char*)p+sizeof(rec)), buf, len); - - dbuf.dptr = p; - dbuf.dsize = len + sizeof(rec); tdb_store(tdb, kbuf, dbuf, TDB_REPLACE); - SAFE_FREE(p); - goto ok; + tdb_chainunlock(tdb, kbuf); + + SAFE_FREE(dbuf.dptr); + errno = 0; /* paranoia */ + return message_notify(pid); } - if (!duplicates_allowed) { - char *ptr; - struct message_rec prec; - - for(ptr = (char *)dbuf.dptr; ptr < dbuf.dptr + dbuf.dsize; ) { - /* - * First check if the message header matches, then, if it's a non-zero - * sized message, check if the data matches. If so it's a duplicate and - * we can discard it. JRA. - */ - - if (!memcmp(ptr, &rec, sizeof(rec))) { - if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) { - DEBUG(10,("message_send_pid: discarding duplicate message.\n")); - SAFE_FREE(dbuf.dptr); - tdb_chainunlock(tdb, kbuf); - return True; - } + /* Not a new record. Check for duplicates. */ + + for(ptr = (char *)old_dbuf.dptr; ptr < old_dbuf.dptr + old_dbuf.dsize; ) { + /* + * First check if the message header matches, then, if it's a non-zero + * sized message, check if the data matches. If so it's a duplicate and + * we can discard it. JRA. + */ + + if (!memcmp(ptr, &rec, sizeof(rec))) { + if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) { + tdb_chainunlock(tdb, kbuf); + DEBUG(10,("message_send_pid_internal: discarding duplicate message.\n")); + SAFE_FREE(dbuf.dptr); + SAFE_FREE(old_dbuf.dptr); + return True; } - memcpy(&prec, ptr, sizeof(prec)); - ptr += sizeof(rec) + prec.len; } + memcpy(&prec, ptr, sizeof(prec)); + ptr += sizeof(rec) + prec.len; } /* we're adding to an existing entry */ - p = (void *)malloc(dbuf.dsize + len + sizeof(rec)); - if (!p) - goto failed; - memcpy(p, dbuf.dptr, dbuf.dsize); - memcpy((void *)((char*)p+dbuf.dsize), &rec, sizeof(rec)); - if (len > 0) - memcpy((void *)((char*)p+dbuf.dsize+sizeof(rec)), buf, len); + tdb_append(tdb, kbuf, dbuf); + tdb_chainunlock(tdb, kbuf); - SAFE_FREE(dbuf.dptr); - dbuf.dptr = p; - dbuf.dsize += len + sizeof(rec); - tdb_store(tdb, kbuf, dbuf, TDB_REPLACE); + SAFE_FREE(old_dbuf.dptr); SAFE_FREE(dbuf.dptr); - ok: - tdb_chainunlock(tdb, kbuf); errno = 0; /* paranoia */ return message_notify(pid); +} - failed: - tdb_chainunlock(tdb, kbuf); - errno = 0; /* paranoia */ - return False; +/**************************************************************************** + Send a message to a particular pid - no timeout. +****************************************************************************/ + +BOOL message_send_pid(struct process_id pid, int msg_type, const void *buf, size_t len, BOOL duplicates_allowed) +{ + return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed, 0); } /**************************************************************************** - Retrieve the next message for the current process. + Send a message to a particular pid, with timeout in seconds. ****************************************************************************/ -static BOOL message_recv(int *msg_type, pid_t *src, void **buf, size_t *len) +BOOL message_send_pid_with_timeout(struct process_id pid, int msg_type, const void *buf, size_t len, + BOOL duplicates_allowed, unsigned int timeout) +{ + return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed, timeout); +} + +/**************************************************************************** + Count the messages pending for a particular pid. Expensive.... +****************************************************************************/ + +unsigned int messages_pending_for_pid(struct process_id pid) { TDB_DATA kbuf; TDB_DATA dbuf; - struct message_rec rec; + char *buf; + unsigned int message_count = 0; - kbuf = message_key_pid(sys_getpid()); + kbuf = message_key_pid(pid); - tdb_chainlock(tdb, kbuf); - dbuf = tdb_fetch(tdb, kbuf); - if (dbuf.dptr == NULL || dbuf.dsize == 0) - goto failed; + if (dbuf.dptr == NULL || dbuf.dsize == 0) { + SAFE_FREE(dbuf.dptr); + return 0; + } + + for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) { + struct message_rec rec; + memcpy(&rec, buf, sizeof(rec)); + buf += (sizeof(rec) + rec.len); + dbuf.dsize -= (sizeof(rec) + rec.len); + message_count++; + } + + SAFE_FREE(dbuf.dptr); + return message_count; +} + +/**************************************************************************** + Retrieve all messages for the current process. +****************************************************************************/ + +static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len) +{ + TDB_DATA kbuf; + TDB_DATA dbuf; + TDB_DATA null_dbuf; - memcpy(&rec, dbuf.dptr, sizeof(rec)); + ZERO_STRUCT(null_dbuf); + + *msgs_buf = NULL; + *total_len = 0; + + kbuf = message_key_pid(pid_to_procid(sys_getpid())); + + if (tdb_chainlock(tdb, kbuf) == -1) + return False; + + dbuf = tdb_fetch(tdb, kbuf); + /* + * Replace with an empty record to keep the allocated + * space in the tdb. + */ + tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE); + tdb_chainunlock(tdb, kbuf); + + if (dbuf.dptr == NULL || dbuf.dsize == 0) { + SAFE_FREE(dbuf.dptr); + return False; + } + + *msgs_buf = dbuf.dptr; + *total_len = dbuf.dsize; + + return True; +} + +/**************************************************************************** + Parse out the next message for the current process. +****************************************************************************/ + +static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type, + struct process_id *src, char **buf, size_t *len) +{ + struct message_rec rec; + char *ret_buf = *buf; + + *buf = NULL; + *len = 0; + + if (total_len - (ret_buf - msgs_buf) < sizeof(rec)) + return False; + + memcpy(&rec, ret_buf, sizeof(rec)); + ret_buf += sizeof(rec); if (rec.msg_version != MESSAGE_VERSION) { DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION)); - goto failed; + return False; } if (rec.len > 0) { - (*buf) = (void *)malloc(rec.len); - if (!(*buf)) - goto failed; - - memcpy(*buf, dbuf.dptr+sizeof(rec), rec.len); - } else { - *buf = NULL; + if (total_len - (ret_buf - msgs_buf) < rec.len) + return False; } *len = rec.len; *msg_type = rec.msg_type; *src = rec.src; + *buf = ret_buf; - if (dbuf.dsize - (sizeof(rec)+rec.len) > 0) - memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len)); - dbuf.dsize -= sizeof(rec)+rec.len; - - if (dbuf.dsize == 0) - tdb_delete(tdb, kbuf); - else - tdb_store(tdb, kbuf, dbuf, TDB_REPLACE); - - SAFE_FREE(dbuf.dptr); - tdb_chainunlock(tdb, kbuf); return True; - - failed: - tdb_chainunlock(tdb, kbuf); - SAFE_FREE(dbuf.dptr); - return False; } /**************************************************************************** Receive and dispatch any messages pending for this process. Notice that all dispatch handlers for a particular msg_type get called, so you can register multiple handlers for a message. + *NOTE*: Dispatch functions must be able to cope with incoming + messages on an *odd* byte boundary. ****************************************************************************/ void message_dispatch(void) { int msg_type; - pid_t src; - void *buf; - size_t len; + struct process_id src; + char *buf; + char *msgs_buf; + size_t len, total_len; struct dispatch_fns *dfn; int n_handled; - if (!received_signal) return; + if (!received_signal) + return; DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal)); received_signal = 0; - while (message_recv(&msg_type, &src, &buf, &len)) { - DEBUG(10,("message_dispatch: received msg_type=%d src_pid=%d\n", - msg_type, (int) src)); + if (!retrieve_all_messages(&msgs_buf, &total_len)) + return; + + for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) { + DEBUG(10,("message_dispatch: received msg_type=%d " + "src_pid=%u\n", msg_type, + (unsigned int) procid_to_pid(&src))); n_handled = 0; for (dfn = dispatch_fns; dfn; dfn = dfn->next) { if (dfn->msg_type == msg_type) { DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type)); - dfn->fn(msg_type, src, buf, len); + dfn->fn(msg_type, src, len ? (void *)buf : NULL, len); n_handled++; } } if (!n_handled) { - DEBUG(5,("message_dispatch: warning: no handlers registered for " - "msg_type %d in pid %d\n", - msg_type, sys_getpid())); + DEBUG(5,("message_dispatch: warning: no handlers registed for " + "msg_type %d in pid %u\n", + msg_type, (unsigned int)sys_getpid())); } - SAFE_FREE(buf); } + SAFE_FREE(msgs_buf); } /**************************************************************************** Register a dispatch function for a particular message type. + *NOTE*: Dispatch functions must be able to cope with incoming + messages on an *odd* byte boundary. ****************************************************************************/ void message_register(int msg_type, - void (*fn)(int msg_type, pid_t pid, void *buf, size_t len)) + void (*fn)(int msg_type, struct process_id pid, + void *buf, size_t len)) { struct dispatch_fns *dfn; - dfn = (struct dispatch_fns *)malloc(sizeof(*dfn)); + dfn = SMB_MALLOC_P(struct dispatch_fns); if (dfn != NULL) { @@ -428,8 +551,9 @@ static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void /* If the pid was not found delete the entry from connections.tdb */ if (errno == ESRCH) { - DEBUG(2,("pid %u doesn't exist - deleting connections %d [%s]\n", - (unsigned int)crec.pid, crec.cnum, crec.name)); + DEBUG(2,("pid %s doesn't exist - deleting connections %d [%s]\n", + procid_str_static(&crec.pid), + crec.cnum, crec.name)); tdb_delete(the_tdb, kbuf); } } @@ -447,7 +571,7 @@ static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void * @param n_sent Set to the number of messages sent. This should be * equal to the number of processes, but be careful for races. * - * @return True for success. + * @retval True for success. **/ BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type, const void *buf, size_t len, @@ -461,8 +585,10 @@ BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type, msg_all.msg_flag = FLAG_MSG_GENERAL; else if (msg_type > 1000 && msg_type < 2000) msg_all.msg_flag = FLAG_MSG_NMBD; - else if (msg_type > 2000 && msg_type < 3000) - msg_all.msg_flag = FLAG_MSG_PRINTING; + else if (msg_type > 2000 && msg_type < 2100) + msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY; + else if (msg_type > 2100 && msg_type < 3000) + msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL; else if (msg_type > 3000 && msg_type < 4000) msg_all.msg_flag = FLAG_MSG_SMBD; else