r22911: Pass a messaging_context to message_send_all
[samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27   
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49
50 /* the locking database handle */
51 static TDB_CONTEXT *tdb;
52 static int received_signal;
53
54 /* change the message version with any incompatible changes in the protocol */
55 #define MESSAGE_VERSION 1
56
57 struct message_rec {
58         int msg_version;
59         int msg_type;
60         struct server_id dest;
61         struct server_id src;
62         size_t len;
63 };
64
65 /* we have a linked list of dispatch handlers */
66 static struct dispatch_fns {
67         struct dispatch_fns *next, *prev;
68         int msg_type;
69         void (*fn)(int msg_type, struct server_id pid, void *buf, size_t len,
70                    void *private_data);
71         void *private_data;
72 } *dispatch_fns;
73
74 /****************************************************************************
75  Free global objects.
76 ****************************************************************************/
77
78 void gfree_messages(void)
79 {
80         struct dispatch_fns *dfn, *next;
81
82         /* delete the dispatch_fns list */
83         dfn = dispatch_fns;
84         while( dfn ) {
85                 next = dfn->next;
86                 DLIST_REMOVE(dispatch_fns, dfn);
87                 SAFE_FREE(dfn);
88                 dfn = next;
89         }
90 }
91
92 /****************************************************************************
93  Notifications come in as signals.
94 ****************************************************************************/
95
96 static void sig_usr1(void)
97 {
98         received_signal = 1;
99         sys_select_signal(SIGUSR1);
100 }
101
102 static NTSTATUS message_send_pid(struct server_id pid, int msg_type,
103                                  const void *buf, size_t len,
104                                  BOOL duplicates_allowed);
105
106 /****************************************************************************
107  A useful function for testing the message system.
108 ****************************************************************************/
109
110 static void ping_message(int msg_type, struct server_id src,
111                          void *buf, size_t len, void *private_data)
112 {
113         const char *msg = buf ? (const char *)buf : "none";
114
115         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
116                  procid_str_static(&src), msg));
117         message_send_pid(src, MSG_PONG, buf, len, True);
118 }
119
120 /****************************************************************************
121  Initialise the messaging functions. 
122 ****************************************************************************/
123
124 static BOOL message_init(struct messaging_context *msg_ctx)
125 {
126         sec_init();
127
128         if (tdb)
129                 return True;
130
131         tdb = tdb_open_log(lock_path("messages.tdb"), 
132                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
133                        O_RDWR|O_CREAT,0600);
134
135         if (!tdb) {
136                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
137                 return False;
138         }
139
140         /* Activate the per-hashchain freelist */
141         tdb_set_max_dead(tdb, 5);
142
143         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
144
145         message_register(MSG_PING, ping_message, NULL);
146
147         /* Register some debugging related messages */
148
149         register_msg_pool_usage(msg_ctx);
150         register_dmalloc_msgs();
151         debug_register_msgs(msg_ctx);
152
153         return True;
154 }
155
156 /*******************************************************************
157  Form a static tdb key from a pid.
158 ******************************************************************/
159
160 static TDB_DATA message_key_pid(struct server_id pid)
161 {
162         static char key[20];
163         TDB_DATA kbuf;
164
165         slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid));
166         
167         kbuf.dptr = (uint8 *)key;
168         kbuf.dsize = strlen(key)+1;
169         return kbuf;
170 }
171
172 /****************************************************************************
173  Notify a process that it has a message. If the process doesn't exist 
174  then delete its record in the database.
175 ****************************************************************************/
176
177 static NTSTATUS message_notify(struct server_id procid)
178 {
179         pid_t pid = procid.pid;
180         int ret;
181         uid_t euid = geteuid();
182
183         /*
184          * Doing kill with a non-positive pid causes messages to be
185          * sent to places we don't want.
186          */
187
188         SMB_ASSERT(pid > 0);
189
190         if (euid != 0) {
191                 /* If we're not root become so to send the message. */
192                 save_re_uid();
193                 set_effective_uid(0);
194         }
195
196         ret = kill(pid, SIGUSR1);
197
198         if (euid != 0) {
199                 /* Go back to who we were. */
200                 int saved_errno = errno;
201                 restore_re_uid_fromroot();
202                 errno = saved_errno;
203         }
204
205         if (ret == -1) {
206                 if (errno == ESRCH) {
207                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n",
208                                  (int)pid));
209                         tdb_delete(tdb, message_key_pid(procid));
210
211                         /*
212                          * INVALID_HANDLE is the closest I can think of -- vl
213                          */
214                         return NT_STATUS_INVALID_HANDLE;
215                 }
216
217                 DEBUG(2,("message to process %d failed - %s\n", (int)pid,
218                          strerror(errno)));
219
220                 /*
221                  * No call to map_nt_error_from_unix -- don't want to link in
222                  * errormap.o into lots of utils.
223                  */
224
225                 if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
226                 if (errno == EPERM)  return NT_STATUS_ACCESS_DENIED;
227                 return NT_STATUS_UNSUCCESSFUL;
228         }
229
230         return NT_STATUS_OK;
231 }
232
233 /****************************************************************************
234  Send a message to a particular pid.
235 ****************************************************************************/
236
237 static NTSTATUS message_send_pid_internal(struct server_id pid, int msg_type,
238                                           const void *buf, size_t len,
239                                           BOOL duplicates_allowed,
240                                           unsigned int timeout)
241 {
242         TDB_DATA kbuf;
243         TDB_DATA dbuf;
244         TDB_DATA old_dbuf;
245         struct message_rec rec;
246         uint8 *ptr;
247         struct message_rec prec;
248
249         /* NULL pointer means implicit length zero. */
250         if (!buf) {
251                 SMB_ASSERT(len == 0);
252         }
253
254         /*
255          * Doing kill with a non-positive pid causes messages to be
256          * sent to places we don't want.
257          */
258
259         SMB_ASSERT(procid_to_pid(&pid) > 0);
260
261         rec.msg_version = MESSAGE_VERSION;
262         rec.msg_type = msg_type;
263         rec.dest = pid;
264         rec.src = procid_self();
265         rec.len = buf ? len : 0;
266
267         kbuf = message_key_pid(pid);
268
269         dbuf.dptr = (uint8 *)SMB_MALLOC(len + sizeof(rec));
270         if (!dbuf.dptr) {
271                 return NT_STATUS_NO_MEMORY;
272         }
273
274         memcpy(dbuf.dptr, &rec, sizeof(rec));
275         if (len > 0 && buf)
276                 memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len);
277
278         dbuf.dsize = len + sizeof(rec);
279
280         if (duplicates_allowed) {
281
282                 /* If duplicates are allowed we can just append the message and return. */
283
284                 /* lock the record for the destination */
285                 if (timeout) {
286                         if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
287                                 DEBUG(0,("message_send_pid_internal: failed to get "
288                                          "chainlock with timeout %ul.\n", timeout));
289                                 return NT_STATUS_IO_TIMEOUT;
290                         }
291                 } else {
292                         if (tdb_chainlock(tdb, kbuf) == -1) {
293                                 DEBUG(0,("message_send_pid_internal: failed to get "
294                                          "chainlock.\n"));
295                                 return NT_STATUS_LOCK_NOT_GRANTED;
296                         }
297                 }       
298                 tdb_append(tdb, kbuf, dbuf);
299                 tdb_chainunlock(tdb, kbuf);
300
301                 SAFE_FREE(dbuf.dptr);
302                 errno = 0;                    /* paranoia */
303                 return message_notify(pid);
304         }
305
306         /* lock the record for the destination */
307         if (timeout) {
308                 if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
309                         DEBUG(0,("message_send_pid_internal: failed to get chainlock "
310                                  "with timeout %ul.\n", timeout));
311                         return NT_STATUS_IO_TIMEOUT;
312                 }
313         } else {
314                 if (tdb_chainlock(tdb, kbuf) == -1) {
315                         DEBUG(0,("message_send_pid_internal: failed to get "
316                                  "chainlock.\n"));
317                         return NT_STATUS_LOCK_NOT_GRANTED;
318                 }
319         }       
320
321         old_dbuf = tdb_fetch(tdb, kbuf);
322
323         if (!old_dbuf.dptr) {
324                 /* its a new record */
325
326                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
327                 tdb_chainunlock(tdb, kbuf);
328
329                 SAFE_FREE(dbuf.dptr);
330                 errno = 0;                    /* paranoia */
331                 return message_notify(pid);
332         }
333
334         /* Not a new record. Check for duplicates. */
335
336         for(ptr = old_dbuf.dptr; ptr < old_dbuf.dptr + old_dbuf.dsize; ) {
337                 /*
338                  * First check if the message header matches, then, if it's a non-zero
339                  * sized message, check if the data matches. If so it's a duplicate and
340                  * we can discard it. JRA.
341                  */
342
343                 if (!memcmp(ptr, &rec, sizeof(rec))) {
344                         if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
345                                 tdb_chainunlock(tdb, kbuf);
346                                 DEBUG(10,("message_send_pid_internal: discarding "
347                                           "duplicate message.\n"));
348                                 SAFE_FREE(dbuf.dptr);
349                                 SAFE_FREE(old_dbuf.dptr);
350                                 return NT_STATUS_OK;
351                         }
352                 }
353                 memcpy(&prec, ptr, sizeof(prec));
354                 ptr += sizeof(rec) + prec.len;
355         }
356
357         /* we're adding to an existing entry */
358
359         tdb_append(tdb, kbuf, dbuf);
360         tdb_chainunlock(tdb, kbuf);
361
362         SAFE_FREE(old_dbuf.dptr);
363         SAFE_FREE(dbuf.dptr);
364
365         errno = 0;                    /* paranoia */
366         return message_notify(pid);
367 }
368
369 /****************************************************************************
370  Send a message to a particular pid - no timeout.
371 ****************************************************************************/
372
373 static NTSTATUS message_send_pid(struct server_id pid, int msg_type,
374                                  const void *buf, size_t len,
375                                  BOOL duplicates_allowed)
376 {
377         return message_send_pid_internal(pid, msg_type, buf, len,
378                                          duplicates_allowed, 0);
379 }
380
381 /****************************************************************************
382  Count the messages pending for a particular pid. Expensive....
383 ****************************************************************************/
384
385 unsigned int messages_pending_for_pid(struct server_id pid)
386 {
387         TDB_DATA kbuf;
388         TDB_DATA dbuf;
389         uint8 *buf;
390         unsigned int message_count = 0;
391
392         kbuf = message_key_pid(pid);
393
394         dbuf = tdb_fetch(tdb, kbuf);
395         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
396                 SAFE_FREE(dbuf.dptr);
397                 return 0;
398         }
399
400         for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) {
401                 struct message_rec rec;
402                 memcpy(&rec, buf, sizeof(rec));
403                 buf += (sizeof(rec) + rec.len);
404                 dbuf.dsize -= (sizeof(rec) + rec.len);
405                 message_count++;
406         }
407
408         SAFE_FREE(dbuf.dptr);
409         return message_count;
410 }
411
412 /****************************************************************************
413  Retrieve all messages for the current process.
414 ****************************************************************************/
415
416 static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
417 {
418         TDB_DATA kbuf;
419         TDB_DATA dbuf;
420         TDB_DATA null_dbuf;
421
422         ZERO_STRUCT(null_dbuf);
423
424         *msgs_buf = NULL;
425         *total_len = 0;
426
427         kbuf = message_key_pid(pid_to_procid(sys_getpid()));
428
429         if (tdb_chainlock(tdb, kbuf) == -1)
430                 return False;
431
432         dbuf = tdb_fetch(tdb, kbuf);
433         /*
434          * Replace with an empty record to keep the allocated
435          * space in the tdb.
436          */
437         tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
438         tdb_chainunlock(tdb, kbuf);
439
440         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
441                 SAFE_FREE(dbuf.dptr);
442                 return False;
443         }
444
445         *msgs_buf = (char *)dbuf.dptr;
446         *total_len = dbuf.dsize;
447
448         return True;
449 }
450
451 /****************************************************************************
452  Parse out the next message for the current process.
453 ****************************************************************************/
454
455 static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type,
456                          struct server_id *src, char **buf, size_t *len)
457 {
458         struct message_rec rec;
459         char *ret_buf = *buf;
460
461         *buf = NULL;
462         *len = 0;
463
464         if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
465                 return False;
466
467         memcpy(&rec, ret_buf, sizeof(rec));
468         ret_buf += sizeof(rec);
469
470         if (rec.msg_version != MESSAGE_VERSION) {
471                 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
472                 return False;
473         }
474
475         if (rec.len > 0) {
476                 if (total_len - (ret_buf - msgs_buf) < rec.len)
477                         return False;
478         }
479
480         *len = rec.len;
481         *msg_type = rec.msg_type;
482         *src = rec.src;
483         *buf = ret_buf;
484
485         return True;
486 }
487
488 /****************************************************************************
489  Receive and dispatch any messages pending for this process.
490  JRA changed Dec 13 2006. Only one message handler now permitted per type.
491  *NOTE*: Dispatch functions must be able to cope with incoming
492  messages on an *odd* byte boundary.
493 ****************************************************************************/
494
495 void message_dispatch(void)
496 {
497         int msg_type;
498         struct server_id src;
499         char *buf;
500         char *msgs_buf;
501         size_t len, total_len;
502         int n_handled;
503
504         if (!received_signal)
505                 return;
506
507         DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
508
509         received_signal = 0;
510
511         if (!retrieve_all_messages(&msgs_buf, &total_len))
512                 return;
513
514         for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) {
515                 struct dispatch_fns *dfn;
516
517                 DEBUG(10,("message_dispatch: received msg_type=%d "
518                           "src_pid=%u\n", msg_type,
519                           (unsigned int) procid_to_pid(&src)));
520
521                 n_handled = 0;
522                 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
523                         if (dfn->msg_type == msg_type) {
524                                 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
525                                 dfn->fn(msg_type, src,
526                                         len ? (void *)buf : NULL, len,
527                                         dfn->private_data);
528                                 n_handled++;
529                                 break;
530                         }
531                 }
532                 if (!n_handled) {
533                         DEBUG(5,("message_dispatch: warning: no handler registed for "
534                                  "msg_type %d in pid %u\n",
535                                  msg_type, (unsigned int)sys_getpid()));
536                 }
537         }
538         SAFE_FREE(msgs_buf);
539 }
540
541 /****************************************************************************
542  Register/replace a dispatch function for a particular message type.
543  JRA changed Dec 13 2006. Only one message handler now permitted per type.
544  *NOTE*: Dispatch functions must be able to cope with incoming
545  messages on an *odd* byte boundary.
546 ****************************************************************************/
547
548 void message_register(int msg_type, 
549                       void (*fn)(int msg_type, struct server_id pid,
550                                  void *buf, size_t len,
551                                  void *private_data),
552                       void *private_data)
553 {
554         struct dispatch_fns *dfn;
555
556         for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
557                 if (dfn->msg_type == msg_type) {
558                         dfn->fn = fn;
559                         return;
560                 }
561         }
562
563         dfn = SMB_MALLOC_P(struct dispatch_fns);
564
565         if (dfn != NULL) {
566
567                 ZERO_STRUCTPN(dfn);
568
569                 dfn->msg_type = msg_type;
570                 dfn->fn = fn;
571                 dfn->private_data = private_data;
572
573                 DLIST_ADD(dispatch_fns, dfn);
574         }
575         else {
576         
577                 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
578         }
579 }
580
581 /****************************************************************************
582  De-register the function for a particular message type.
583 ****************************************************************************/
584
585 void message_deregister(int msg_type)
586 {
587         struct dispatch_fns *dfn, *next;
588
589         for (dfn = dispatch_fns; dfn; dfn = next) {
590                 next = dfn->next;
591                 if (dfn->msg_type == msg_type) {
592                         DLIST_REMOVE(dispatch_fns, dfn);
593                         SAFE_FREE(dfn);
594                         return;
595                 }
596         }       
597 }
598
599 struct msg_all {
600         int msg_type;
601         uint32 msg_flag;
602         const void *buf;
603         size_t len;
604         BOOL duplicates;
605         int n_sent;
606 };
607
608 /****************************************************************************
609  Send one of the messages for the broadcast.
610 ****************************************************************************/
611
612 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
613 {
614         struct connections_data crec;
615         struct msg_all *msg_all = (struct msg_all *)state;
616         NTSTATUS status;
617
618         if (dbuf.dsize != sizeof(crec))
619                 return 0;
620
621         memcpy(&crec, dbuf.dptr, sizeof(crec));
622
623         if (crec.cnum != -1)
624                 return 0;
625
626         /* Don't send if the receiver hasn't registered an interest. */
627
628         if(!(crec.bcast_msg_flags & msg_all->msg_flag))
629                 return 0;
630
631         /* If the msg send fails because the pid was not found (i.e. smbd died), 
632          * the msg has already been deleted from the messages.tdb.*/
633
634         status = message_send_pid(crec.pid, msg_all->msg_type,
635                                   msg_all->buf, msg_all->len,
636                                   msg_all->duplicates);
637
638         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
639                 
640                 /* If the pid was not found delete the entry from connections.tdb */
641
642                 DEBUG(2,("pid %s doesn't exist - deleting connections %d [%s]\n",
643                          procid_str_static(&crec.pid), crec.cnum, crec.servicename));
644                 tdb_delete(the_tdb, kbuf);
645         }
646         msg_all->n_sent++;
647         return 0;
648 }
649
650 /**
651  * Send a message to all smbd processes.
652  *
653  * It isn't very efficient, but should be OK for the sorts of
654  * applications that use it. When we need efficient broadcast we can add
655  * it.
656  *
657  * @param n_sent Set to the number of messages sent.  This should be
658  * equal to the number of processes, but be careful for races.
659  *
660  * @retval True for success.
661  **/
662 BOOL message_send_all(struct messaging_context *msg_ctx,
663                       int msg_type,
664                       const void *buf, size_t len,
665                       BOOL duplicates_allowed,
666                       int *n_sent)
667 {
668         struct msg_all msg_all;
669
670         msg_all.msg_type = msg_type;
671         if (msg_type < 1000)
672                 msg_all.msg_flag = FLAG_MSG_GENERAL;
673         else if (msg_type > 1000 && msg_type < 2000)
674                 msg_all.msg_flag = FLAG_MSG_NMBD;
675         else if (msg_type > 2000 && msg_type < 2100)
676                 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
677         else if (msg_type > 2100 && msg_type < 3000)
678                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
679         else if (msg_type > 3000 && msg_type < 4000)
680                 msg_all.msg_flag = FLAG_MSG_SMBD;
681         else
682                 return False;
683
684         msg_all.buf = buf;
685         msg_all.len = len;
686         msg_all.duplicates = duplicates_allowed;
687         msg_all.n_sent = 0;
688
689         connections_traverse(traverse_fn, &msg_all);
690         if (n_sent)
691                 *n_sent = msg_all.n_sent;
692         return True;
693 }
694
695 /*
696  * Block and unblock receiving of messages. Allows removal of race conditions
697  * when doing a fork and changing message disposition.
698  */
699
700 void message_block(void)
701 {
702         BlockSignals(True, SIGUSR1);
703 }
704
705 void message_unblock(void)
706 {
707         BlockSignals(False, SIGUSR1);
708 }
709
710 /*
711  * Samba4 API wrapper around the Samba3 implementation. Yes, I know, we could
712  * import the whole Samba4 thing, but I want notify.c from Samba4 in first.
713  */
714
715 struct messaging_callback {
716         struct messaging_callback *prev, *next;
717         uint32 msg_type;
718         void (*fn)(struct messaging_context *msg, void *private_data, 
719                    uint32_t msg_type, 
720                    struct server_id server_id, DATA_BLOB *data);
721         void *private_data;
722 };
723
724 struct messaging_context {
725         struct server_id id;
726         struct messaging_callback *callbacks;
727 };
728
729 static int messaging_context_destructor(struct messaging_context *ctx)
730 {
731         struct messaging_callback *cb;
732
733         for (cb = ctx->callbacks; cb; cb = cb->next) {
734                 /*
735                  * We unconditionally remove all instances of our callback
736                  * from the tdb basis.
737                  */
738                 message_deregister(cb->msg_type);
739         }
740         return 0;
741 }
742
743 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
744                                          struct server_id server_id, 
745                                          struct event_context *ev)
746 {
747         struct messaging_context *ctx;
748
749         if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) {
750                 return NULL;
751         }
752
753         ctx->id = server_id;
754         talloc_set_destructor(ctx, messaging_context_destructor);
755
756         if (!message_init(ctx)) {
757                 DEBUG(0, ("message_init failed: %s\n", strerror(errno)));
758                 TALLOC_FREE(ctx);
759         }
760
761         return ctx;
762 }
763
764 static void messaging_callback(int msg_type, struct server_id pid,
765                                void *buf, size_t len, void *private_data)
766 {
767         struct messaging_context *ctx = talloc_get_type_abort(
768                 private_data, struct messaging_context);
769         struct messaging_callback *cb, *next;
770
771         for (cb = ctx->callbacks; cb; cb = next) {
772                 /*
773                  * Allow a callback to remove itself
774                  */
775                 next = cb->next;
776
777                 if (msg_type == cb->msg_type) {
778                         DATA_BLOB blob;
779
780                         blob.data = (uint8 *)buf;
781                         blob.length = len;
782
783                         cb->fn(ctx, cb->private_data, msg_type, pid, &blob);
784                 }
785         }
786 }
787
788 /*
789  * Register a dispatch function for a particular message type. Allow multiple
790  * registrants
791 */
792 NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data,
793                             uint32_t msg_type,
794                             void (*fn)(struct messaging_context *msg,
795                                        void *private_data, 
796                                        uint32_t msg_type, 
797                                        struct server_id server_id,
798                                        DATA_BLOB *data))
799 {
800         struct messaging_callback *cb;
801
802         if (!(cb = talloc(ctx, struct messaging_callback))) {
803                 return NT_STATUS_NO_MEMORY;
804         }
805
806         cb->msg_type = msg_type;
807         cb->fn = fn;
808         cb->private_data = private_data;
809
810         DLIST_ADD(ctx->callbacks, cb);
811         message_register(msg_type, messaging_callback, ctx);
812         return NT_STATUS_OK;
813 }
814
815 /*
816   De-register the function for a particular message type.
817 */
818 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
819                           void *private_data)
820 {
821         struct messaging_callback *cb, *next;
822
823         for (cb = ctx->callbacks; cb; cb = next) {
824                 next = cb->next;
825                 if ((cb->msg_type == msg_type)
826                     && (cb->private_data == private_data)) {
827                         DLIST_REMOVE(ctx->callbacks, cb);
828                         TALLOC_FREE(cb);
829                 }
830         }
831 }
832
833 /*
834   Send a message to a particular server
835 */
836 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
837                         struct server_id server, 
838                         uint32_t msg_type, const DATA_BLOB *data)
839 {
840         return message_send_pid_internal(server, msg_type, data->data,
841                                          data->length, True, 0);
842 }
843
844 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
845                             struct server_id server, uint32_t msg_type,
846                             const uint8 *buf, size_t len)
847 {
848         DATA_BLOB blob = data_blob_const(buf, len);
849         return messaging_send(msg_ctx, server, msg_type, &blob);
850 }
851
852 NTSTATUS messaging_send_buf_with_timeout(struct messaging_context *msg_ctx,
853                                          struct server_id server,
854                                          uint32_t msg_type,
855                                          const uint8 *buf, size_t len,
856                                          int timeout)
857 {
858         return message_send_pid_internal(server, msg_type, buf, len,
859                                          True, timeout);
860 }
861
862 /** @} **/