Removed version number from file header.
[kai/samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 2 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, write to the Free Software
19    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
21
22 /**
23    @defgroups messages Internal messaging framework
24    @{
25    @file messages.c
26
27    This module is used for internal messaging between Samba daemons. 
28
29    The idea is that if a part of Samba wants to do communication with
30    another Samba process then it will do a message_register() of a
31    dispatch function, and use message_send_pid() to send messages to
32    that process.
33
34    The dispatch function is given the pid of the sender, and it can
35    use that to reply by message_send_pid().  See ping_message() for a
36    simple example.
37
38    This system doesn't have any inherent size limitations but is not
39    very efficient for large messages or when messages are sent in very
40    quick succession.
41
42 */
43
44 #include "includes.h"
45
46 /* the locking database handle */
47 static TDB_CONTEXT *tdb;
48 static int received_signal;
49
50 /* change the message version with any incompatible changes in the protocol */
51 #define MESSAGE_VERSION 1
52
53 struct message_rec {
54         int msg_version;
55         int msg_type;
56         pid_t dest;
57         pid_t src;
58         size_t len;
59 };
60
61 /* we have a linked list of dispatch handlers */
62 static struct dispatch_fns {
63         struct dispatch_fns *next, *prev;
64         int msg_type;
65         void (*fn)(int msg_type, pid_t pid, void *buf, size_t len);
66 } *dispatch_fns;
67
68 /****************************************************************************
69  Notifications come in as signals.
70 ****************************************************************************/
71
72 static void sig_usr1(void)
73 {
74         received_signal = 1;
75         sys_select_signal();
76 }
77
78 /****************************************************************************
79  A useful function for testing the message system.
80 ****************************************************************************/
81
82 void ping_message(int msg_type, pid_t src, void *buf, size_t len)
83 {
84         char *msg = buf ? buf : "none";
85         DEBUG(1,("INFO: Received PING message from PID %u [%s]\n",(unsigned int)src, msg));
86         message_send_pid(src, MSG_PONG, buf, len, True);
87 }
88
89 /****************************************************************************
90  Return current debug level.
91 ****************************************************************************/
92
93 void debuglevel_message(int msg_type, pid_t src, void *buf, size_t len)
94 {
95         DEBUG(1,("INFO: Received REQ_DEBUGLEVEL message from PID %u\n",(unsigned int)src));
96         message_send_pid(src, MSG_DEBUGLEVEL, DEBUGLEVEL_CLASS, sizeof(DEBUGLEVEL_CLASS), True);
97 }
98
99 /****************************************************************************
100  Initialise the messaging functions. 
101 ****************************************************************************/
102
103 BOOL message_init(void)
104 {
105         if (tdb) return True;
106
107         tdb = tdb_open_log(lock_path("messages.tdb"), 
108                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
109                        O_RDWR|O_CREAT,0600);
110
111         if (!tdb) {
112                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
113                 return False;
114         }
115
116         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
117
118         message_register(MSG_PING, ping_message);
119         message_register(MSG_REQ_DEBUGLEVEL, debuglevel_message);
120
121         return True;
122 }
123
124 /*******************************************************************
125  Form a static tdb key from a pid.
126 ******************************************************************/
127
128 static TDB_DATA message_key_pid(pid_t pid)
129 {
130         static char key[20];
131         TDB_DATA kbuf;
132
133         slprintf(key, sizeof(key)-1, "PID/%d", (int)pid);
134         
135         kbuf.dptr = (char *)key;
136         kbuf.dsize = strlen(key)+1;
137         return kbuf;
138 }
139
140 /****************************************************************************
141  Notify a process that it has a message. If the process doesn't exist 
142  then delete its record in the database.
143 ****************************************************************************/
144
145 static BOOL message_notify(pid_t pid)
146 {
147         if (kill(pid, SIGUSR1) == -1) {
148                 if (errno == ESRCH) {
149                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid));
150                         tdb_delete(tdb, message_key_pid(pid));
151                 } else {
152                         DEBUG(2,("message to process %d failed - %s\n", (int)pid, strerror(errno)));
153                 }
154                 return False;
155         }
156         return True;
157 }
158
159 /****************************************************************************
160  Send a message to a particular pid.
161 ****************************************************************************/
162
163 BOOL message_send_pid(pid_t pid, int msg_type, const void *buf, size_t len,
164                       BOOL duplicates_allowed)
165 {
166         TDB_DATA kbuf;
167         TDB_DATA dbuf;
168         struct message_rec rec;
169         void *p;
170
171         rec.msg_version = MESSAGE_VERSION;
172         rec.msg_type = msg_type;
173         rec.dest = pid;
174         rec.src = sys_getpid();
175         rec.len = len;
176
177         kbuf = message_key_pid(pid);
178
179         /* lock the record for the destination */
180         tdb_chainlock(tdb, kbuf);
181
182         dbuf = tdb_fetch(tdb, kbuf);
183
184         if (!dbuf.dptr) {
185                 /* its a new record */
186                 p = (void *)malloc(len + sizeof(rec));
187                 if (!p) goto failed;
188
189                 memcpy(p, &rec, sizeof(rec));
190                 if (len > 0) memcpy((void *)((char*)p+sizeof(rec)), buf, len);
191
192                 dbuf.dptr = p;
193                 dbuf.dsize = len + sizeof(rec);
194                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
195                 SAFE_FREE(p);
196                 goto ok;
197         }
198
199         if (!duplicates_allowed) {
200                 char *ptr;
201                 struct message_rec prec;
202                 
203                 for(ptr = (char *)dbuf.dptr; ptr < dbuf.dptr + dbuf.dsize; ) {
204                         /*
205                          * First check if the message header matches, then, if it's a non-zero
206                          * sized message, check if the data matches. If so it's a duplicate and
207                          * we can discard it. JRA.
208                          */
209
210                         if (!memcmp(ptr, &rec, sizeof(rec))) {
211                                 if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
212                                         DEBUG(10,("message_send_pid: discarding duplicate message.\n"));
213                                         SAFE_FREE(dbuf.dptr);
214                                         tdb_chainunlock(tdb, kbuf);
215                                         return True;
216                                 }
217                         }
218                         memcpy(&prec, ptr, sizeof(prec));
219                         ptr += sizeof(rec) + prec.len;
220                 }
221         }
222
223         /* we're adding to an existing entry */
224         p = (void *)malloc(dbuf.dsize + len + sizeof(rec));
225         if (!p) goto failed;
226
227         memcpy(p, dbuf.dptr, dbuf.dsize);
228         memcpy((void *)((char*)p+dbuf.dsize), &rec, sizeof(rec));
229         if (len > 0) memcpy((void *)((char*)p+dbuf.dsize+sizeof(rec)), buf, len);
230
231         SAFE_FREE(dbuf.dptr);
232         dbuf.dptr = p;
233         dbuf.dsize += len + sizeof(rec);
234         tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
235         SAFE_FREE(dbuf.dptr);
236
237  ok:
238         tdb_chainunlock(tdb, kbuf);
239         errno = 0;                    /* paranoia */
240         return message_notify(pid);
241
242  failed:
243         tdb_chainunlock(tdb, kbuf);
244         errno = 0;                    /* paranoia */
245         return False;
246 }
247
248 /****************************************************************************
249  Retrieve the next message for the current process.
250 ****************************************************************************/
251
252 static BOOL message_recv(int *msg_type, pid_t *src, void **buf, size_t *len)
253 {
254         TDB_DATA kbuf;
255         TDB_DATA dbuf;
256         struct message_rec rec;
257
258         kbuf = message_key_pid(sys_getpid());
259
260         tdb_chainlock(tdb, kbuf);
261         
262         dbuf = tdb_fetch(tdb, kbuf);
263         if (dbuf.dptr == NULL || dbuf.dsize == 0) goto failed;
264
265         memcpy(&rec, dbuf.dptr, sizeof(rec));
266
267         if (rec.msg_version != MESSAGE_VERSION) {
268                 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
269                 goto failed;
270         }
271
272         if (rec.len > 0) {
273                 (*buf) = (void *)malloc(rec.len);
274                 if (!(*buf)) goto failed;
275
276                 memcpy(*buf, dbuf.dptr+sizeof(rec), rec.len);
277         } else {
278                 *buf = NULL;
279         }
280
281         *len = rec.len;
282         *msg_type = rec.msg_type;
283         *src = rec.src;
284
285         if (dbuf.dsize - (sizeof(rec)+rec.len) > 0)
286                 memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len));
287         dbuf.dsize -= sizeof(rec)+rec.len;
288
289         if (dbuf.dsize == 0)
290                 tdb_delete(tdb, kbuf);
291         else
292                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
293
294         SAFE_FREE(dbuf.dptr);
295         tdb_chainunlock(tdb, kbuf);
296         return True;
297
298  failed:
299         tdb_chainunlock(tdb, kbuf);
300         return False;
301 }
302
303 /****************************************************************************
304  Receive and dispatch any messages pending for this process.
305  Notice that all dispatch handlers for a particular msg_type get called,
306  so you can register multiple handlers for a message.
307 ****************************************************************************/
308
309 void message_dispatch(void)
310 {
311         int msg_type;
312         pid_t src;
313         void *buf;
314         size_t len;
315         struct dispatch_fns *dfn;
316         int n_handled;
317
318         if (!received_signal) return;
319
320         DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
321
322         received_signal = 0;
323
324         while (message_recv(&msg_type, &src, &buf, &len)) {
325                 DEBUG(10,("message_dispatch: received msg_type=%d src_pid=%d\n",
326                           msg_type, (int) src));
327                 n_handled = 0;
328                 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
329                         if (dfn->msg_type == msg_type) {
330                                 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
331                                 dfn->fn(msg_type, src, buf, len);
332                                 n_handled++;
333                         }
334                 }
335                 if (!n_handled) {
336                         DEBUG(5,("message_dispatch: warning: no handlers registed for "
337                                  "msg_type %d in pid%d\n",
338                                  msg_type, getpid()));
339                 }
340                 SAFE_FREE(buf);
341         }
342 }
343
344 /****************************************************************************
345  Register a dispatch function for a particular message type.
346 ****************************************************************************/
347
348 void message_register(int msg_type, 
349                       void (*fn)(int msg_type, pid_t pid, void *buf, size_t len))
350 {
351         struct dispatch_fns *dfn;
352
353         dfn = (struct dispatch_fns *)malloc(sizeof(*dfn));
354
355         if (dfn != NULL) {
356
357                 ZERO_STRUCTPN(dfn);
358
359                 dfn->msg_type = msg_type;
360                 dfn->fn = fn;
361
362                 DLIST_ADD(dispatch_fns, dfn);
363         }
364         else {
365         
366                 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
367         }
368 }
369
370 /****************************************************************************
371  De-register the function for a particular message type.
372 ****************************************************************************/
373
374 void message_deregister(int msg_type)
375 {
376         struct dispatch_fns *dfn, *next;
377
378         for (dfn = dispatch_fns; dfn; dfn = next) {
379                 next = dfn->next;
380                 if (dfn->msg_type == msg_type) {
381                         DLIST_REMOVE(dispatch_fns, dfn);
382                         SAFE_FREE(dfn);
383                 }
384         }       
385 }
386
387 struct msg_all {
388         int msg_type;
389         const void *buf;
390         size_t len;
391         BOOL duplicates;
392         int             n_sent;
393 };
394
395 /****************************************************************************
396  Send one of the messages for the broadcast.
397 ****************************************************************************/
398
399 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
400 {
401         struct connections_data crec;
402         struct msg_all *msg_all = (struct msg_all *)state;
403
404         if (dbuf.dsize != sizeof(crec))
405                 return 0;
406
407         memcpy(&crec, dbuf.dptr, sizeof(crec));
408
409         if (crec.cnum != -1)
410                 return 0;
411
412         /* if the msg send fails because the pid was not found (i.e. smbd died), 
413          * the msg has already been deleted from the messages.tdb.*/
414         if (!message_send_pid(crec.pid, msg_all->msg_type,
415                               msg_all->buf, msg_all->len,
416                               msg_all->duplicates)) {
417                 
418                 /* if the pid was not found delete the entry from connections.tdb */
419                 if (errno == ESRCH) {
420                         DEBUG(2,("pid %u doesn't exist - deleting connections %d [%s]\n",
421                                         (unsigned int)crec.pid, crec.cnum, crec.name));
422                         tdb_delete(the_tdb, kbuf);
423                 }
424         }
425         msg_all->n_sent++;
426         return 0;
427 }
428
429 /**
430  * Send a message to all smbd processes.
431  *
432  * It isn't very efficient, but should be OK for the sorts of
433  * applications that use it. When we need efficient broadcast we can add
434  * it.
435  *
436  * @param n_sent Set to the number of messages sent.  This should be
437  * equal to the number of processes, but be careful for races.
438  *
439  * @return True for success.
440  **/
441 BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type,
442                       const void *buf, size_t len,
443                       BOOL duplicates_allowed,
444                       int *n_sent)
445 {
446         struct msg_all msg_all;
447
448         msg_all.msg_type = msg_type;
449         msg_all.buf = buf;
450         msg_all.len = len;
451         msg_all.duplicates = duplicates_allowed;
452         msg_all.n_sent = 0;
453
454         tdb_traverse(conn_tdb, traverse_fn, &msg_all);
455         if (n_sent)
456                 *n_sent = msg_all.n_sent;
457         return True;
458 }
459
460 /** @} **/