r22009: change TDB_DATA from char * to unsigned char *
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27   
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49
50 /* the locking database handle */
51 static TDB_CONTEXT *tdb;
52 static int received_signal;
53
54 /* change the message version with any incompatible changes in the protocol */
55 #define MESSAGE_VERSION 1
56
57 struct message_rec {
58         int msg_version;
59         int msg_type;
60         struct process_id dest;
61         struct process_id src;
62         size_t len;
63 };
64
65 /* we have a linked list of dispatch handlers */
66 static struct dispatch_fns {
67         struct dispatch_fns *next, *prev;
68         int msg_type;
69         void (*fn)(int msg_type, struct process_id pid, void *buf, size_t len,
70                    void *private_data);
71         void *private_data;
72 } *dispatch_fns;
73
74 /****************************************************************************
75  Free global objects.
76 ****************************************************************************/
77
78 void gfree_messages(void)
79 {
80         struct dispatch_fns *dfn, *next;
81
82         /* delete the dispatch_fns list */
83         dfn = dispatch_fns;
84         while( dfn ) {
85                 next = dfn->next;
86                 DLIST_REMOVE(dispatch_fns, dfn);
87                 SAFE_FREE(dfn);
88                 dfn = next;
89         }
90 }
91
92 /****************************************************************************
93  Notifications come in as signals.
94 ****************************************************************************/
95
96 static void sig_usr1(void)
97 {
98         received_signal = 1;
99         sys_select_signal(SIGUSR1);
100 }
101
102 /****************************************************************************
103  A useful function for testing the message system.
104 ****************************************************************************/
105
106 static void ping_message(int msg_type, struct process_id src,
107                          void *buf, size_t len, void *private_data)
108 {
109         const char *msg = buf ? (const char *)buf : "none";
110
111         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
112                  procid_str_static(&src), msg));
113         message_send_pid(src, MSG_PONG, buf, len, True);
114 }
115
116 /****************************************************************************
117  Initialise the messaging functions. 
118 ****************************************************************************/
119
120 BOOL message_init(void)
121 {
122         sec_init();
123
124         if (tdb)
125                 return True;
126
127         tdb = tdb_open_log(lock_path("messages.tdb"), 
128                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
129                        O_RDWR|O_CREAT,0600);
130
131         if (!tdb) {
132                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
133                 return False;
134         }
135
136         /* Activate the per-hashchain freelist */
137         tdb_set_max_dead(tdb, 5);
138
139         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
140
141         message_register(MSG_PING, ping_message, NULL);
142
143         /* Register some debugging related messages */
144
145         register_msg_pool_usage();
146         register_dmalloc_msgs();
147
148         return True;
149 }
150
151 /*******************************************************************
152  Form a static tdb key from a pid.
153 ******************************************************************/
154
155 static TDB_DATA message_key_pid(struct process_id pid)
156 {
157         static char key[20];
158         TDB_DATA kbuf;
159
160         slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid));
161         
162         kbuf.dptr = (uint8 *)key;
163         kbuf.dsize = strlen(key)+1;
164         return kbuf;
165 }
166
167 /****************************************************************************
168  Notify a process that it has a message. If the process doesn't exist 
169  then delete its record in the database.
170 ****************************************************************************/
171
172 static NTSTATUS message_notify(struct process_id procid)
173 {
174         pid_t pid = procid.pid;
175         int ret;
176         uid_t euid = geteuid();
177
178         /*
179          * Doing kill with a non-positive pid causes messages to be
180          * sent to places we don't want.
181          */
182
183         SMB_ASSERT(pid > 0);
184
185         if (euid != 0) {
186                 become_root_uid_only();
187         }
188
189         ret = kill(pid, SIGUSR1);
190
191         if (euid != 0) {
192                 unbecome_root_uid_only();
193         }
194
195         if (ret == -1) {
196                 if (errno == ESRCH) {
197                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n",
198                                  (int)pid));
199                         tdb_delete(tdb, message_key_pid(procid));
200
201                         /*
202                          * INVALID_HANDLE is the closest I can think of -- vl
203                          */
204                         return NT_STATUS_INVALID_HANDLE;
205                 }
206
207                 DEBUG(2,("message to process %d failed - %s\n", (int)pid,
208                          strerror(errno)));
209
210                 /*
211                  * No call to map_nt_error_from_unix -- don't want to link in
212                  * errormap.o into lots of utils.
213                  */
214
215                 if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
216                 if (errno == EPERM)  return NT_STATUS_ACCESS_DENIED;
217                 return NT_STATUS_UNSUCCESSFUL;
218         }
219
220         return NT_STATUS_OK;
221 }
222
223 /****************************************************************************
224  Send a message to a particular pid.
225 ****************************************************************************/
226
227 static NTSTATUS message_send_pid_internal(struct process_id pid, int msg_type,
228                                           const void *buf, size_t len,
229                                           BOOL duplicates_allowed,
230                                           unsigned int timeout)
231 {
232         TDB_DATA kbuf;
233         TDB_DATA dbuf;
234         TDB_DATA old_dbuf;
235         struct message_rec rec;
236         uint8 *ptr;
237         struct message_rec prec;
238
239         /* NULL pointer means implicit length zero. */
240         if (!buf) {
241                 SMB_ASSERT(len == 0);
242         }
243
244         /*
245          * Doing kill with a non-positive pid causes messages to be
246          * sent to places we don't want.
247          */
248
249         SMB_ASSERT(procid_to_pid(&pid) > 0);
250
251         rec.msg_version = MESSAGE_VERSION;
252         rec.msg_type = msg_type;
253         rec.dest = pid;
254         rec.src = procid_self();
255         rec.len = buf ? len : 0;
256
257         kbuf = message_key_pid(pid);
258
259         dbuf.dptr = (uint8 *)SMB_MALLOC(len + sizeof(rec));
260         if (!dbuf.dptr) {
261                 return NT_STATUS_NO_MEMORY;
262         }
263
264         memcpy(dbuf.dptr, &rec, sizeof(rec));
265         if (len > 0 && buf)
266                 memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len);
267
268         dbuf.dsize = len + sizeof(rec);
269
270         if (duplicates_allowed) {
271
272                 /* If duplicates are allowed we can just append the message and return. */
273
274                 /* lock the record for the destination */
275                 if (timeout) {
276                         if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
277                                 DEBUG(0,("message_send_pid_internal: failed to get "
278                                          "chainlock with timeout %ul.\n", timeout));
279                                 return NT_STATUS_IO_TIMEOUT;
280                         }
281                 } else {
282                         if (tdb_chainlock(tdb, kbuf) == -1) {
283                                 DEBUG(0,("message_send_pid_internal: failed to get "
284                                          "chainlock.\n"));
285                                 return NT_STATUS_LOCK_NOT_GRANTED;
286                         }
287                 }       
288                 tdb_append(tdb, kbuf, dbuf);
289                 tdb_chainunlock(tdb, kbuf);
290
291                 SAFE_FREE(dbuf.dptr);
292                 errno = 0;                    /* paranoia */
293                 return message_notify(pid);
294         }
295
296         /* lock the record for the destination */
297         if (timeout) {
298                 if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
299                         DEBUG(0,("message_send_pid_internal: failed to get chainlock "
300                                  "with timeout %ul.\n", timeout));
301                         return NT_STATUS_IO_TIMEOUT;
302                 }
303         } else {
304                 if (tdb_chainlock(tdb, kbuf) == -1) {
305                         DEBUG(0,("message_send_pid_internal: failed to get "
306                                  "chainlock.\n"));
307                         return NT_STATUS_LOCK_NOT_GRANTED;
308                 }
309         }       
310
311         old_dbuf = tdb_fetch(tdb, kbuf);
312
313         if (!old_dbuf.dptr) {
314                 /* its a new record */
315
316                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
317                 tdb_chainunlock(tdb, kbuf);
318
319                 SAFE_FREE(dbuf.dptr);
320                 errno = 0;                    /* paranoia */
321                 return message_notify(pid);
322         }
323
324         /* Not a new record. Check for duplicates. */
325
326         for(ptr = old_dbuf.dptr; ptr < old_dbuf.dptr + old_dbuf.dsize; ) {
327                 /*
328                  * First check if the message header matches, then, if it's a non-zero
329                  * sized message, check if the data matches. If so it's a duplicate and
330                  * we can discard it. JRA.
331                  */
332
333                 if (!memcmp(ptr, &rec, sizeof(rec))) {
334                         if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
335                                 tdb_chainunlock(tdb, kbuf);
336                                 DEBUG(10,("message_send_pid_internal: discarding "
337                                           "duplicate message.\n"));
338                                 SAFE_FREE(dbuf.dptr);
339                                 SAFE_FREE(old_dbuf.dptr);
340                                 return NT_STATUS_OK;
341                         }
342                 }
343                 memcpy(&prec, ptr, sizeof(prec));
344                 ptr += sizeof(rec) + prec.len;
345         }
346
347         /* we're adding to an existing entry */
348
349         tdb_append(tdb, kbuf, dbuf);
350         tdb_chainunlock(tdb, kbuf);
351
352         SAFE_FREE(old_dbuf.dptr);
353         SAFE_FREE(dbuf.dptr);
354
355         errno = 0;                    /* paranoia */
356         return message_notify(pid);
357 }
358
359 /****************************************************************************
360  Send a message to a particular pid - no timeout.
361 ****************************************************************************/
362
363 NTSTATUS message_send_pid(struct process_id pid, int msg_type, const void *buf,
364                           size_t len, BOOL duplicates_allowed)
365 {
366         return message_send_pid_internal(pid, msg_type, buf, len,
367                                          duplicates_allowed, 0);
368 }
369
370 /****************************************************************************
371  Send a message to a particular pid, with timeout in seconds.
372 ****************************************************************************/
373
374 NTSTATUS message_send_pid_with_timeout(struct process_id pid, int msg_type,
375                                        const void *buf, size_t len,
376                                        BOOL duplicates_allowed, unsigned int timeout)
377 {
378         return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed,
379                                          timeout);
380 }
381
382 /****************************************************************************
383  Count the messages pending for a particular pid. Expensive....
384 ****************************************************************************/
385
386 unsigned int messages_pending_for_pid(struct process_id pid)
387 {
388         TDB_DATA kbuf;
389         TDB_DATA dbuf;
390         uint8 *buf;
391         unsigned int message_count = 0;
392
393         kbuf = message_key_pid(pid);
394
395         dbuf = tdb_fetch(tdb, kbuf);
396         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
397                 SAFE_FREE(dbuf.dptr);
398                 return 0;
399         }
400
401         for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) {
402                 struct message_rec rec;
403                 memcpy(&rec, buf, sizeof(rec));
404                 buf += (sizeof(rec) + rec.len);
405                 dbuf.dsize -= (sizeof(rec) + rec.len);
406                 message_count++;
407         }
408
409         SAFE_FREE(dbuf.dptr);
410         return message_count;
411 }
412
413 /****************************************************************************
414  Retrieve all messages for the current process.
415 ****************************************************************************/
416
417 static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
418 {
419         TDB_DATA kbuf;
420         TDB_DATA dbuf;
421         TDB_DATA null_dbuf;
422
423         ZERO_STRUCT(null_dbuf);
424
425         *msgs_buf = NULL;
426         *total_len = 0;
427
428         kbuf = message_key_pid(pid_to_procid(sys_getpid()));
429
430         if (tdb_chainlock(tdb, kbuf) == -1)
431                 return False;
432
433         dbuf = tdb_fetch(tdb, kbuf);
434         /*
435          * Replace with an empty record to keep the allocated
436          * space in the tdb.
437          */
438         tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
439         tdb_chainunlock(tdb, kbuf);
440
441         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
442                 SAFE_FREE(dbuf.dptr);
443                 return False;
444         }
445
446         *msgs_buf = (char *)dbuf.dptr;
447         *total_len = dbuf.dsize;
448
449         return True;
450 }
451
452 /****************************************************************************
453  Parse out the next message for the current process.
454 ****************************************************************************/
455
456 static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type,
457                          struct process_id *src, char **buf, size_t *len)
458 {
459         struct message_rec rec;
460         char *ret_buf = *buf;
461
462         *buf = NULL;
463         *len = 0;
464
465         if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
466                 return False;
467
468         memcpy(&rec, ret_buf, sizeof(rec));
469         ret_buf += sizeof(rec);
470
471         if (rec.msg_version != MESSAGE_VERSION) {
472                 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
473                 return False;
474         }
475
476         if (rec.len > 0) {
477                 if (total_len - (ret_buf - msgs_buf) < rec.len)
478                         return False;
479         }
480
481         *len = rec.len;
482         *msg_type = rec.msg_type;
483         *src = rec.src;
484         *buf = ret_buf;
485
486         return True;
487 }
488
489 /****************************************************************************
490  Receive and dispatch any messages pending for this process.
491  JRA changed Dec 13 2006. Only one message handler now permitted per type.
492  *NOTE*: Dispatch functions must be able to cope with incoming
493  messages on an *odd* byte boundary.
494 ****************************************************************************/
495
496 void message_dispatch(void)
497 {
498         int msg_type;
499         struct process_id src;
500         char *buf;
501         char *msgs_buf;
502         size_t len, total_len;
503         int n_handled;
504
505         if (!received_signal)
506                 return;
507
508         DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
509
510         received_signal = 0;
511
512         if (!retrieve_all_messages(&msgs_buf, &total_len))
513                 return;
514
515         for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) {
516                 struct dispatch_fns *dfn;
517
518                 DEBUG(10,("message_dispatch: received msg_type=%d "
519                           "src_pid=%u\n", msg_type,
520                           (unsigned int) procid_to_pid(&src)));
521
522                 n_handled = 0;
523                 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
524                         if (dfn->msg_type == msg_type) {
525                                 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
526                                 dfn->fn(msg_type, src,
527                                         len ? (void *)buf : NULL, len,
528                                         dfn->private_data);
529                                 n_handled++;
530                                 break;
531                         }
532                 }
533                 if (!n_handled) {
534                         DEBUG(5,("message_dispatch: warning: no handler registed for "
535                                  "msg_type %d in pid %u\n",
536                                  msg_type, (unsigned int)sys_getpid()));
537                 }
538         }
539         SAFE_FREE(msgs_buf);
540 }
541
542 /****************************************************************************
543  Register/replace a dispatch function for a particular message type.
544  JRA changed Dec 13 2006. Only one message handler now permitted per type.
545  *NOTE*: Dispatch functions must be able to cope with incoming
546  messages on an *odd* byte boundary.
547 ****************************************************************************/
548
549 void message_register(int msg_type, 
550                       void (*fn)(int msg_type, struct process_id pid,
551                                  void *buf, size_t len,
552                                  void *private_data),
553                       void *private_data)
554 {
555         struct dispatch_fns *dfn;
556
557         for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
558                 if (dfn->msg_type == msg_type) {
559                         dfn->fn = fn;
560                         return;
561                 }
562         }
563
564         dfn = SMB_MALLOC_P(struct dispatch_fns);
565
566         if (dfn != NULL) {
567
568                 ZERO_STRUCTPN(dfn);
569
570                 dfn->msg_type = msg_type;
571                 dfn->fn = fn;
572                 dfn->private_data = private_data;
573
574                 DLIST_ADD(dispatch_fns, dfn);
575         }
576         else {
577         
578                 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
579         }
580 }
581
582 /****************************************************************************
583  De-register the function for a particular message type.
584 ****************************************************************************/
585
586 void message_deregister(int msg_type)
587 {
588         struct dispatch_fns *dfn, *next;
589
590         for (dfn = dispatch_fns; dfn; dfn = next) {
591                 next = dfn->next;
592                 if (dfn->msg_type == msg_type) {
593                         DLIST_REMOVE(dispatch_fns, dfn);
594                         SAFE_FREE(dfn);
595                         return;
596                 }
597         }       
598 }
599
600 struct msg_all {
601         int msg_type;
602         uint32 msg_flag;
603         const void *buf;
604         size_t len;
605         BOOL duplicates;
606         int n_sent;
607 };
608
609 /****************************************************************************
610  Send one of the messages for the broadcast.
611 ****************************************************************************/
612
613 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
614 {
615         struct connections_data crec;
616         struct msg_all *msg_all = (struct msg_all *)state;
617         NTSTATUS status;
618
619         if (dbuf.dsize != sizeof(crec))
620                 return 0;
621
622         memcpy(&crec, dbuf.dptr, sizeof(crec));
623
624         if (crec.cnum != -1)
625                 return 0;
626
627         /* Don't send if the receiver hasn't registered an interest. */
628
629         if(!(crec.bcast_msg_flags & msg_all->msg_flag))
630                 return 0;
631
632         /* If the msg send fails because the pid was not found (i.e. smbd died), 
633          * the msg has already been deleted from the messages.tdb.*/
634
635         status = message_send_pid(crec.pid, msg_all->msg_type,
636                                   msg_all->buf, msg_all->len,
637                                   msg_all->duplicates);
638
639         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
640                 
641                 /* If the pid was not found delete the entry from connections.tdb */
642
643                 DEBUG(2,("pid %s doesn't exist - deleting connections %d [%s]\n",
644                          procid_str_static(&crec.pid), crec.cnum, crec.name));
645                 tdb_delete(the_tdb, kbuf);
646         }
647         msg_all->n_sent++;
648         return 0;
649 }
650
651 /**
652  * Send a message to all smbd processes.
653  *
654  * It isn't very efficient, but should be OK for the sorts of
655  * applications that use it. When we need efficient broadcast we can add
656  * it.
657  *
658  * @param n_sent Set to the number of messages sent.  This should be
659  * equal to the number of processes, but be careful for races.
660  *
661  * @retval True for success.
662  **/
663 BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type,
664                       const void *buf, size_t len,
665                       BOOL duplicates_allowed,
666                       int *n_sent)
667 {
668         struct msg_all msg_all;
669
670         msg_all.msg_type = msg_type;
671         if (msg_type < 1000)
672                 msg_all.msg_flag = FLAG_MSG_GENERAL;
673         else if (msg_type > 1000 && msg_type < 2000)
674                 msg_all.msg_flag = FLAG_MSG_NMBD;
675         else if (msg_type > 2000 && msg_type < 2100)
676                 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
677         else if (msg_type > 2100 && msg_type < 3000)
678                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
679         else if (msg_type > 3000 && msg_type < 4000)
680                 msg_all.msg_flag = FLAG_MSG_SMBD;
681         else
682                 return False;
683
684         msg_all.buf = buf;
685         msg_all.len = len;
686         msg_all.duplicates = duplicates_allowed;
687         msg_all.n_sent = 0;
688
689         tdb_traverse(conn_tdb, traverse_fn, &msg_all);
690         if (n_sent)
691                 *n_sent = msg_all.n_sent;
692         return True;
693 }
694
695 /*
696  * Block and unblock receiving of messages. Allows removal of race conditions
697  * when doing a fork and changing message disposition.
698  */
699
700 void message_block(void)
701 {
702         BlockSignals(True, SIGUSR1);
703 }
704
705 void message_unblock(void)
706 {
707         BlockSignals(False, SIGUSR1);
708 }
709
710 /*
711  * Samba4 API wrapper around the Samba3 implementation. Yes, I know, we could
712  * import the whole Samba4 thing, but I want notify.c from Samba4 in first.
713  */
714
715 struct messaging_callback {
716         struct messaging_callback *prev, *next;
717         uint32 msg_type;
718         void (*fn)(struct messaging_context *msg, void *private_data, 
719                    uint32_t msg_type, 
720                    struct server_id server_id, DATA_BLOB *data);
721         void *private_data;
722 };
723
724 struct messaging_context {
725         struct server_id id;
726         struct messaging_callback *callbacks;
727 };
728
729 static int messaging_context_destructor(struct messaging_context *ctx)
730 {
731         struct messaging_callback *cb;
732
733         for (cb = ctx->callbacks; cb; cb = cb->next) {
734                 /*
735                  * We unconditionally remove all instances of our callback
736                  * from the tdb basis.
737                  */
738                 message_deregister(cb->msg_type);
739         }
740         return 0;
741 }
742
743 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
744                                          struct server_id server_id, 
745                                          struct event_context *ev)
746 {
747         struct messaging_context *ctx;
748
749         if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) {
750                 return NULL;
751         }
752
753         ctx->id = server_id;
754         talloc_set_destructor(ctx, messaging_context_destructor);
755         return ctx;
756 }
757
758 static void messaging_callback(int msg_type, struct process_id pid,
759                                void *buf, size_t len, void *private_data)
760 {
761         struct messaging_context *ctx = talloc_get_type_abort(
762                 private_data, struct messaging_context);
763         struct messaging_callback *cb, *next;
764
765         for (cb = ctx->callbacks; cb; cb = next) {
766                 /*
767                  * Allow a callback to remove itself
768                  */
769                 next = cb->next;
770
771                 if (msg_type == cb->msg_type) {
772                         DATA_BLOB blob;
773                         struct server_id id;
774
775                         blob.data = (uint8 *)buf;
776                         blob.length = len;
777                         id.id = pid;
778
779                         cb->fn(ctx, cb->private_data, msg_type, id, &blob);
780                 }
781         }
782 }
783
784 /*
785  * Register a dispatch function for a particular message type. Allow multiple
786  * registrants
787 */
788 NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data,
789                             uint32_t msg_type,
790                             void (*fn)(struct messaging_context *msg,
791                                        void *private_data, 
792                                        uint32_t msg_type, 
793                                        struct server_id server_id,
794                                        DATA_BLOB *data))
795 {
796         struct messaging_callback *cb;
797
798         if (!(cb = talloc(ctx, struct messaging_callback))) {
799                 return NT_STATUS_NO_MEMORY;
800         }
801
802         cb->msg_type = msg_type;
803         cb->fn = fn;
804         cb->private_data = private_data;
805
806         DLIST_ADD(ctx->callbacks, cb);
807         message_register(msg_type, messaging_callback, ctx);
808         return NT_STATUS_OK;
809 }
810
811 /*
812   De-register the function for a particular message type.
813 */
814 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
815                           void *private_data)
816 {
817         struct messaging_callback *cb, *next;
818
819         for (cb = ctx->callbacks; cb; cb = next) {
820                 next = cb->next;
821                 if ((cb->msg_type == msg_type)
822                     && (cb->private_data == private_data)) {
823                         DLIST_REMOVE(ctx->callbacks, cb);
824                         TALLOC_FREE(cb);
825                 }
826         }
827 }
828
829 /*
830   Send a message to a particular server
831 */
832 NTSTATUS messaging_send(struct messaging_context *msg,
833                         struct server_id server, 
834                         uint32_t msg_type, DATA_BLOB *data)
835 {
836         return message_send_pid_internal(server.id, msg_type, data->data,
837                                          data->length, True, 0);
838 }
839
840 /** @} **/