Doc how to reply to messages.
[samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/Netbios implementation.
3    Version 3.0
4    Samba internal messaging functions
5    Copyright (C) Andrew Tridgell 2000
6    
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 2 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, write to the Free Software
19    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
21
22 /**
23    @file messages.c
24
25    This module is used for internal messaging between Samba daemons. 
26
27    The idea is that if a part of Samba wants to do communication with
28    another Samba process then it will do a message_register() of a
29    dispatch function, and use message_send_pid() to send messages to
30    that process.
31
32    The dispatch function is given the pid of the sender, and it can
33    use that to reply by message_send_pid().  See ping_message() for a
34    simple example.
35
36    This system doesn't have any inherent size limitations but is not
37    very efficient for large messages or when messages are sent in very
38    quick succession.
39
40 */
41
42 #include "includes.h"
43
44 /* the locking database handle */
45 static TDB_CONTEXT *tdb;
46 static int received_signal;
47
48 /* change the message version with any incompatible changes in the protocol */
49 #define MESSAGE_VERSION 1
50
51 struct message_rec {
52         int msg_version;
53         int msg_type;
54         pid_t dest;
55         pid_t src;
56         size_t len;
57 };
58
59 /* we have a linked list of dispatch handlers */
60 static struct dispatch_fns {
61         struct dispatch_fns *next, *prev;
62         int msg_type;
63         void (*fn)(int msg_type, pid_t pid, void *buf, size_t len);
64 } *dispatch_fns;
65
66 /****************************************************************************
67  Notifications come in as signals.
68 ****************************************************************************/
69
70 static void sig_usr1(void)
71 {
72         received_signal = 1;
73         sys_select_signal();
74 }
75
76 /****************************************************************************
77  A useful function for testing the message system.
78 ****************************************************************************/
79
80 void ping_message(int msg_type, pid_t src, void *buf, size_t len)
81 {
82         char *msg = buf ? buf : "none";
83         DEBUG(1,("INFO: Received PING message from PID %u [%s]\n",(unsigned int)src, msg));
84         message_send_pid(src, MSG_PONG, buf, len, True);
85 }
86
87 /****************************************************************************
88  Return current debug level.
89 ****************************************************************************/
90
91 void debuglevel_message(int msg_type, pid_t src, void *buf, size_t len)
92 {
93         DEBUG(1,("INFO: Received REQ_DEBUGLEVEL message from PID %u\n",(unsigned int)src));
94         message_send_pid(src, MSG_DEBUGLEVEL, DEBUGLEVEL_CLASS, sizeof(DEBUGLEVEL_CLASS), True);
95 }
96
97 /****************************************************************************
98  Initialise the messaging functions. 
99 ****************************************************************************/
100
101 BOOL message_init(void)
102 {
103         if (tdb) return True;
104
105         tdb = tdb_open_log(lock_path("messages.tdb"), 
106                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
107                        O_RDWR|O_CREAT,0600);
108
109         if (!tdb) {
110                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
111                 return False;
112         }
113
114         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
115
116         message_register(MSG_PING, ping_message);
117         message_register(MSG_REQ_DEBUGLEVEL, debuglevel_message);
118
119         return True;
120 }
121
122 /*******************************************************************
123  Form a static tdb key from a pid.
124 ******************************************************************/
125
126 static TDB_DATA message_key_pid(pid_t pid)
127 {
128         static char key[20];
129         TDB_DATA kbuf;
130
131         slprintf(key, sizeof(key)-1, "PID/%d", (int)pid);
132         
133         kbuf.dptr = (char *)key;
134         kbuf.dsize = strlen(key)+1;
135         return kbuf;
136 }
137
138 /****************************************************************************
139  Notify a process that it has a message. If the process doesn't exist 
140  then delete its record in the database.
141 ****************************************************************************/
142
143 static BOOL message_notify(pid_t pid)
144 {
145         if (kill(pid, SIGUSR1) == -1) {
146                 if (errno == ESRCH) {
147                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid));
148                         tdb_delete(tdb, message_key_pid(pid));
149                 } else {
150                         DEBUG(2,("message to process %d failed - %s\n", (int)pid, strerror(errno)));
151                 }
152                 return False;
153         }
154         return True;
155 }
156
157 /****************************************************************************
158  Send a message to a particular pid.
159 ****************************************************************************/
160
161 BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len, BOOL duplicates_allowed)
162 {
163         TDB_DATA kbuf;
164         TDB_DATA dbuf;
165         struct message_rec rec;
166         void *p;
167
168         rec.msg_version = MESSAGE_VERSION;
169         rec.msg_type = msg_type;
170         rec.dest = pid;
171         rec.src = sys_getpid();
172         rec.len = len;
173
174         kbuf = message_key_pid(pid);
175
176         /* lock the record for the destination */
177         tdb_chainlock(tdb, kbuf);
178
179         dbuf = tdb_fetch(tdb, kbuf);
180
181         if (!dbuf.dptr) {
182                 /* its a new record */
183                 p = (void *)malloc(len + sizeof(rec));
184                 if (!p) goto failed;
185
186                 memcpy(p, &rec, sizeof(rec));
187                 if (len > 0) memcpy((void *)((char*)p+sizeof(rec)), buf, len);
188
189                 dbuf.dptr = p;
190                 dbuf.dsize = len + sizeof(rec);
191                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
192                 SAFE_FREE(p);
193                 goto ok;
194         }
195
196         if (!duplicates_allowed) {
197                 char *ptr;
198                 struct message_rec prec;
199                 
200                 for(ptr = (char *)dbuf.dptr; ptr < dbuf.dptr + dbuf.dsize; ) {
201                         /*
202                          * First check if the message header matches, then, if it's a non-zero
203                          * sized message, check if the data matches. If so it's a duplicate and
204                          * we can discard it. JRA.
205                          */
206
207                         if (!memcmp(ptr, &rec, sizeof(rec))) {
208                                 if (!len || (len && !memcmp( ptr + sizeof(rec), (char *)buf, len))) {
209                                         DEBUG(10,("message_send_pid: discarding duplicate message.\n"));
210                                         SAFE_FREE(dbuf.dptr);
211                                         tdb_chainunlock(tdb, kbuf);
212                                         return True;
213                                 }
214                         }
215                         memcpy(&prec, ptr, sizeof(prec));
216                         ptr += sizeof(rec) + prec.len;
217                 }
218         }
219
220         /* we're adding to an existing entry */
221         p = (void *)malloc(dbuf.dsize + len + sizeof(rec));
222         if (!p) goto failed;
223
224         memcpy(p, dbuf.dptr, dbuf.dsize);
225         memcpy((void *)((char*)p+dbuf.dsize), &rec, sizeof(rec));
226         if (len > 0) memcpy((void *)((char*)p+dbuf.dsize+sizeof(rec)), buf, len);
227
228         SAFE_FREE(dbuf.dptr);
229         dbuf.dptr = p;
230         dbuf.dsize += len + sizeof(rec);
231         tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
232         SAFE_FREE(dbuf.dptr);
233
234  ok:
235         tdb_chainunlock(tdb, kbuf);
236         errno = 0;                    /* paranoia */
237         return message_notify(pid);
238
239  failed:
240         tdb_chainunlock(tdb, kbuf);
241         errno = 0;                    /* paranoia */
242         return False;
243 }
244
245 /****************************************************************************
246  Retrieve the next message for the current process.
247 ****************************************************************************/
248
249 static BOOL message_recv(int *msg_type, pid_t *src, void **buf, size_t *len)
250 {
251         TDB_DATA kbuf;
252         TDB_DATA dbuf;
253         struct message_rec rec;
254
255         kbuf = message_key_pid(sys_getpid());
256
257         tdb_chainlock(tdb, kbuf);
258         
259         dbuf = tdb_fetch(tdb, kbuf);
260         if (dbuf.dptr == NULL || dbuf.dsize == 0) goto failed;
261
262         memcpy(&rec, dbuf.dptr, sizeof(rec));
263
264         if (rec.msg_version != MESSAGE_VERSION) {
265                 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
266                 goto failed;
267         }
268
269         if (rec.len > 0) {
270                 (*buf) = (void *)malloc(rec.len);
271                 if (!(*buf)) goto failed;
272
273                 memcpy(*buf, dbuf.dptr+sizeof(rec), rec.len);
274         } else {
275                 *buf = NULL;
276         }
277
278         *len = rec.len;
279         *msg_type = rec.msg_type;
280         *src = rec.src;
281
282         if (dbuf.dsize - (sizeof(rec)+rec.len) > 0)
283                 memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len));
284         dbuf.dsize -= sizeof(rec)+rec.len;
285
286         if (dbuf.dsize == 0)
287                 tdb_delete(tdb, kbuf);
288         else
289                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
290
291         SAFE_FREE(dbuf.dptr);
292         tdb_chainunlock(tdb, kbuf);
293         return True;
294
295  failed:
296         tdb_chainunlock(tdb, kbuf);
297         return False;
298 }
299
300 /****************************************************************************
301  Receive and dispatch any messages pending for this process.
302  Notice that all dispatch handlers for a particular msg_type get called,
303  so you can register multiple handlers for a message.
304 ****************************************************************************/
305
306 void message_dispatch(void)
307 {
308         int msg_type;
309         pid_t src;
310         void *buf;
311         size_t len;
312         struct dispatch_fns *dfn;
313
314         if (!received_signal) return;
315
316         DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
317
318         received_signal = 0;
319
320         while (message_recv(&msg_type, &src, &buf, &len)) {
321                 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
322                         if (dfn->msg_type == msg_type) {
323                                 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
324                                 dfn->fn(msg_type, src, buf, len);
325                         }
326                 }
327                 SAFE_FREE(buf);
328         }
329 }
330
331 /****************************************************************************
332  Register a dispatch function for a particular message type.
333 ****************************************************************************/
334
335 void message_register(int msg_type, 
336                       void (*fn)(int msg_type, pid_t pid, void *buf, size_t len))
337 {
338         struct dispatch_fns *dfn;
339
340         dfn = (struct dispatch_fns *)malloc(sizeof(*dfn));
341
342         if (dfn != NULL) {
343
344                 ZERO_STRUCTPN(dfn);
345
346                 dfn->msg_type = msg_type;
347                 dfn->fn = fn;
348
349                 DLIST_ADD(dispatch_fns, dfn);
350         }
351         else {
352         
353                 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
354         }
355 }
356
357 /****************************************************************************
358  De-register the function for a particular message type.
359 ****************************************************************************/
360
361 void message_deregister(int msg_type)
362 {
363         struct dispatch_fns *dfn, *next;
364
365         for (dfn = dispatch_fns; dfn; dfn = next) {
366                 next = dfn->next;
367                 if (dfn->msg_type == msg_type) {
368                         DLIST_REMOVE(dispatch_fns, dfn);
369                         SAFE_FREE(dfn);
370                 }
371         }       
372 }
373
374 struct msg_all {
375         int msg_type;
376         void *buf;
377         size_t len;
378         BOOL duplicates;
379 };
380
381 /****************************************************************************
382  Send one of the messages for the broadcast.
383 ****************************************************************************/
384
385 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
386 {
387         struct connections_data crec;
388         struct msg_all *msg_all = (struct msg_all *)state;
389
390         if (dbuf.dsize != sizeof(crec))
391                 return 0;
392
393         memcpy(&crec, dbuf.dptr, sizeof(crec));
394
395         if (crec.cnum != -1)
396                 return 0;
397
398         /* if the msg send fails because the pid was not found (i.e. smbd died), 
399          * the msg has already been deleted from the messages.tdb.*/
400         if (!message_send_pid(crec.pid, msg_all->msg_type, msg_all->buf, msg_all->len,
401                                                         msg_all->duplicates)) {
402                 
403                 /* if the pid was not found delete the entry from connections.tdb */
404                 if (errno == ESRCH) {
405                         DEBUG(2,("pid %u doesn't exist - deleting connections %d [%s]\n",
406                                         (unsigned int)crec.pid, crec.cnum, crec.name));
407                         tdb_delete(the_tdb, kbuf);
408                 }
409         }
410         return 0;
411 }
412
413 /****************************************************************************
414  This is a useful function for sending messages to all smbd processes.
415  It isn't very efficient, but should be OK for the sorts of applications that 
416  use it. When we need efficient broadcast we can add it.
417 ****************************************************************************/
418
419 BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type, void *buf, size_t len, BOOL duplicates_allowed)
420 {
421         struct msg_all msg_all;
422
423         msg_all.msg_type = msg_type;
424         msg_all.buf = buf;
425         msg_all.len = len;
426         msg_all.duplicates = duplicates_allowed;
427
428         tdb_traverse(conn_tdb, traverse_fn, &msg_all);
429         return True;
430 }