Add 3 second timeout when terminating server and sending print notify
[samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 /**
24    @defgroups messages Internal messaging framework
25    @{
26    @file messages.c
27
28    This module is used for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    *NOTE*: Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49
50 /* the locking database handle */
51 static TDB_CONTEXT *tdb;
52 static int received_signal;
53
54 /* change the message version with any incompatible changes in the protocol */
55 #define MESSAGE_VERSION 1
56
57 struct message_rec {
58         int msg_version;
59         int msg_type;
60         pid_t dest;
61         pid_t src;
62         size_t len;
63 };
64
65 /* we have a linked list of dispatch handlers */
66 static struct dispatch_fns {
67         struct dispatch_fns *next, *prev;
68         int msg_type;
69         void (*fn)(int msg_type, pid_t pid, void *buf, size_t len);
70 } *dispatch_fns;
71
72 /****************************************************************************
73  Notifications come in as signals.
74 ****************************************************************************/
75
76 static void sig_usr1(void)
77 {
78         received_signal = 1;
79         sys_select_signal();
80 }
81
82 /****************************************************************************
83  A useful function for testing the message system.
84 ****************************************************************************/
85
86 static void ping_message(int msg_type, pid_t src, void *buf, size_t len)
87 {
88         const char *msg = buf ? buf : "none";
89         DEBUG(1,("INFO: Received PING message from PID %u [%s]\n",(unsigned int)src, msg));
90         message_send_pid(src, MSG_PONG, buf, len, True);
91 }
92
93 /****************************************************************************
94  Initialise the messaging functions. 
95 ****************************************************************************/
96
97 BOOL message_init(void)
98 {
99         if (tdb) return True;
100
101         tdb = tdb_open_log(lock_path("messages.tdb"), 
102                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
103                        O_RDWR|O_CREAT,0600);
104
105         if (!tdb) {
106                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
107                 return False;
108         }
109
110         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
111
112         message_register(MSG_PING, ping_message);
113
114         return True;
115 }
116
117 /*******************************************************************
118  Form a static tdb key from a pid.
119 ******************************************************************/
120
121 static TDB_DATA message_key_pid(pid_t pid)
122 {
123         static char key[20];
124         TDB_DATA kbuf;
125
126         slprintf(key, sizeof(key)-1, "PID/%d", (int)pid);
127         
128         kbuf.dptr = (char *)key;
129         kbuf.dsize = strlen(key)+1;
130         return kbuf;
131 }
132
133 /****************************************************************************
134  Notify a process that it has a message. If the process doesn't exist 
135  then delete its record in the database.
136 ****************************************************************************/
137
138 static BOOL message_notify(pid_t pid)
139 {
140         /*
141          * Doing kill with a non-positive pid causes messages to be
142          * sent to places we don't want.
143          */
144
145         SMB_ASSERT(pid > 0);
146
147         if (kill(pid, SIGUSR1) == -1) {
148                 if (errno == ESRCH) {
149                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid));
150                         tdb_delete(tdb, message_key_pid(pid));
151                 } else {
152                         DEBUG(2,("message to process %d failed - %s\n", (int)pid, strerror(errno)));
153                 }
154                 return False;
155         }
156         return True;
157 }
158
159 /****************************************************************************
160  Send a message to a particular pid.
161 ****************************************************************************/
162
163 static BOOL message_send_pid_internal(pid_t pid, int msg_type, const void *buf, size_t len,
164                       BOOL duplicates_allowed, unsigned int timeout)
165 {
166         TDB_DATA kbuf;
167         TDB_DATA dbuf;
168         TDB_DATA old_dbuf;
169         struct message_rec rec;
170         char *ptr;
171         struct message_rec prec;
172
173         /*
174          * Doing kill with a non-positive pid causes messages to be
175          * sent to places we don't want.
176          */
177
178         SMB_ASSERT(pid > 0);
179
180         rec.msg_version = MESSAGE_VERSION;
181         rec.msg_type = msg_type;
182         rec.dest = pid;
183         rec.src = sys_getpid();
184         rec.len = len;
185
186         kbuf = message_key_pid(pid);
187
188         dbuf.dptr = (void *)malloc(len + sizeof(rec));
189         if (!dbuf.dptr)
190                 return False;
191
192         memcpy(dbuf.dptr, &rec, sizeof(rec));
193         if (len > 0)
194                 memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len);
195
196         dbuf.dsize = len + sizeof(rec);
197
198         if (duplicates_allowed) {
199
200                 /* If duplicates are allowed we can just append the message and return. */
201
202                 /* lock the record for the destination */
203                 if (timeout) {
204                         if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
205                                 DEBUG(0,("message_send_pid_internal: failed to get chainlock with timeout %ul.\n", timeout));
206                                 return False;
207                         }
208                 } else {
209                         if (tdb_chainlock(tdb, kbuf) == -1) {
210                                 DEBUG(0,("message_send_pid_internal: failed to get chainlock.\n"));
211                                 return False;
212                         }
213                 }       
214                 tdb_append(tdb, kbuf, dbuf);
215                 tdb_chainunlock(tdb, kbuf);
216
217                 SAFE_FREE(dbuf.dptr);
218                 errno = 0;                    /* paranoia */
219                 return message_notify(pid);
220         }
221
222         /* lock the record for the destination */
223         if (timeout) {
224                 if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
225                         DEBUG(0,("message_send_pid_internal: failed to get chainlock with timeout %ul.\n", timeout));
226                         return False;
227                 }
228         } else {
229                 if (tdb_chainlock(tdb, kbuf) == -1) {
230                         DEBUG(0,("message_send_pid_internal: failed to get chainlock.\n"));
231                         return False;
232                 }
233         }       
234
235         old_dbuf = tdb_fetch(tdb, kbuf);
236
237         if (!old_dbuf.dptr) {
238                 /* its a new record */
239
240                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
241                 tdb_chainunlock(tdb, kbuf);
242
243                 SAFE_FREE(dbuf.dptr);
244                 errno = 0;                    /* paranoia */
245                 return message_notify(pid);
246         }
247
248         /* Not a new record. Check for duplicates. */
249
250         for(ptr = (char *)old_dbuf.dptr; ptr < old_dbuf.dptr + old_dbuf.dsize; ) {
251                 /*
252                  * First check if the message header matches, then, if it's a non-zero
253                  * sized message, check if the data matches. If so it's a duplicate and
254                  * we can discard it. JRA.
255                  */
256
257                 if (!memcmp(ptr, &rec, sizeof(rec))) {
258                         if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
259                                 tdb_chainunlock(tdb, kbuf);
260                                 DEBUG(10,("message_send_pid_internal: discarding duplicate message.\n"));
261                                 SAFE_FREE(dbuf.dptr);
262                                 SAFE_FREE(old_dbuf.dptr);
263                                 return True;
264                         }
265                 }
266                 memcpy(&prec, ptr, sizeof(prec));
267                 ptr += sizeof(rec) + prec.len;
268         }
269
270         /* we're adding to an existing entry */
271
272         tdb_append(tdb, kbuf, dbuf);
273         tdb_chainunlock(tdb, kbuf);
274
275         SAFE_FREE(old_dbuf.dptr);
276         SAFE_FREE(dbuf.dptr);
277
278         errno = 0;                    /* paranoia */
279         return message_notify(pid);
280 }
281
282 /****************************************************************************
283  Send a message to a particular pid - no timeout.
284 ****************************************************************************/
285
286 BOOL message_send_pid(pid_t pid, int msg_type, const void *buf, size_t len, BOOL duplicates_allowed)
287 {
288         return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed, 0);
289 }
290
291 /****************************************************************************
292  Send a message to a particular pid, with timeout in seconds.
293 ****************************************************************************/
294
295 BOOL message_send_pid_with_timeout(pid_t pid, int msg_type, const void *buf, size_t len,
296                 BOOL duplicates_allowed, unsigned int timeout)
297 {
298         return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed, timeout);
299 }
300
301 /****************************************************************************
302  Retrieve all messages for the current process.
303 ****************************************************************************/
304
305 static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
306 {
307         TDB_DATA kbuf;
308         TDB_DATA dbuf;
309         TDB_DATA null_dbuf;
310
311         ZERO_STRUCT(null_dbuf);
312
313         *msgs_buf = NULL;
314         *total_len = 0;
315
316         kbuf = message_key_pid(sys_getpid());
317
318         tdb_chainlock(tdb, kbuf);
319         dbuf = tdb_fetch(tdb, kbuf);
320         /*
321          * Replace with an empty record to keep the allocated
322          * space in the tdb.
323          */
324         tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
325         tdb_chainunlock(tdb, kbuf);
326
327         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
328                 SAFE_FREE(dbuf.dptr);
329                 return False;
330         }
331
332         *msgs_buf = dbuf.dptr;
333         *total_len = dbuf.dsize;
334
335         return True;
336 }
337
338 /****************************************************************************
339  Parse out the next message for the current process.
340 ****************************************************************************/
341
342 static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type, pid_t *src, char **buf, size_t *len)
343 {
344         struct message_rec rec;
345         char *ret_buf = *buf;
346
347         *buf = NULL;
348         *len = 0;
349
350         if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
351                 return False;
352
353         memcpy(&rec, ret_buf, sizeof(rec));
354         ret_buf += sizeof(rec);
355
356         if (rec.msg_version != MESSAGE_VERSION) {
357                 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
358                 return False;
359         }
360
361         if (rec.len > 0) {
362                 if (total_len - (ret_buf - msgs_buf) < rec.len)
363                         return False;
364         }
365
366         *len = rec.len;
367         *msg_type = rec.msg_type;
368         *src = rec.src;
369         *buf = ret_buf;
370
371         return True;
372 }
373
374 /****************************************************************************
375  Receive and dispatch any messages pending for this process.
376  Notice that all dispatch handlers for a particular msg_type get called,
377  so you can register multiple handlers for a message.
378  *NOTE*: Dispatch functions must be able to cope with incoming
379  messages on an *odd* byte boundary.
380 ****************************************************************************/
381
382 void message_dispatch(void)
383 {
384         int msg_type;
385         pid_t src;
386         char *buf;
387         char *msgs_buf;
388         size_t len, total_len;
389         struct dispatch_fns *dfn;
390         int n_handled;
391
392         if (!received_signal)
393                 return;
394
395         DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
396
397         received_signal = 0;
398
399         if (!retrieve_all_messages(&msgs_buf, &total_len))
400                 return;
401
402         for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) {
403                 DEBUG(10,("message_dispatch: received msg_type=%d src_pid=%u\n",
404                           msg_type, (unsigned int) src));
405                 n_handled = 0;
406                 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
407                         if (dfn->msg_type == msg_type) {
408                                 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
409                                 dfn->fn(msg_type, src, len ? (void *)buf : NULL, len);
410                                 n_handled++;
411                         }
412                 }
413                 if (!n_handled) {
414                         DEBUG(5,("message_dispatch: warning: no handlers registed for "
415                                  "msg_type %d in pid %u\n",
416                                  msg_type, (unsigned int)sys_getpid()));
417                 }
418         }
419         SAFE_FREE(msgs_buf);
420 }
421
422 /****************************************************************************
423  Register a dispatch function for a particular message type.
424  *NOTE*: Dispatch functions must be able to cope with incoming
425  messages on an *odd* byte boundary.
426 ****************************************************************************/
427
428 void message_register(int msg_type, 
429                       void (*fn)(int msg_type, pid_t pid, void *buf, size_t len))
430 {
431         struct dispatch_fns *dfn;
432
433         dfn = (struct dispatch_fns *)malloc(sizeof(*dfn));
434
435         if (dfn != NULL) {
436
437                 ZERO_STRUCTPN(dfn);
438
439                 dfn->msg_type = msg_type;
440                 dfn->fn = fn;
441
442                 DLIST_ADD(dispatch_fns, dfn);
443         }
444         else {
445         
446                 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
447         }
448 }
449
450 /****************************************************************************
451  De-register the function for a particular message type.
452 ****************************************************************************/
453
454 void message_deregister(int msg_type)
455 {
456         struct dispatch_fns *dfn, *next;
457
458         for (dfn = dispatch_fns; dfn; dfn = next) {
459                 next = dfn->next;
460                 if (dfn->msg_type == msg_type) {
461                         DLIST_REMOVE(dispatch_fns, dfn);
462                         SAFE_FREE(dfn);
463                 }
464         }       
465 }
466
467 struct msg_all {
468         int msg_type;
469         uint32 msg_flag;
470         const void *buf;
471         size_t len;
472         BOOL duplicates;
473         int n_sent;
474 };
475
476 /****************************************************************************
477  Send one of the messages for the broadcast.
478 ****************************************************************************/
479
480 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
481 {
482         struct connections_data crec;
483         struct msg_all *msg_all = (struct msg_all *)state;
484
485         if (dbuf.dsize != sizeof(crec))
486                 return 0;
487
488         memcpy(&crec, dbuf.dptr, sizeof(crec));
489
490         if (crec.cnum != -1)
491                 return 0;
492
493         /* Don't send if the receiver hasn't registered an interest. */
494
495         if(!(crec.bcast_msg_flags & msg_all->msg_flag))
496                 return 0;
497
498         /* If the msg send fails because the pid was not found (i.e. smbd died), 
499          * the msg has already been deleted from the messages.tdb.*/
500
501         if (!message_send_pid(crec.pid, msg_all->msg_type,
502                               msg_all->buf, msg_all->len,
503                               msg_all->duplicates)) {
504                 
505                 /* If the pid was not found delete the entry from connections.tdb */
506
507                 if (errno == ESRCH) {
508                         DEBUG(2,("pid %u doesn't exist - deleting connections %d [%s]\n",
509                                         (unsigned int)crec.pid, crec.cnum, crec.name));
510                         tdb_delete(the_tdb, kbuf);
511                 }
512         }
513         msg_all->n_sent++;
514         return 0;
515 }
516
517 /**
518  * Send a message to all smbd processes.
519  *
520  * It isn't very efficient, but should be OK for the sorts of
521  * applications that use it. When we need efficient broadcast we can add
522  * it.
523  *
524  * @param n_sent Set to the number of messages sent.  This should be
525  * equal to the number of processes, but be careful for races.
526  *
527  * @return True for success.
528  **/
529 BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type,
530                       const void *buf, size_t len,
531                       BOOL duplicates_allowed,
532                       int *n_sent)
533 {
534         struct msg_all msg_all;
535
536         msg_all.msg_type = msg_type;
537         if (msg_type < 1000)
538                 msg_all.msg_flag = FLAG_MSG_GENERAL;
539         else if (msg_type > 1000 && msg_type < 2000)
540                 msg_all.msg_flag = FLAG_MSG_NMBD;
541         else if (msg_type > 2000 && msg_type < 3000)
542                 msg_all.msg_flag = FLAG_MSG_PRINTING;
543         else if (msg_type > 3000 && msg_type < 4000)
544                 msg_all.msg_flag = FLAG_MSG_SMBD;
545         else
546                 return False;
547
548         msg_all.buf = buf;
549         msg_all.len = len;
550         msg_all.duplicates = duplicates_allowed;
551         msg_all.n_sent = 0;
552
553         tdb_traverse(conn_tdb, traverse_fn, &msg_all);
554         if (n_sent)
555                 *n_sent = msg_all.n_sent;
556         return True;
557 }
558 /** @} **/