Spelling fix.
[sfrench/samba-autobuild/.git] / source / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 2 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, write to the Free Software
19    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
21
22 /**
23    @defgroups messages Internal messaging framework
24    @{
25    @file messages.c
26
27    This module is used for internal messaging between Samba daemons. 
28
29    The idea is that if a part of Samba wants to do communication with
30    another Samba process then it will do a message_register() of a
31    dispatch function, and use message_send_pid() to send messages to
32    that process.
33
34    The dispatch function is given the pid of the sender, and it can
35    use that to reply by message_send_pid().  See ping_message() for a
36    simple example.
37
38    This system doesn't have any inherent size limitations but is not
39    very efficient for large messages or when messages are sent in very
40    quick succession.
41
42 */
43
44 #include "includes.h"
45
46 /* the locking database handle */
47 static TDB_CONTEXT *tdb;
48 static int received_signal;
49
50 /* change the message version with any incompatible changes in the protocol */
51 #define MESSAGE_VERSION 1
52
53 struct message_rec {
54         int msg_version;
55         int msg_type;
56         pid_t dest;
57         pid_t src;
58         size_t len;
59 };
60
61 /* we have a linked list of dispatch handlers */
62 static struct dispatch_fns {
63         struct dispatch_fns *next, *prev;
64         int msg_type;
65         void (*fn)(int msg_type, pid_t pid, void *buf, size_t len);
66 } *dispatch_fns;
67
68 /****************************************************************************
69  Notifications come in as signals.
70 ****************************************************************************/
71
72 static void sig_usr1(void)
73 {
74         received_signal = 1;
75         sys_select_signal();
76 }
77
78 /****************************************************************************
79  A useful function for testing the message system.
80 ****************************************************************************/
81
82 static void ping_message(int msg_type, pid_t src, void *buf, size_t len)
83 {
84         char *msg = buf ? buf : "none";
85         DEBUG(1,("INFO: Received PING message from PID %u [%s]\n",(unsigned int)src, msg));
86         message_send_pid(src, MSG_PONG, buf, len, True);
87 }
88
89 /****************************************************************************
90  Initialise the messaging functions. 
91 ****************************************************************************/
92
93 BOOL message_init(void)
94 {
95         if (tdb) return True;
96
97         tdb = tdb_open_log(lock_path("messages.tdb"), 
98                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
99                        O_RDWR|O_CREAT,0600);
100
101         if (!tdb) {
102                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
103                 return False;
104         }
105
106         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
107
108         message_register(MSG_PING, ping_message);
109
110         return True;
111 }
112
113 /*******************************************************************
114  Form a static tdb key from a pid.
115 ******************************************************************/
116
117 static TDB_DATA message_key_pid(pid_t pid)
118 {
119         static char key[20];
120         TDB_DATA kbuf;
121
122         slprintf(key, sizeof(key)-1, "PID/%d", (int)pid);
123         
124         kbuf.dptr = (char *)key;
125         kbuf.dsize = strlen(key)+1;
126         return kbuf;
127 }
128
129 /****************************************************************************
130  Notify a process that it has a message. If the process doesn't exist 
131  then delete its record in the database.
132 ****************************************************************************/
133
134 static BOOL message_notify(pid_t pid)
135 {
136         /* Doing kill with a non-positive pid causes messages to be
137          * sent to places we don't want. */
138         SMB_ASSERT(pid > 0);
139         if (kill(pid, SIGUSR1) == -1) {
140                 if (errno == ESRCH) {
141                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid));
142                         tdb_delete(tdb, message_key_pid(pid));
143                 } else {
144                         DEBUG(2,("message to process %d failed - %s\n", (int)pid, strerror(errno)));
145                 }
146                 return False;
147         }
148         return True;
149 }
150
151 /****************************************************************************
152  Send a message to a particular pid.
153 ****************************************************************************/
154
155 BOOL message_send_pid(pid_t pid, int msg_type, const void *buf, size_t len,
156                       BOOL duplicates_allowed)
157 {
158         TDB_DATA kbuf;
159         TDB_DATA dbuf;
160         struct message_rec rec;
161         void *p;
162
163         rec.msg_version = MESSAGE_VERSION;
164         rec.msg_type = msg_type;
165         rec.dest = pid;
166         rec.src = sys_getpid();
167         rec.len = len;
168
169         /* Doing kill with a non-positive pid causes messages to be
170          * sent to places we don't want. */
171         SMB_ASSERT(pid > 0);
172
173         kbuf = message_key_pid(pid);
174
175         /* lock the record for the destination */
176         tdb_chainlock(tdb, kbuf);
177
178         dbuf = tdb_fetch(tdb, kbuf);
179
180         if (!dbuf.dptr) {
181                 /* its a new record */
182                 p = (void *)malloc(len + sizeof(rec));
183                 if (!p) goto failed;
184
185                 memcpy(p, &rec, sizeof(rec));
186                 if (len > 0) memcpy((void *)((char*)p+sizeof(rec)), buf, len);
187
188                 dbuf.dptr = p;
189                 dbuf.dsize = len + sizeof(rec);
190                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
191                 SAFE_FREE(p);
192                 goto ok;
193         }
194
195         if (!duplicates_allowed) {
196                 char *ptr;
197                 struct message_rec prec;
198                 
199                 for(ptr = (char *)dbuf.dptr; ptr < dbuf.dptr + dbuf.dsize; ) {
200                         /*
201                          * First check if the message header matches, then, if it's a non-zero
202                          * sized message, check if the data matches. If so it's a duplicate and
203                          * we can discard it. JRA.
204                          */
205
206                         if (!memcmp(ptr, &rec, sizeof(rec))) {
207                                 if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
208                                         DEBUG(10,("message_send_pid: discarding duplicate message.\n"));
209                                         SAFE_FREE(dbuf.dptr);
210                                         tdb_chainunlock(tdb, kbuf);
211                                         return True;
212                                 }
213                         }
214                         memcpy(&prec, ptr, sizeof(prec));
215                         ptr += sizeof(rec) + prec.len;
216                 }
217         }
218
219         /* we're adding to an existing entry */
220         p = (void *)malloc(dbuf.dsize + len + sizeof(rec));
221         if (!p) goto failed;
222
223         memcpy(p, dbuf.dptr, dbuf.dsize);
224         memcpy((void *)((char*)p+dbuf.dsize), &rec, sizeof(rec));
225         if (len > 0) memcpy((void *)((char*)p+dbuf.dsize+sizeof(rec)), buf, len);
226
227         SAFE_FREE(dbuf.dptr);
228         dbuf.dptr = p;
229         dbuf.dsize += len + sizeof(rec);
230         tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
231         SAFE_FREE(dbuf.dptr);
232
233  ok:
234         tdb_chainunlock(tdb, kbuf);
235         errno = 0;                    /* paranoia */
236         return message_notify(pid);
237
238  failed:
239         tdb_chainunlock(tdb, kbuf);
240         errno = 0;                    /* paranoia */
241         return False;
242 }
243
244 /****************************************************************************
245  Retrieve the next message for the current process.
246 ****************************************************************************/
247
248 static BOOL message_recv(int *msg_type, pid_t *src, void **buf, size_t *len)
249 {
250         TDB_DATA kbuf;
251         TDB_DATA dbuf;
252         struct message_rec rec;
253
254         kbuf = message_key_pid(sys_getpid());
255
256         tdb_chainlock(tdb, kbuf);
257         
258         dbuf = tdb_fetch(tdb, kbuf);
259         if (dbuf.dptr == NULL || dbuf.dsize == 0) goto failed;
260
261         memcpy(&rec, dbuf.dptr, sizeof(rec));
262
263         if (rec.msg_version != MESSAGE_VERSION) {
264                 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
265                 goto failed;
266         }
267
268         if (rec.len > 0) {
269                 (*buf) = (void *)malloc(rec.len);
270                 if (!(*buf)) goto failed;
271
272                 memcpy(*buf, dbuf.dptr+sizeof(rec), rec.len);
273         } else {
274                 *buf = NULL;
275         }
276
277         *len = rec.len;
278         *msg_type = rec.msg_type;
279         *src = rec.src;
280
281         if (dbuf.dsize - (sizeof(rec)+rec.len) > 0)
282                 memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len));
283         dbuf.dsize -= sizeof(rec)+rec.len;
284
285         if (dbuf.dsize == 0)
286                 tdb_delete(tdb, kbuf);
287         else
288                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
289
290         SAFE_FREE(dbuf.dptr);
291         tdb_chainunlock(tdb, kbuf);
292         return True;
293
294  failed:
295         tdb_chainunlock(tdb, kbuf);
296         return False;
297 }
298
299 /****************************************************************************
300  Receive and dispatch any messages pending for this process.
301  Notice that all dispatch handlers for a particular msg_type get called,
302  so you can register multiple handlers for a message.
303 ****************************************************************************/
304
305 void message_dispatch(void)
306 {
307         int msg_type;
308         pid_t src;
309         void *buf;
310         size_t len;
311         struct dispatch_fns *dfn;
312         int n_handled;
313
314         if (!received_signal) return;
315
316         DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
317
318         received_signal = 0;
319
320         while (message_recv(&msg_type, &src, &buf, &len)) {
321                 DEBUG(10,("message_dispatch: received msg_type=%d src_pid=%d\n",
322                           msg_type, (int) src));
323                 n_handled = 0;
324                 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
325                         if (dfn->msg_type == msg_type) {
326                                 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
327                                 dfn->fn(msg_type, src, buf, len);
328                                 n_handled++;
329                         }
330                 }
331                 if (!n_handled) {
332                         DEBUG(5,("message_dispatch: warning: no handlers registered for "
333                                  "msg_type %d in pid%d\n",
334                                  msg_type, sys_getpid()));
335                 }
336                 SAFE_FREE(buf);
337         }
338 }
339
340 /****************************************************************************
341  Register a dispatch function for a particular message type.
342 ****************************************************************************/
343
344 void message_register(int msg_type, 
345                       void (*fn)(int msg_type, pid_t pid, void *buf, size_t len))
346 {
347         struct dispatch_fns *dfn;
348
349         dfn = (struct dispatch_fns *)malloc(sizeof(*dfn));
350
351         if (dfn != NULL) {
352
353                 ZERO_STRUCTPN(dfn);
354
355                 dfn->msg_type = msg_type;
356                 dfn->fn = fn;
357
358                 DLIST_ADD(dispatch_fns, dfn);
359         }
360         else {
361         
362                 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
363         }
364 }
365
366 /****************************************************************************
367  De-register the function for a particular message type.
368 ****************************************************************************/
369
370 void message_deregister(int msg_type)
371 {
372         struct dispatch_fns *dfn, *next;
373
374         for (dfn = dispatch_fns; dfn; dfn = next) {
375                 next = dfn->next;
376                 if (dfn->msg_type == msg_type) {
377                         DLIST_REMOVE(dispatch_fns, dfn);
378                         SAFE_FREE(dfn);
379                 }
380         }       
381 }
382
383 struct msg_all {
384         int msg_type;
385         const void *buf;
386         size_t len;
387         BOOL duplicates;
388         int             n_sent;
389 };
390
391 /****************************************************************************
392  Send one of the messages for the broadcast.
393 ****************************************************************************/
394
395 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
396 {
397         struct connections_data crec;
398         struct msg_all *msg_all = (struct msg_all *)state;
399
400         if (dbuf.dsize != sizeof(crec))
401                 return 0;
402
403         memcpy(&crec, dbuf.dptr, sizeof(crec));
404
405         if (crec.cnum != -1)
406                 return 0;
407
408         /* if the msg send fails because the pid was not found (i.e. smbd died), 
409          * the msg has already been deleted from the messages.tdb.*/
410         if (!message_send_pid(crec.pid, msg_all->msg_type,
411                               msg_all->buf, msg_all->len,
412                               msg_all->duplicates)) {
413                 
414                 /* if the pid was not found delete the entry from connections.tdb */
415                 if (errno == ESRCH) {
416                         DEBUG(2,("pid %u doesn't exist - deleting connections %d [%s]\n",
417                                         (unsigned int)crec.pid, crec.cnum, crec.name));
418                         tdb_delete(the_tdb, kbuf);
419                 }
420         }
421         msg_all->n_sent++;
422         return 0;
423 }
424
425 /**
426  * Send a message to all smbd processes.
427  *
428  * It isn't very efficient, but should be OK for the sorts of
429  * applications that use it. When we need efficient broadcast we can add
430  * it.
431  *
432  * @param n_sent Set to the number of messages sent.  This should be
433  * equal to the number of processes, but be careful for races.
434  *
435  * @return True for success.
436  **/
437 BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type,
438                       const void *buf, size_t len,
439                       BOOL duplicates_allowed,
440                       int *n_sent)
441 {
442         struct msg_all msg_all;
443
444         msg_all.msg_type = msg_type;
445         msg_all.buf = buf;
446         msg_all.len = len;
447         msg_all.duplicates = duplicates_allowed;
448         msg_all.n_sent = 0;
449
450         tdb_traverse(conn_tdb, traverse_fn, &msg_all);
451         if (n_sent)
452                 *n_sent = msg_all.n_sent;
453         return True;
454 }
455
456 static VOLATILE sig_atomic_t gotalarm;
457
458 /***************************************************************
459  Signal function to tell us we timed out.
460 ****************************************************************/
461
462 static void gotalarm_sig(void)
463 {
464         gotalarm = 1;
465 }
466
467 /**
468  * Lock the messaging tdb based on a string - this is used as a primitive
469  * form of mutex between smbd instances. 
470  *
471  * @param name A string identifying the name of the mutex.
472  */
473
474 BOOL message_named_mutex(char *name, unsigned int timeout)
475 {
476         TDB_DATA key;
477         int ret;
478
479         if (!message_init())
480                 return False;
481
482         key.dptr = name;
483         key.dsize = strlen(name)+1;
484
485         if (timeout) {
486                 gotalarm = 0;
487                 CatchSignal(SIGALRM, SIGNAL_CAST gotalarm_sig);
488                 alarm(timeout);
489         }
490
491         ret = tdb_chainlock(tdb, key);
492
493         if (timeout) {
494                 alarm(0);
495                 CatchSignal(SIGALRM, SIGNAL_CAST SIG_IGN);
496                 if (gotalarm)
497                         return False;
498         }
499
500         if (ret == 0)
501                 DEBUG(10,("message_named_mutex: got mutex for %s\n", name ));
502
503         return (ret == 0);
504 }
505
506 /**
507  * Unlock a named mutex.
508  *
509  * @param name A string identifying the name of the mutex.
510  */
511
512 void message_named_mutex_release(char *name)
513 {
514         TDB_DATA key;
515
516         key.dptr = name;
517         key.dsize = strlen(name)+1;
518
519         tdb_chainunlock(tdb, key);
520         DEBUG(10,("message_named_mutex: released mutex for %s\n", name ));
521 }
522
523 /** @} **/