sync'ing up for 3.0alpha20 release
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 2 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, write to the Free Software
19    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
21
22 /**
23    @defgroups messages Internal messaging framework
24    @{
25    @file messages.c
26
27    This module is used for internal messaging between Samba daemons. 
28
29    The idea is that if a part of Samba wants to do communication with
30    another Samba process then it will do a message_register() of a
31    dispatch function, and use message_send_pid() to send messages to
32    that process.
33
34    The dispatch function is given the pid of the sender, and it can
35    use that to reply by message_send_pid().  See ping_message() for a
36    simple example.
37
38    This system doesn't have any inherent size limitations but is not
39    very efficient for large messages or when messages are sent in very
40    quick succession.
41
42 */
43
44 #include "includes.h"
45
46 /* the locking database handle */
47 static TDB_CONTEXT *tdb;
48 static int received_signal;
49
50 /* change the message version with any incompatible changes in the protocol */
51 #define MESSAGE_VERSION 1
52
53 struct message_rec {
54         int msg_version;
55         int msg_type;
56         pid_t dest;
57         pid_t src;
58         size_t len;
59 };
60
61 /* we have a linked list of dispatch handlers */
62 static struct dispatch_fns {
63         struct dispatch_fns *next, *prev;
64         int msg_type;
65         void (*fn)(int msg_type, pid_t pid, void *buf, size_t len);
66 } *dispatch_fns;
67
68 /****************************************************************************
69  Notifications come in as signals.
70 ****************************************************************************/
71
72 static void sig_usr1(void)
73 {
74         received_signal = 1;
75         sys_select_signal();
76 }
77
78 /****************************************************************************
79  A useful function for testing the message system.
80 ****************************************************************************/
81
82 static void ping_message(int msg_type, pid_t src, void *buf, size_t len)
83 {
84         char *msg = buf ? buf : "none";
85         DEBUG(1,("INFO: Received PING message from PID %u [%s]\n",(unsigned int)src, msg));
86         message_send_pid(src, MSG_PONG, buf, len, True);
87 }
88
89 /****************************************************************************
90  Initialise the messaging functions. 
91 ****************************************************************************/
92
93 BOOL message_init(void)
94 {
95         if (tdb) return True;
96
97         tdb = tdb_open_log(lock_path("messages.tdb"), 
98                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
99                        O_RDWR|O_CREAT,0600);
100
101         if (!tdb) {
102                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
103                 return False;
104         }
105
106         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
107
108         message_register(MSG_PING, ping_message);
109
110         return True;
111 }
112
113 /*******************************************************************
114  Form a static tdb key from a pid.
115 ******************************************************************/
116
117 static TDB_DATA message_key_pid(pid_t pid)
118 {
119         static char key[20];
120         TDB_DATA kbuf;
121
122         slprintf(key, sizeof(key)-1, "PID/%d", (int)pid);
123         
124         kbuf.dptr = (char *)key;
125         kbuf.dsize = strlen(key)+1;
126         return kbuf;
127 }
128
129 /****************************************************************************
130  Notify a process that it has a message. If the process doesn't exist 
131  then delete its record in the database.
132 ****************************************************************************/
133
134 static BOOL message_notify(pid_t pid)
135 {
136         /* Doing kill with a non-positive pid causes messages to be
137          * sent to places we don't want. */
138         SMB_ASSERT(pid > 0);
139         if (kill(pid, SIGUSR1) == -1) {
140                 if (errno == ESRCH) {
141                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid));
142                         tdb_delete(tdb, message_key_pid(pid));
143                 } else {
144                         DEBUG(2,("message to process %d failed - %s\n", (int)pid, strerror(errno)));
145                 }
146                 return False;
147         }
148         return True;
149 }
150
151 /****************************************************************************
152  Send a message to a particular pid.
153 ****************************************************************************/
154
155 BOOL message_send_pid(pid_t pid, int msg_type, const void *buf, size_t len,
156                       BOOL duplicates_allowed)
157 {
158         TDB_DATA kbuf;
159         TDB_DATA dbuf;
160         struct message_rec rec;
161         void *p;
162
163         rec.msg_version = MESSAGE_VERSION;
164         rec.msg_type = msg_type;
165         rec.dest = pid;
166         rec.src = sys_getpid();
167         rec.len = len;
168
169         /* Doing kill with a non-positive pid causes messages to be
170          * sent to places we don't want. */
171         SMB_ASSERT(pid > 0);
172
173         kbuf = message_key_pid(pid);
174
175         /* lock the record for the destination */
176         tdb_chainlock(tdb, kbuf);
177
178         dbuf = tdb_fetch(tdb, kbuf);
179
180         if (!dbuf.dptr) {
181                 /* its a new record */
182                 p = (void *)malloc(len + sizeof(rec));
183                 if (!p) goto failed;
184
185                 memcpy(p, &rec, sizeof(rec));
186                 if (len > 0) memcpy((void *)((char*)p+sizeof(rec)), buf, len);
187
188                 dbuf.dptr = p;
189                 dbuf.dsize = len + sizeof(rec);
190                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
191                 SAFE_FREE(p);
192                 goto ok;
193         }
194
195         if (!duplicates_allowed) {
196                 char *ptr;
197                 struct message_rec prec;
198                 
199                 for(ptr = (char *)dbuf.dptr; ptr < dbuf.dptr + dbuf.dsize; ) {
200                         /*
201                          * First check if the message header matches, then, if it's a non-zero
202                          * sized message, check if the data matches. If so it's a duplicate and
203                          * we can discard it. JRA.
204                          */
205
206                         if (!memcmp(ptr, &rec, sizeof(rec))) {
207                                 if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
208                                         DEBUG(10,("message_send_pid: discarding duplicate message.\n"));
209                                         SAFE_FREE(dbuf.dptr);
210                                         tdb_chainunlock(tdb, kbuf);
211                                         return True;
212                                 }
213                         }
214                         memcpy(&prec, ptr, sizeof(prec));
215                         ptr += sizeof(rec) + prec.len;
216                 }
217         }
218
219         /* we're adding to an existing entry */
220         p = (void *)malloc(dbuf.dsize + len + sizeof(rec));
221         if (!p) goto failed;
222
223         memcpy(p, dbuf.dptr, dbuf.dsize);
224         memcpy((void *)((char*)p+dbuf.dsize), &rec, sizeof(rec));
225         if (len > 0) memcpy((void *)((char*)p+dbuf.dsize+sizeof(rec)), buf, len);
226
227         SAFE_FREE(dbuf.dptr);
228         dbuf.dptr = p;
229         dbuf.dsize += len + sizeof(rec);
230         tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
231         SAFE_FREE(dbuf.dptr);
232
233  ok:
234         tdb_chainunlock(tdb, kbuf);
235         errno = 0;                    /* paranoia */
236         return message_notify(pid);
237
238  failed:
239         tdb_chainunlock(tdb, kbuf);
240         errno = 0;                    /* paranoia */
241         return False;
242 }
243
244 /****************************************************************************
245  Retrieve the next message for the current process.
246 ****************************************************************************/
247
248 static BOOL message_recv(int *msg_type, pid_t *src, void **buf, size_t *len)
249 {
250         TDB_DATA kbuf;
251         TDB_DATA dbuf;
252         struct message_rec rec;
253
254         kbuf = message_key_pid(sys_getpid());
255
256         tdb_chainlock(tdb, kbuf);
257         
258         dbuf = tdb_fetch(tdb, kbuf);
259         if (dbuf.dptr == NULL || dbuf.dsize == 0) goto failed;
260
261         memcpy(&rec, dbuf.dptr, sizeof(rec));
262
263         if (rec.msg_version != MESSAGE_VERSION) {
264                 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
265                 goto failed;
266         }
267
268         if (rec.len > 0) {
269                 (*buf) = (void *)malloc(rec.len);
270                 if (!(*buf)) goto failed;
271
272                 memcpy(*buf, dbuf.dptr+sizeof(rec), rec.len);
273         } else {
274                 *buf = NULL;
275         }
276
277         *len = rec.len;
278         *msg_type = rec.msg_type;
279         *src = rec.src;
280
281         if (dbuf.dsize - (sizeof(rec)+rec.len) > 0)
282                 memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len));
283         dbuf.dsize -= sizeof(rec)+rec.len;
284
285         if (dbuf.dsize == 0)
286                 tdb_delete(tdb, kbuf);
287         else
288                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
289
290         SAFE_FREE(dbuf.dptr);
291         tdb_chainunlock(tdb, kbuf);
292         return True;
293
294  failed:
295         tdb_chainunlock(tdb, kbuf);
296         return False;
297 }
298
299 /****************************************************************************
300  Receive and dispatch any messages pending for this process.
301  Notice that all dispatch handlers for a particular msg_type get called,
302  so you can register multiple handlers for a message.
303 ****************************************************************************/
304
305 void message_dispatch(void)
306 {
307         int msg_type;
308         pid_t src;
309         void *buf;
310         size_t len;
311         struct dispatch_fns *dfn;
312         int n_handled;
313
314         if (!received_signal) return;
315
316         DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
317
318         received_signal = 0;
319
320         while (message_recv(&msg_type, &src, &buf, &len)) {
321                 DEBUG(10,("message_dispatch: received msg_type=%d src_pid=%d\n",
322                           msg_type, (int) src));
323                 n_handled = 0;
324                 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
325                         if (dfn->msg_type == msg_type) {
326                                 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
327                                 dfn->fn(msg_type, src, buf, len);
328                                 n_handled++;
329                         }
330                 }
331                 if (!n_handled) {
332                         DEBUG(5,("message_dispatch: warning: no handlers registered for "
333                                  "msg_type %d in pid %d\n",
334                                  msg_type, sys_getpid()));
335                 }
336                 SAFE_FREE(buf);
337         }
338 }
339
340 /****************************************************************************
341  Register a dispatch function for a particular message type.
342 ****************************************************************************/
343
344 void message_register(int msg_type, 
345                       void (*fn)(int msg_type, pid_t pid, void *buf, size_t len))
346 {
347         struct dispatch_fns *dfn;
348
349         dfn = (struct dispatch_fns *)malloc(sizeof(*dfn));
350
351         if (dfn != NULL) {
352
353                 ZERO_STRUCTPN(dfn);
354
355                 dfn->msg_type = msg_type;
356                 dfn->fn = fn;
357
358                 DLIST_ADD(dispatch_fns, dfn);
359         }
360         else {
361         
362                 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
363         }
364 }
365
366 /****************************************************************************
367  De-register the function for a particular message type.
368 ****************************************************************************/
369
370 void message_deregister(int msg_type)
371 {
372         struct dispatch_fns *dfn, *next;
373
374         for (dfn = dispatch_fns; dfn; dfn = next) {
375                 next = dfn->next;
376                 if (dfn->msg_type == msg_type) {
377                         DLIST_REMOVE(dispatch_fns, dfn);
378                         SAFE_FREE(dfn);
379                 }
380         }       
381 }
382
383 struct msg_all {
384         int msg_type;
385         uint32 msg_flag;
386         const void *buf;
387         size_t len;
388         BOOL duplicates;
389         int n_sent;
390 };
391
392 /****************************************************************************
393  Send one of the messages for the broadcast.
394 ****************************************************************************/
395
396 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
397 {
398         struct connections_data crec;
399         struct msg_all *msg_all = (struct msg_all *)state;
400
401         if (dbuf.dsize != sizeof(crec))
402                 return 0;
403
404         memcpy(&crec, dbuf.dptr, sizeof(crec));
405
406         if (crec.cnum != -1)
407                 return 0;
408
409         /* Don't send if the receiver hasn't registered an interest. */
410
411         if(!(crec.bcast_msg_flags & msg_all->msg_flag))
412                 return 0;
413
414         /* If the msg send fails because the pid was not found (i.e. smbd died), 
415          * the msg has already been deleted from the messages.tdb.*/
416
417         if (!message_send_pid(crec.pid, msg_all->msg_type,
418                               msg_all->buf, msg_all->len,
419                               msg_all->duplicates)) {
420                 
421                 /* If the pid was not found delete the entry from connections.tdb */
422
423                 if (errno == ESRCH) {
424                         DEBUG(2,("pid %u doesn't exist - deleting connections %d [%s]\n",
425                                         (unsigned int)crec.pid, crec.cnum, crec.name));
426                         tdb_delete(the_tdb, kbuf);
427                 }
428         }
429         msg_all->n_sent++;
430         return 0;
431 }
432
433 /**
434  * Send a message to all smbd processes.
435  *
436  * It isn't very efficient, but should be OK for the sorts of
437  * applications that use it. When we need efficient broadcast we can add
438  * it.
439  *
440  * @param n_sent Set to the number of messages sent.  This should be
441  * equal to the number of processes, but be careful for races.
442  *
443  * @return True for success.
444  **/
445 BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type,
446                       const void *buf, size_t len,
447                       BOOL duplicates_allowed,
448                       int *n_sent)
449 {
450         struct msg_all msg_all;
451
452         msg_all.msg_type = msg_type;
453         if (msg_type < 1000)
454                 msg_all.msg_flag = FLAG_MSG_GENERAL;
455         else if (msg_type > 1000 && msg_type < 2000)
456                 msg_all.msg_flag = FLAG_MSG_NMBD;
457         else if (msg_type > 2000 && msg_type < 3000)
458                 msg_all.msg_flag = FLAG_MSG_PRINTING;
459         else if (msg_type > 3000 && msg_type < 4000)
460                 msg_all.msg_flag = FLAG_MSG_SMBD;
461         else
462                 return False;
463
464         msg_all.buf = buf;
465         msg_all.len = len;
466         msg_all.duplicates = duplicates_allowed;
467         msg_all.n_sent = 0;
468
469         tdb_traverse(conn_tdb, traverse_fn, &msg_all);
470         if (n_sent)
471                 *n_sent = msg_all.n_sent;
472         return True;
473 }
474 /** @} **/