r20165: Change messaging subsystem to only allow one message
[sfrench/samba-autobuild/.git] / source / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27   
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49
50 /* the locking database handle */
51 static TDB_CONTEXT *tdb;
52 static int received_signal;
53
54 /* change the message version with any incompatible changes in the protocol */
55 #define MESSAGE_VERSION 1
56
57 struct message_rec {
58         int msg_version;
59         int msg_type;
60         struct process_id dest;
61         struct process_id src;
62         size_t len;
63 };
64
65 /* we have a linked list of dispatch handlers */
66 static struct dispatch_fns {
67         struct dispatch_fns *next, *prev;
68         int msg_type;
69         void (*fn)(int msg_type, struct process_id pid, void *buf, size_t len);
70 } *dispatch_fns;
71
72 /****************************************************************************
73  Free global objects.
74 ****************************************************************************/
75
76 void gfree_messages(void)
77 {
78         struct dispatch_fns *dfn, *next;
79
80         /* delete the dispatch_fns list */
81         dfn = dispatch_fns;
82         while( dfn ) {
83                 next = dfn->next;
84                 DLIST_REMOVE(dispatch_fns, dfn);
85                 SAFE_FREE(dfn);
86                 dfn = next;
87         }
88 }
89
90 /****************************************************************************
91  Notifications come in as signals.
92 ****************************************************************************/
93
94 static void sig_usr1(void)
95 {
96         received_signal = 1;
97         sys_select_signal(SIGUSR1);
98 }
99
100 /****************************************************************************
101  A useful function for testing the message system.
102 ****************************************************************************/
103
104 static void ping_message(int msg_type, struct process_id src,
105                          void *buf, size_t len)
106 {
107         const char *msg = buf ? (const char *)buf : "none";
108
109         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
110                  procid_str_static(&src), msg));
111         message_send_pid(src, MSG_PONG, buf, len, True);
112 }
113
114 /****************************************************************************
115  Initialise the messaging functions. 
116 ****************************************************************************/
117
118 BOOL message_init(void)
119 {
120         sec_init();
121
122         if (tdb)
123                 return True;
124
125         tdb = tdb_open_log(lock_path("messages.tdb"), 
126                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
127                        O_RDWR|O_CREAT,0600);
128
129         if (!tdb) {
130                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
131                 return False;
132         }
133
134         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
135
136         message_register(MSG_PING, ping_message);
137
138         /* Register some debugging related messages */
139
140         register_msg_pool_usage();
141         register_dmalloc_msgs();
142
143         return True;
144 }
145
146 /*******************************************************************
147  Form a static tdb key from a pid.
148 ******************************************************************/
149
150 static TDB_DATA message_key_pid(struct process_id pid)
151 {
152         static char key[20];
153         TDB_DATA kbuf;
154
155         slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid));
156         
157         kbuf.dptr = (char *)key;
158         kbuf.dsize = strlen(key)+1;
159         return kbuf;
160 }
161
162 /****************************************************************************
163  Notify a process that it has a message. If the process doesn't exist 
164  then delete its record in the database.
165 ****************************************************************************/
166
167 static BOOL message_notify(struct process_id procid)
168 {
169         pid_t pid = procid.pid;
170         int ret;
171         uid_t euid = geteuid();
172
173         /*
174          * Doing kill with a non-positive pid causes messages to be
175          * sent to places we don't want.
176          */
177
178         SMB_ASSERT(pid > 0);
179
180         if (euid != 0) {
181                 become_root_uid_only();
182         }
183
184         ret = kill(pid, SIGUSR1);
185
186         if (euid != 0) {
187                 unbecome_root_uid_only();
188         }
189
190         if (ret == -1) {
191                 if (errno == ESRCH) {
192                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid));
193                         tdb_delete(tdb, message_key_pid(procid));
194                 } else {
195                         DEBUG(2,("message to process %d failed - %s\n", (int)pid, strerror(errno)));
196                 }
197                 return False;
198         }
199
200         return True;
201 }
202
203 /****************************************************************************
204  Send a message to a particular pid.
205 ****************************************************************************/
206
207 static BOOL message_send_pid_internal(struct process_id pid, int msg_type,
208                                       const void *buf, size_t len,
209                                       BOOL duplicates_allowed,
210                                       unsigned int timeout)
211 {
212         TDB_DATA kbuf;
213         TDB_DATA dbuf;
214         TDB_DATA old_dbuf;
215         struct message_rec rec;
216         char *ptr;
217         struct message_rec prec;
218
219         /* NULL pointer means implicit length zero. */
220         if (!buf) {
221                 SMB_ASSERT(len == 0);
222         }
223
224         /*
225          * Doing kill with a non-positive pid causes messages to be
226          * sent to places we don't want.
227          */
228
229         SMB_ASSERT(procid_to_pid(&pid) > 0);
230
231         rec.msg_version = MESSAGE_VERSION;
232         rec.msg_type = msg_type;
233         rec.dest = pid;
234         rec.src = procid_self();
235         rec.len = buf ? len : 0;
236
237         kbuf = message_key_pid(pid);
238
239         dbuf.dptr = (char *)SMB_MALLOC(len + sizeof(rec));
240         if (!dbuf.dptr)
241                 return False;
242
243         memcpy(dbuf.dptr, &rec, sizeof(rec));
244         if (len > 0 && buf)
245                 memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len);
246
247         dbuf.dsize = len + sizeof(rec);
248
249         if (duplicates_allowed) {
250
251                 /* If duplicates are allowed we can just append the message and return. */
252
253                 /* lock the record for the destination */
254                 if (timeout) {
255                         if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
256                                 DEBUG(0,("message_send_pid_internal: failed to get chainlock with timeout %ul.\n", timeout));
257                                 return False;
258                         }
259                 } else {
260                         if (tdb_chainlock(tdb, kbuf) == -1) {
261                                 DEBUG(0,("message_send_pid_internal: failed to get chainlock.\n"));
262                                 return False;
263                         }
264                 }       
265                 tdb_append(tdb, kbuf, dbuf);
266                 tdb_chainunlock(tdb, kbuf);
267
268                 SAFE_FREE(dbuf.dptr);
269                 errno = 0;                    /* paranoia */
270                 return message_notify(pid);
271         }
272
273         /* lock the record for the destination */
274         if (timeout) {
275                 if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
276                         DEBUG(0,("message_send_pid_internal: failed to get chainlock with timeout %ul.\n", timeout));
277                         return False;
278                 }
279         } else {
280                 if (tdb_chainlock(tdb, kbuf) == -1) {
281                         DEBUG(0,("message_send_pid_internal: failed to get chainlock.\n"));
282                         return False;
283                 }
284         }       
285
286         old_dbuf = tdb_fetch(tdb, kbuf);
287
288         if (!old_dbuf.dptr) {
289                 /* its a new record */
290
291                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
292                 tdb_chainunlock(tdb, kbuf);
293
294                 SAFE_FREE(dbuf.dptr);
295                 errno = 0;                    /* paranoia */
296                 return message_notify(pid);
297         }
298
299         /* Not a new record. Check for duplicates. */
300
301         for(ptr = (char *)old_dbuf.dptr; ptr < old_dbuf.dptr + old_dbuf.dsize; ) {
302                 /*
303                  * First check if the message header matches, then, if it's a non-zero
304                  * sized message, check if the data matches. If so it's a duplicate and
305                  * we can discard it. JRA.
306                  */
307
308                 if (!memcmp(ptr, &rec, sizeof(rec))) {
309                         if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
310                                 tdb_chainunlock(tdb, kbuf);
311                                 DEBUG(10,("message_send_pid_internal: discarding duplicate message.\n"));
312                                 SAFE_FREE(dbuf.dptr);
313                                 SAFE_FREE(old_dbuf.dptr);
314                                 return True;
315                         }
316                 }
317                 memcpy(&prec, ptr, sizeof(prec));
318                 ptr += sizeof(rec) + prec.len;
319         }
320
321         /* we're adding to an existing entry */
322
323         tdb_append(tdb, kbuf, dbuf);
324         tdb_chainunlock(tdb, kbuf);
325
326         SAFE_FREE(old_dbuf.dptr);
327         SAFE_FREE(dbuf.dptr);
328
329         errno = 0;                    /* paranoia */
330         return message_notify(pid);
331 }
332
333 /****************************************************************************
334  Send a message to a particular pid - no timeout.
335 ****************************************************************************/
336
337 BOOL message_send_pid(struct process_id pid, int msg_type, const void *buf, size_t len, BOOL duplicates_allowed)
338 {
339         return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed, 0);
340 }
341
342 /****************************************************************************
343  Send a message to a particular pid, with timeout in seconds.
344 ****************************************************************************/
345
346 BOOL message_send_pid_with_timeout(struct process_id pid, int msg_type, const void *buf, size_t len,
347                 BOOL duplicates_allowed, unsigned int timeout)
348 {
349         return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed, timeout);
350 }
351
352 /****************************************************************************
353  Count the messages pending for a particular pid. Expensive....
354 ****************************************************************************/
355
356 unsigned int messages_pending_for_pid(struct process_id pid)
357 {
358         TDB_DATA kbuf;
359         TDB_DATA dbuf;
360         char *buf;
361         unsigned int message_count = 0;
362
363         kbuf = message_key_pid(pid);
364
365         dbuf = tdb_fetch(tdb, kbuf);
366         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
367                 SAFE_FREE(dbuf.dptr);
368                 return 0;
369         }
370
371         for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) {
372                 struct message_rec rec;
373                 memcpy(&rec, buf, sizeof(rec));
374                 buf += (sizeof(rec) + rec.len);
375                 dbuf.dsize -= (sizeof(rec) + rec.len);
376                 message_count++;
377         }
378
379         SAFE_FREE(dbuf.dptr);
380         return message_count;
381 }
382
383 /****************************************************************************
384  Retrieve all messages for the current process.
385 ****************************************************************************/
386
387 static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
388 {
389         TDB_DATA kbuf;
390         TDB_DATA dbuf;
391         TDB_DATA null_dbuf;
392
393         ZERO_STRUCT(null_dbuf);
394
395         *msgs_buf = NULL;
396         *total_len = 0;
397
398         kbuf = message_key_pid(pid_to_procid(sys_getpid()));
399
400         if (tdb_chainlock(tdb, kbuf) == -1)
401                 return False;
402
403         dbuf = tdb_fetch(tdb, kbuf);
404         /*
405          * Replace with an empty record to keep the allocated
406          * space in the tdb.
407          */
408         tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
409         tdb_chainunlock(tdb, kbuf);
410
411         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
412                 SAFE_FREE(dbuf.dptr);
413                 return False;
414         }
415
416         *msgs_buf = dbuf.dptr;
417         *total_len = dbuf.dsize;
418
419         return True;
420 }
421
422 /****************************************************************************
423  Parse out the next message for the current process.
424 ****************************************************************************/
425
426 static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type,
427                          struct process_id *src, char **buf, size_t *len)
428 {
429         struct message_rec rec;
430         char *ret_buf = *buf;
431
432         *buf = NULL;
433         *len = 0;
434
435         if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
436                 return False;
437
438         memcpy(&rec, ret_buf, sizeof(rec));
439         ret_buf += sizeof(rec);
440
441         if (rec.msg_version != MESSAGE_VERSION) {
442                 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
443                 return False;
444         }
445
446         if (rec.len > 0) {
447                 if (total_len - (ret_buf - msgs_buf) < rec.len)
448                         return False;
449         }
450
451         *len = rec.len;
452         *msg_type = rec.msg_type;
453         *src = rec.src;
454         *buf = ret_buf;
455
456         return True;
457 }
458
459 /****************************************************************************
460  Receive and dispatch any messages pending for this process.
461  JRA changed Dec 13 2006. Only one message handler now permitted per type.
462  *NOTE*: Dispatch functions must be able to cope with incoming
463  messages on an *odd* byte boundary.
464 ****************************************************************************/
465
466 void message_dispatch(void)
467 {
468         int msg_type;
469         struct process_id src;
470         char *buf;
471         char *msgs_buf;
472         size_t len, total_len;
473         int n_handled;
474
475         if (!received_signal)
476                 return;
477
478         DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
479
480         received_signal = 0;
481
482         if (!retrieve_all_messages(&msgs_buf, &total_len))
483                 return;
484
485         for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) {
486                 struct dispatch_fns *dfn, *next;
487
488                 DEBUG(10,("message_dispatch: received msg_type=%d "
489                           "src_pid=%u\n", msg_type,
490                           (unsigned int) procid_to_pid(&src)));
491
492                 n_handled = 0;
493                 for (dfn = dispatch_fns; dfn; dfn = next) {
494                         next = dfn->next;                       
495                         if (dfn->msg_type == msg_type) {
496                                 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
497                                 dfn->fn(msg_type, src, len ? (void *)buf : NULL, len);
498                                 n_handled++;
499                         }
500                 }
501                 if (!n_handled) {
502                         DEBUG(5,("message_dispatch: warning: no handlers registed for "
503                                  "msg_type %d in pid %u\n",
504                                  msg_type, (unsigned int)sys_getpid()));
505                 }
506         }
507         SAFE_FREE(msgs_buf);
508 }
509
510 /****************************************************************************
511  Register/replace a dispatch function for a particular message type.
512  JRA changed Dec 13 2006. Only one message handler now permitted per type.
513  *NOTE*: Dispatch functions must be able to cope with incoming
514  messages on an *odd* byte boundary.
515 ****************************************************************************/
516
517 void message_register(int msg_type, 
518                       void (*fn)(int msg_type, struct process_id pid,
519                                  void *buf, size_t len))
520 {
521         struct dispatch_fns *dfn;
522
523         for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
524                 if (dfn->msg_type == msg_type) {
525                         dfn->fn = fn;
526                         return;
527                 }
528         }
529
530         dfn = SMB_MALLOC_P(struct dispatch_fns);
531
532         if (dfn != NULL) {
533
534                 ZERO_STRUCTPN(dfn);
535
536                 dfn->msg_type = msg_type;
537                 dfn->fn = fn;
538
539                 DLIST_ADD(dispatch_fns, dfn);
540         }
541         else {
542         
543                 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
544         }
545 }
546
547 /****************************************************************************
548  De-register the function for a particular message type.
549 ****************************************************************************/
550
551 void message_deregister(int msg_type)
552 {
553         struct dispatch_fns *dfn, *next;
554
555         for (dfn = dispatch_fns; dfn; dfn = next) {
556                 next = dfn->next;
557                 if (dfn->msg_type == msg_type) {
558                         DLIST_REMOVE(dispatch_fns, dfn);
559                         SAFE_FREE(dfn);
560                         return;
561                 }
562         }       
563 }
564
565 struct msg_all {
566         int msg_type;
567         uint32 msg_flag;
568         const void *buf;
569         size_t len;
570         BOOL duplicates;
571         int n_sent;
572 };
573
574 /****************************************************************************
575  Send one of the messages for the broadcast.
576 ****************************************************************************/
577
578 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
579 {
580         struct connections_data crec;
581         struct msg_all *msg_all = (struct msg_all *)state;
582
583         if (dbuf.dsize != sizeof(crec))
584                 return 0;
585
586         memcpy(&crec, dbuf.dptr, sizeof(crec));
587
588         if (crec.cnum != -1)
589                 return 0;
590
591         /* Don't send if the receiver hasn't registered an interest. */
592
593         if(!(crec.bcast_msg_flags & msg_all->msg_flag))
594                 return 0;
595
596         /* If the msg send fails because the pid was not found (i.e. smbd died), 
597          * the msg has already been deleted from the messages.tdb.*/
598
599         if (!message_send_pid(crec.pid, msg_all->msg_type,
600                               msg_all->buf, msg_all->len,
601                               msg_all->duplicates)) {
602                 
603                 /* If the pid was not found delete the entry from connections.tdb */
604
605                 if (errno == ESRCH) {
606                         DEBUG(2,("pid %s doesn't exist - deleting connections %d [%s]\n",
607                                  procid_str_static(&crec.pid),
608                                  crec.cnum, crec.name));
609                         tdb_delete(the_tdb, kbuf);
610                 }
611         }
612         msg_all->n_sent++;
613         return 0;
614 }
615
616 /**
617  * Send a message to all smbd processes.
618  *
619  * It isn't very efficient, but should be OK for the sorts of
620  * applications that use it. When we need efficient broadcast we can add
621  * it.
622  *
623  * @param n_sent Set to the number of messages sent.  This should be
624  * equal to the number of processes, but be careful for races.
625  *
626  * @retval True for success.
627  **/
628 BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type,
629                       const void *buf, size_t len,
630                       BOOL duplicates_allowed,
631                       int *n_sent)
632 {
633         struct msg_all msg_all;
634
635         msg_all.msg_type = msg_type;
636         if (msg_type < 1000)
637                 msg_all.msg_flag = FLAG_MSG_GENERAL;
638         else if (msg_type > 1000 && msg_type < 2000)
639                 msg_all.msg_flag = FLAG_MSG_NMBD;
640         else if (msg_type > 2000 && msg_type < 2100)
641                 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
642         else if (msg_type > 2100 && msg_type < 3000)
643                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
644         else if (msg_type > 3000 && msg_type < 4000)
645                 msg_all.msg_flag = FLAG_MSG_SMBD;
646         else
647                 return False;
648
649         msg_all.buf = buf;
650         msg_all.len = len;
651         msg_all.duplicates = duplicates_allowed;
652         msg_all.n_sent = 0;
653
654         tdb_traverse(conn_tdb, traverse_fn, &msg_all);
655         if (n_sent)
656                 *n_sent = msg_all.n_sent;
657         return True;
658 }
659
660 /*
661  * Block and unblock receiving of messages. Allows removal of race conditions
662  * when doing a fork and changing message disposition.
663  */
664
665 void message_block(void)
666 {
667         BlockSignals(True, SIGUSR1);
668 }
669
670 void message_unblock(void)
671 {
672         BlockSignals(False, SIGUSR1);
673 }
674 /** @} **/