r23024: Ok, neither the duplicates_allowed nor the timeout argument to
[bbaumbach/samba-autobuild/.git] / source / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24 /**
25   @defgroup messages Internal messaging framework
26   @{
27   @file messages.c
28   
29   @brief  Module for internal messaging between Samba daemons. 
30
31    The idea is that if a part of Samba wants to do communication with
32    another Samba process then it will do a message_register() of a
33    dispatch function, and use message_send_pid() to send messages to
34    that process.
35
36    The dispatch function is given the pid of the sender, and it can
37    use that to reply by message_send_pid().  See ping_message() for a
38    simple example.
39
40    @caution Dispatch functions must be able to cope with incoming
41    messages on an *odd* byte boundary.
42
43    This system doesn't have any inherent size limitations but is not
44    very efficient for large messages or when messages are sent in very
45    quick succession.
46
47 */
48
49 #include "includes.h"
50
51 /* the locking database handle */
52 static TDB_CONTEXT *tdb;
53 static int received_signal;
54
55 /* change the message version with any incompatible changes in the protocol */
56 #define MESSAGE_VERSION 1
57
58 struct message_rec {
59         int msg_version;
60         int msg_type;
61         struct server_id dest;
62         struct server_id src;
63         size_t len;
64 };
65
66 /* we have a linked list of dispatch handlers */
67 static struct dispatch_fns {
68         struct dispatch_fns *next, *prev;
69         int msg_type;
70         void (*fn)(int msg_type, struct server_id pid, void *buf, size_t len,
71                    void *private_data);
72         void *private_data;
73 } *dispatch_fns;
74
75 static void message_register(int msg_type, 
76                              void (*fn)(int msg_type, struct server_id pid,
77                                         void *buf, size_t len,
78                                         void *private_data),
79                              void *private_data);
80
81 /****************************************************************************
82  Free global objects.
83 ****************************************************************************/
84
85 void gfree_messages(void)
86 {
87         struct dispatch_fns *dfn, *next;
88
89         /* delete the dispatch_fns list */
90         dfn = dispatch_fns;
91         while( dfn ) {
92                 next = dfn->next;
93                 DLIST_REMOVE(dispatch_fns, dfn);
94                 SAFE_FREE(dfn);
95                 dfn = next;
96         }
97 }
98
99 /****************************************************************************
100  Notifications come in as signals.
101 ****************************************************************************/
102
103 static void sig_usr1(void)
104 {
105         received_signal = 1;
106         sys_select_signal(SIGUSR1);
107 }
108
109 static NTSTATUS message_send_pid(struct server_id pid, int msg_type,
110                                  const void *buf, size_t len);
111
112 /****************************************************************************
113  A useful function for testing the message system.
114 ****************************************************************************/
115
116 static void ping_message(int msg_type, struct server_id src,
117                          void *buf, size_t len, void *private_data)
118 {
119         const char *msg = buf ? (const char *)buf : "none";
120
121         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
122                  procid_str_static(&src), msg));
123         message_send_pid(src, MSG_PONG, buf, len);
124 }
125
126 /****************************************************************************
127  Initialise the messaging functions. 
128 ****************************************************************************/
129
130 static BOOL message_init(struct messaging_context *msg_ctx)
131 {
132         sec_init();
133
134         if (tdb)
135                 return True;
136
137         tdb = tdb_open_log(lock_path("messages.tdb"), 
138                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
139                        O_RDWR|O_CREAT,0600);
140
141         if (!tdb) {
142                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
143                 return False;
144         }
145
146         /* Activate the per-hashchain freelist */
147         tdb_set_max_dead(tdb, 5);
148
149         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
150
151         message_register(MSG_PING, ping_message, NULL);
152
153         /* Register some debugging related messages */
154
155         register_msg_pool_usage(msg_ctx);
156         register_dmalloc_msgs(msg_ctx);
157         debug_register_msgs(msg_ctx);
158
159         return True;
160 }
161
162 /*******************************************************************
163  Form a static tdb key from a pid.
164 ******************************************************************/
165
166 static TDB_DATA message_key_pid(struct server_id pid)
167 {
168         static char key[20];
169         TDB_DATA kbuf;
170
171         slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid));
172         
173         kbuf.dptr = (uint8 *)key;
174         kbuf.dsize = strlen(key)+1;
175         return kbuf;
176 }
177
178 /****************************************************************************
179  Notify a process that it has a message. If the process doesn't exist 
180  then delete its record in the database.
181 ****************************************************************************/
182
183 static NTSTATUS message_notify(struct server_id procid)
184 {
185         pid_t pid = procid.pid;
186         int ret;
187         uid_t euid = geteuid();
188
189         /*
190          * Doing kill with a non-positive pid causes messages to be
191          * sent to places we don't want.
192          */
193
194         SMB_ASSERT(pid > 0);
195
196         if (euid != 0) {
197                 /* If we're not root become so to send the message. */
198                 save_re_uid();
199                 set_effective_uid(0);
200         }
201
202         ret = kill(pid, SIGUSR1);
203
204         if (euid != 0) {
205                 /* Go back to who we were. */
206                 int saved_errno = errno;
207                 restore_re_uid_fromroot();
208                 errno = saved_errno;
209         }
210
211         if (ret == -1) {
212                 if (errno == ESRCH) {
213                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n",
214                                  (int)pid));
215                         tdb_delete(tdb, message_key_pid(procid));
216
217                         /*
218                          * INVALID_HANDLE is the closest I can think of -- vl
219                          */
220                         return NT_STATUS_INVALID_HANDLE;
221                 }
222
223                 DEBUG(2,("message to process %d failed - %s\n", (int)pid,
224                          strerror(errno)));
225
226                 /*
227                  * No call to map_nt_error_from_unix -- don't want to link in
228                  * errormap.o into lots of utils.
229                  */
230
231                 if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
232                 if (errno == EPERM)  return NT_STATUS_ACCESS_DENIED;
233                 return NT_STATUS_UNSUCCESSFUL;
234         }
235
236         return NT_STATUS_OK;
237 }
238
239 /****************************************************************************
240  Send a message to a particular pid.
241 ****************************************************************************/
242
243 static NTSTATUS message_send_pid(struct server_id pid, int msg_type,
244                                  const void *buf, size_t len)
245 {
246         TDB_DATA kbuf;
247         TDB_DATA dbuf;
248         struct message_rec rec;
249         int ret;
250
251         /* NULL pointer means implicit length zero. */
252         if (!buf) {
253                 SMB_ASSERT(len == 0);
254         }
255
256         /*
257          * Doing kill with a non-positive pid causes messages to be
258          * sent to places we don't want.
259          */
260
261         SMB_ASSERT(procid_to_pid(&pid) > 0);
262
263         rec.msg_version = MESSAGE_VERSION;
264         rec.msg_type = msg_type;
265         rec.dest = pid;
266         rec.src = procid_self();
267         rec.len = buf ? len : 0;
268
269         kbuf = message_key_pid(pid);
270
271         dbuf.dptr = (uint8 *)SMB_MALLOC(len + sizeof(rec));
272         if (!dbuf.dptr) {
273                 return NT_STATUS_NO_MEMORY;
274         }
275
276         memcpy(dbuf.dptr, &rec, sizeof(rec));
277         if (len > 0 && buf)
278                 memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len);
279
280         dbuf.dsize = len + sizeof(rec);
281
282         ret = tdb_append(tdb, kbuf, dbuf);
283
284         SAFE_FREE(dbuf.dptr);
285
286         if (ret == -1) {
287                 return NT_STATUS_INTERNAL_ERROR;
288         }
289
290         errno = 0;                    /* paranoia */
291         return message_notify(pid);
292 }
293
294 /****************************************************************************
295  Count the messages pending for a particular pid. Expensive....
296 ****************************************************************************/
297
298 unsigned int messages_pending_for_pid(struct server_id pid)
299 {
300         TDB_DATA kbuf;
301         TDB_DATA dbuf;
302         uint8 *buf;
303         unsigned int message_count = 0;
304
305         kbuf = message_key_pid(pid);
306
307         dbuf = tdb_fetch(tdb, kbuf);
308         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
309                 SAFE_FREE(dbuf.dptr);
310                 return 0;
311         }
312
313         for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) {
314                 struct message_rec rec;
315                 memcpy(&rec, buf, sizeof(rec));
316                 buf += (sizeof(rec) + rec.len);
317                 dbuf.dsize -= (sizeof(rec) + rec.len);
318                 message_count++;
319         }
320
321         SAFE_FREE(dbuf.dptr);
322         return message_count;
323 }
324
325 /****************************************************************************
326  Retrieve all messages for the current process.
327 ****************************************************************************/
328
329 static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
330 {
331         TDB_DATA kbuf;
332         TDB_DATA dbuf;
333         TDB_DATA null_dbuf;
334
335         ZERO_STRUCT(null_dbuf);
336
337         *msgs_buf = NULL;
338         *total_len = 0;
339
340         kbuf = message_key_pid(pid_to_procid(sys_getpid()));
341
342         if (tdb_chainlock(tdb, kbuf) == -1)
343                 return False;
344
345         dbuf = tdb_fetch(tdb, kbuf);
346         /*
347          * Replace with an empty record to keep the allocated
348          * space in the tdb.
349          */
350         tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
351         tdb_chainunlock(tdb, kbuf);
352
353         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
354                 SAFE_FREE(dbuf.dptr);
355                 return False;
356         }
357
358         *msgs_buf = (char *)dbuf.dptr;
359         *total_len = dbuf.dsize;
360
361         return True;
362 }
363
364 /****************************************************************************
365  Parse out the next message for the current process.
366 ****************************************************************************/
367
368 static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type,
369                          struct server_id *src, char **buf, size_t *len)
370 {
371         struct message_rec rec;
372         char *ret_buf = *buf;
373
374         *buf = NULL;
375         *len = 0;
376
377         if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
378                 return False;
379
380         memcpy(&rec, ret_buf, sizeof(rec));
381         ret_buf += sizeof(rec);
382
383         if (rec.msg_version != MESSAGE_VERSION) {
384                 DEBUG(0,("message version %d received (expected %d)\n",
385                          rec.msg_version, MESSAGE_VERSION));
386                 return False;
387         }
388
389         if (rec.len > 0) {
390                 if (total_len - (ret_buf - msgs_buf) < rec.len)
391                         return False;
392         }
393
394         *len = rec.len;
395         *msg_type = rec.msg_type;
396         *src = rec.src;
397         *buf = ret_buf;
398
399         return True;
400 }
401
402 /****************************************************************************
403  Receive and dispatch any messages pending for this process.
404  JRA changed Dec 13 2006. Only one message handler now permitted per type.
405  *NOTE*: Dispatch functions must be able to cope with incoming
406  messages on an *odd* byte boundary.
407 ****************************************************************************/
408
409 void message_dispatch(void)
410 {
411         int msg_type;
412         struct server_id src;
413         char *buf;
414         char *msgs_buf;
415         size_t len, total_len;
416         int n_handled;
417
418         if (!received_signal)
419                 return;
420
421         DEBUG(10, ("message_dispatch: received_signal = %d\n",
422                    received_signal));
423
424         received_signal = 0;
425
426         if (!retrieve_all_messages(&msgs_buf, &total_len))
427                 return;
428
429         for (buf = msgs_buf;
430              message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len);
431              buf += len) {
432                 struct dispatch_fns *dfn;
433
434                 DEBUG(10,("message_dispatch: received msg_type=%d "
435                           "src_pid=%u\n", msg_type,
436                           (unsigned int) procid_to_pid(&src)));
437
438                 n_handled = 0;
439                 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
440                         if (dfn->msg_type == msg_type) {
441                                 DEBUG(10,("message_dispatch: processing "
442                                           "message of type %d.\n", msg_type));
443                                 dfn->fn(msg_type, src,
444                                         len ? (void *)buf : NULL, len,
445                                         dfn->private_data);
446                                 n_handled++;
447                                 break;
448                         }
449                 }
450                 if (!n_handled) {
451                         DEBUG(5,("message_dispatch: warning: no handler "
452                                  "registed for msg_type %d in pid %u\n",
453                                  msg_type, (unsigned int)sys_getpid()));
454                 }
455         }
456         SAFE_FREE(msgs_buf);
457 }
458
459 /****************************************************************************
460  Register/replace a dispatch function for a particular message type.
461  JRA changed Dec 13 2006. Only one message handler now permitted per type.
462  *NOTE*: Dispatch functions must be able to cope with incoming
463  messages on an *odd* byte boundary.
464 ****************************************************************************/
465
466 static void message_register(int msg_type, 
467                              void (*fn)(int msg_type, struct server_id pid,
468                                         void *buf, size_t len,
469                                         void *private_data),
470                              void *private_data)
471 {
472         struct dispatch_fns *dfn;
473
474         for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
475                 if (dfn->msg_type == msg_type) {
476                         dfn->fn = fn;
477                         return;
478                 }
479         }
480
481         if (!(dfn = SMB_MALLOC_P(struct dispatch_fns))) {
482                 DEBUG(0,("message_register: Not enough memory. malloc "
483                          "failed!\n"));
484                 return;
485         }
486
487         ZERO_STRUCTPN(dfn);
488
489         dfn->msg_type = msg_type;
490         dfn->fn = fn;
491         dfn->private_data = private_data;
492
493         DLIST_ADD(dispatch_fns, dfn);
494 }
495
496 /****************************************************************************
497  De-register the function for a particular message type.
498 ****************************************************************************/
499
500 static void message_deregister(int msg_type)
501 {
502         struct dispatch_fns *dfn, *next;
503
504         for (dfn = dispatch_fns; dfn; dfn = next) {
505                 next = dfn->next;
506                 if (dfn->msg_type == msg_type) {
507                         DLIST_REMOVE(dispatch_fns, dfn);
508                         SAFE_FREE(dfn);
509                         return;
510                 }
511         }       
512 }
513
514 struct msg_all {
515         int msg_type;
516         uint32 msg_flag;
517         const void *buf;
518         size_t len;
519         BOOL duplicates;
520         int n_sent;
521 };
522
523 /****************************************************************************
524  Send one of the messages for the broadcast.
525 ****************************************************************************/
526
527 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf,
528                        void *state)
529 {
530         struct connections_data crec;
531         struct msg_all *msg_all = (struct msg_all *)state;
532         NTSTATUS status;
533
534         if (dbuf.dsize != sizeof(crec))
535                 return 0;
536
537         memcpy(&crec, dbuf.dptr, sizeof(crec));
538
539         if (crec.cnum != -1)
540                 return 0;
541
542         /* Don't send if the receiver hasn't registered an interest. */
543
544         if(!(crec.bcast_msg_flags & msg_all->msg_flag))
545                 return 0;
546
547         /* If the msg send fails because the pid was not found (i.e. smbd died), 
548          * the msg has already been deleted from the messages.tdb.*/
549
550         status = message_send_pid(crec.pid, msg_all->msg_type,
551                                   msg_all->buf, msg_all->len);
552
553         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
554                 
555                 /* If the pid was not found delete the entry from
556                  * connections.tdb */
557
558                 DEBUG(2,("pid %s doesn't exist - deleting connections "
559                          "%d [%s]\n", procid_str_static(&crec.pid), crec.cnum,
560                          crec.servicename));
561                 tdb_delete(the_tdb, kbuf);
562         }
563         msg_all->n_sent++;
564         return 0;
565 }
566
567 /**
568  * Send a message to all smbd processes.
569  *
570  * It isn't very efficient, but should be OK for the sorts of
571  * applications that use it. When we need efficient broadcast we can add
572  * it.
573  *
574  * @param n_sent Set to the number of messages sent.  This should be
575  * equal to the number of processes, but be careful for races.
576  *
577  * @retval True for success.
578  **/
579 BOOL message_send_all(struct messaging_context *msg_ctx,
580                       int msg_type,
581                       const void *buf, size_t len,
582                       BOOL duplicates_allowed,
583                       int *n_sent)
584 {
585         struct msg_all msg_all;
586
587         msg_all.msg_type = msg_type;
588         if (msg_type < 1000)
589                 msg_all.msg_flag = FLAG_MSG_GENERAL;
590         else if (msg_type > 1000 && msg_type < 2000)
591                 msg_all.msg_flag = FLAG_MSG_NMBD;
592         else if (msg_type > 2000 && msg_type < 2100)
593                 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
594         else if (msg_type > 2100 && msg_type < 3000)
595                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
596         else if (msg_type > 3000 && msg_type < 4000)
597                 msg_all.msg_flag = FLAG_MSG_SMBD;
598         else
599                 return False;
600
601         msg_all.buf = buf;
602         msg_all.len = len;
603         msg_all.duplicates = duplicates_allowed;
604         msg_all.n_sent = 0;
605
606         connections_traverse(traverse_fn, &msg_all);
607         if (n_sent)
608                 *n_sent = msg_all.n_sent;
609         return True;
610 }
611
612 /*
613  * Block and unblock receiving of messages. Allows removal of race conditions
614  * when doing a fork and changing message disposition.
615  */
616
617 void message_block(void)
618 {
619         BlockSignals(True, SIGUSR1);
620 }
621
622 void message_unblock(void)
623 {
624         BlockSignals(False, SIGUSR1);
625 }
626
627 /*
628  * Samba4 API wrapper around the Samba3 implementation. Yes, I know, we could
629  * import the whole Samba4 thing, but I want notify.c from Samba4 in first.
630  */
631
632 struct messaging_callback {
633         struct messaging_callback *prev, *next;
634         uint32 msg_type;
635         void (*fn)(struct messaging_context *msg, void *private_data, 
636                    uint32_t msg_type, 
637                    struct server_id server_id, DATA_BLOB *data);
638         void *private_data;
639 };
640
641 struct messaging_context {
642         struct server_id id;
643         struct event_context *event_ctx;
644         struct messaging_callback *callbacks;
645 };
646
647 static int messaging_context_destructor(struct messaging_context *ctx)
648 {
649         struct messaging_callback *cb;
650
651         for (cb = ctx->callbacks; cb; cb = cb->next) {
652                 /*
653                  * We unconditionally remove all instances of our callback
654                  * from the tdb basis.
655                  */
656                 message_deregister(cb->msg_type);
657         }
658         return 0;
659 }
660
661 struct event_context *messaging_event_context(struct messaging_context *msg_ctx)
662 {
663         return msg_ctx->event_ctx;
664 }
665
666 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
667                                          struct server_id server_id, 
668                                          struct event_context *ev)
669 {
670         struct messaging_context *ctx;
671
672         if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) {
673                 return NULL;
674         }
675
676         ctx->id = server_id;
677         ctx->event_ctx = ev;
678         talloc_set_destructor(ctx, messaging_context_destructor);
679
680         if (!message_init(ctx)) {
681                 DEBUG(0, ("message_init failed: %s\n", strerror(errno)));
682                 TALLOC_FREE(ctx);
683         }
684
685         return ctx;
686 }
687
688 static void messaging_callback(int msg_type, struct server_id pid,
689                                void *buf, size_t len, void *private_data)
690 {
691         struct messaging_context *ctx = talloc_get_type_abort(
692                 private_data, struct messaging_context);
693         struct messaging_callback *cb, *next;
694
695         for (cb = ctx->callbacks; cb; cb = next) {
696                 /*
697                  * Allow a callback to remove itself
698                  */
699                 next = cb->next;
700
701                 if (msg_type == cb->msg_type) {
702                         DATA_BLOB blob;
703
704                         blob.data = (uint8 *)buf;
705                         blob.length = len;
706
707                         cb->fn(ctx, cb->private_data, msg_type, pid, &blob);
708                 }
709         }
710 }
711
712 /*
713  * Register a dispatch function for a particular message type. Allow multiple
714  * registrants
715 */
716 NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data,
717                             uint32_t msg_type,
718                             void (*fn)(struct messaging_context *msg,
719                                        void *private_data, 
720                                        uint32_t msg_type, 
721                                        struct server_id server_id,
722                                        DATA_BLOB *data))
723 {
724         struct messaging_callback *cb;
725
726         if (!(cb = talloc(ctx, struct messaging_callback))) {
727                 return NT_STATUS_NO_MEMORY;
728         }
729
730         cb->msg_type = msg_type;
731         cb->fn = fn;
732         cb->private_data = private_data;
733
734         DLIST_ADD(ctx->callbacks, cb);
735         message_register(msg_type, messaging_callback, ctx);
736         return NT_STATUS_OK;
737 }
738
739 /*
740   De-register the function for a particular message type.
741 */
742 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
743                           void *private_data)
744 {
745         struct messaging_callback *cb, *next;
746
747         for (cb = ctx->callbacks; cb; cb = next) {
748                 next = cb->next;
749                 if ((cb->msg_type == msg_type)
750                     && (cb->private_data == private_data)) {
751                         DLIST_REMOVE(ctx->callbacks, cb);
752                         TALLOC_FREE(cb);
753                 }
754         }
755 }
756
757 /*
758   Send a message to a particular server
759 */
760 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
761                         struct server_id server, 
762                         uint32_t msg_type, const DATA_BLOB *data)
763 {
764         return message_send_pid(server, msg_type, data->data, data->length);
765 }
766
767 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
768                             struct server_id server, uint32_t msg_type,
769                             const uint8 *buf, size_t len)
770 {
771         DATA_BLOB blob = data_blob_const(buf, len);
772         return messaging_send(msg_ctx, server, msg_type, &blob);
773 }
774
775 /** @} **/