94caca69b70e42efa7c2474f9a097f22a7da3a68
[sfrench/samba-autobuild/.git] / source / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27   
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49
50 /* the locking database handle */
51 static TDB_CONTEXT *tdb;
52 static int received_signal;
53
54 /* change the message version with any incompatible changes in the protocol */
55 #define MESSAGE_VERSION 1
56
57 struct message_rec {
58         int msg_version;
59         int msg_type;
60         struct process_id dest;
61         struct process_id src;
62         size_t len;
63 };
64
65 /* we have a linked list of dispatch handlers */
66 static struct dispatch_fns {
67         struct dispatch_fns *next, *prev;
68         int msg_type;
69         void (*fn)(int msg_type, struct process_id pid, void *buf, size_t len,
70                    void *private_data);
71         void *private_data;
72 } *dispatch_fns;
73
74 /****************************************************************************
75  Free global objects.
76 ****************************************************************************/
77
78 void gfree_messages(void)
79 {
80         struct dispatch_fns *dfn, *next;
81
82         /* delete the dispatch_fns list */
83         dfn = dispatch_fns;
84         while( dfn ) {
85                 next = dfn->next;
86                 DLIST_REMOVE(dispatch_fns, dfn);
87                 SAFE_FREE(dfn);
88                 dfn = next;
89         }
90 }
91
92 /****************************************************************************
93  Notifications come in as signals.
94 ****************************************************************************/
95
96 static void sig_usr1(void)
97 {
98         received_signal = 1;
99         sys_select_signal(SIGUSR1);
100 }
101
102 /****************************************************************************
103  A useful function for testing the message system.
104 ****************************************************************************/
105
106 static void ping_message(int msg_type, struct process_id src,
107                          void *buf, size_t len, void *private_data)
108 {
109         const char *msg = buf ? (const char *)buf : "none";
110
111         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
112                  procid_str_static(&src), msg));
113         message_send_pid(src, MSG_PONG, buf, len, True);
114 }
115
116 /****************************************************************************
117  Initialise the messaging functions. 
118 ****************************************************************************/
119
120 BOOL message_init(void)
121 {
122         sec_init();
123
124         if (tdb)
125                 return True;
126
127         tdb = tdb_open_log(lock_path("messages.tdb"), 
128                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
129                        O_RDWR|O_CREAT,0600);
130
131         if (!tdb) {
132                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
133                 return False;
134         }
135
136         /* Activate the per-hashchain freelist */
137         tdb_set_max_dead(tdb, 5);
138
139         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
140
141         message_register(MSG_PING, ping_message, NULL);
142
143         /* Register some debugging related messages */
144
145         register_msg_pool_usage();
146         register_dmalloc_msgs();
147
148         return True;
149 }
150
151 /*******************************************************************
152  Form a static tdb key from a pid.
153 ******************************************************************/
154
155 static TDB_DATA message_key_pid(struct process_id pid)
156 {
157         static char key[20];
158         TDB_DATA kbuf;
159
160         slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid));
161         
162         kbuf.dptr = (uint8 *)key;
163         kbuf.dsize = strlen(key)+1;
164         return kbuf;
165 }
166
167 /****************************************************************************
168  Notify a process that it has a message. If the process doesn't exist 
169  then delete its record in the database.
170 ****************************************************************************/
171
172 static NTSTATUS message_notify(struct process_id procid)
173 {
174         pid_t pid = procid.pid;
175         int ret;
176         uid_t euid = geteuid();
177
178         /*
179          * Doing kill with a non-positive pid causes messages to be
180          * sent to places we don't want.
181          */
182
183         SMB_ASSERT(pid > 0);
184
185         if (euid != 0) {
186                 /* If we're not root become so to send the message. */
187                 save_re_uid();
188                 set_effective_uid(0);
189         }
190
191         ret = kill(pid, SIGUSR1);
192
193         if (euid != 0) {
194                 /* Go back to who we were. */
195                 int saved_errno = errno;
196                 restore_re_uid_fromroot();
197                 errno = saved_errno;
198         }
199
200         if (ret == -1) {
201                 if (errno == ESRCH) {
202                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n",
203                                  (int)pid));
204                         tdb_delete(tdb, message_key_pid(procid));
205
206                         /*
207                          * INVALID_HANDLE is the closest I can think of -- vl
208                          */
209                         return NT_STATUS_INVALID_HANDLE;
210                 }
211
212                 DEBUG(2,("message to process %d failed - %s\n", (int)pid,
213                          strerror(errno)));
214
215                 /*
216                  * No call to map_nt_error_from_unix -- don't want to link in
217                  * errormap.o into lots of utils.
218                  */
219
220                 if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
221                 if (errno == EPERM)  return NT_STATUS_ACCESS_DENIED;
222                 return NT_STATUS_UNSUCCESSFUL;
223         }
224
225         return NT_STATUS_OK;
226 }
227
228 /****************************************************************************
229  Send a message to a particular pid.
230 ****************************************************************************/
231
232 static NTSTATUS message_send_pid_internal(struct process_id pid, int msg_type,
233                                           const void *buf, size_t len,
234                                           BOOL duplicates_allowed,
235                                           unsigned int timeout)
236 {
237         TDB_DATA kbuf;
238         TDB_DATA dbuf;
239         TDB_DATA old_dbuf;
240         struct message_rec rec;
241         uint8 *ptr;
242         struct message_rec prec;
243
244         /* NULL pointer means implicit length zero. */
245         if (!buf) {
246                 SMB_ASSERT(len == 0);
247         }
248
249         /*
250          * Doing kill with a non-positive pid causes messages to be
251          * sent to places we don't want.
252          */
253
254         SMB_ASSERT(procid_to_pid(&pid) > 0);
255
256         rec.msg_version = MESSAGE_VERSION;
257         rec.msg_type = msg_type;
258         rec.dest = pid;
259         rec.src = procid_self();
260         rec.len = buf ? len : 0;
261
262         kbuf = message_key_pid(pid);
263
264         dbuf.dptr = (uint8 *)SMB_MALLOC(len + sizeof(rec));
265         if (!dbuf.dptr) {
266                 return NT_STATUS_NO_MEMORY;
267         }
268
269         memcpy(dbuf.dptr, &rec, sizeof(rec));
270         if (len > 0 && buf)
271                 memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len);
272
273         dbuf.dsize = len + sizeof(rec);
274
275         if (duplicates_allowed) {
276
277                 /* If duplicates are allowed we can just append the message and return. */
278
279                 /* lock the record for the destination */
280                 if (timeout) {
281                         if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
282                                 DEBUG(0,("message_send_pid_internal: failed to get "
283                                          "chainlock with timeout %ul.\n", timeout));
284                                 return NT_STATUS_IO_TIMEOUT;
285                         }
286                 } else {
287                         if (tdb_chainlock(tdb, kbuf) == -1) {
288                                 DEBUG(0,("message_send_pid_internal: failed to get "
289                                          "chainlock.\n"));
290                                 return NT_STATUS_LOCK_NOT_GRANTED;
291                         }
292                 }       
293                 tdb_append(tdb, kbuf, dbuf);
294                 tdb_chainunlock(tdb, kbuf);
295
296                 SAFE_FREE(dbuf.dptr);
297                 errno = 0;                    /* paranoia */
298                 return message_notify(pid);
299         }
300
301         /* lock the record for the destination */
302         if (timeout) {
303                 if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
304                         DEBUG(0,("message_send_pid_internal: failed to get chainlock "
305                                  "with timeout %ul.\n", timeout));
306                         return NT_STATUS_IO_TIMEOUT;
307                 }
308         } else {
309                 if (tdb_chainlock(tdb, kbuf) == -1) {
310                         DEBUG(0,("message_send_pid_internal: failed to get "
311                                  "chainlock.\n"));
312                         return NT_STATUS_LOCK_NOT_GRANTED;
313                 }
314         }       
315
316         old_dbuf = tdb_fetch(tdb, kbuf);
317
318         if (!old_dbuf.dptr) {
319                 /* its a new record */
320
321                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
322                 tdb_chainunlock(tdb, kbuf);
323
324                 SAFE_FREE(dbuf.dptr);
325                 errno = 0;                    /* paranoia */
326                 return message_notify(pid);
327         }
328
329         /* Not a new record. Check for duplicates. */
330
331         for(ptr = old_dbuf.dptr; ptr < old_dbuf.dptr + old_dbuf.dsize; ) {
332                 /*
333                  * First check if the message header matches, then, if it's a non-zero
334                  * sized message, check if the data matches. If so it's a duplicate and
335                  * we can discard it. JRA.
336                  */
337
338                 if (!memcmp(ptr, &rec, sizeof(rec))) {
339                         if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
340                                 tdb_chainunlock(tdb, kbuf);
341                                 DEBUG(10,("message_send_pid_internal: discarding "
342                                           "duplicate message.\n"));
343                                 SAFE_FREE(dbuf.dptr);
344                                 SAFE_FREE(old_dbuf.dptr);
345                                 return NT_STATUS_OK;
346                         }
347                 }
348                 memcpy(&prec, ptr, sizeof(prec));
349                 ptr += sizeof(rec) + prec.len;
350         }
351
352         /* we're adding to an existing entry */
353
354         tdb_append(tdb, kbuf, dbuf);
355         tdb_chainunlock(tdb, kbuf);
356
357         SAFE_FREE(old_dbuf.dptr);
358         SAFE_FREE(dbuf.dptr);
359
360         errno = 0;                    /* paranoia */
361         return message_notify(pid);
362 }
363
364 /****************************************************************************
365  Send a message to a particular pid - no timeout.
366 ****************************************************************************/
367
368 NTSTATUS message_send_pid(struct process_id pid, int msg_type, const void *buf,
369                           size_t len, BOOL duplicates_allowed)
370 {
371         return message_send_pid_internal(pid, msg_type, buf, len,
372                                          duplicates_allowed, 0);
373 }
374
375 /****************************************************************************
376  Send a message to a particular pid, with timeout in seconds.
377 ****************************************************************************/
378
379 NTSTATUS message_send_pid_with_timeout(struct process_id pid, int msg_type,
380                                        const void *buf, size_t len,
381                                        BOOL duplicates_allowed, unsigned int timeout)
382 {
383         return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed,
384                                          timeout);
385 }
386
387 /****************************************************************************
388  Count the messages pending for a particular pid. Expensive....
389 ****************************************************************************/
390
391 unsigned int messages_pending_for_pid(struct process_id pid)
392 {
393         TDB_DATA kbuf;
394         TDB_DATA dbuf;
395         uint8 *buf;
396         unsigned int message_count = 0;
397
398         kbuf = message_key_pid(pid);
399
400         dbuf = tdb_fetch(tdb, kbuf);
401         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
402                 SAFE_FREE(dbuf.dptr);
403                 return 0;
404         }
405
406         for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) {
407                 struct message_rec rec;
408                 memcpy(&rec, buf, sizeof(rec));
409                 buf += (sizeof(rec) + rec.len);
410                 dbuf.dsize -= (sizeof(rec) + rec.len);
411                 message_count++;
412         }
413
414         SAFE_FREE(dbuf.dptr);
415         return message_count;
416 }
417
418 /****************************************************************************
419  Retrieve all messages for the current process.
420 ****************************************************************************/
421
422 static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
423 {
424         TDB_DATA kbuf;
425         TDB_DATA dbuf;
426         TDB_DATA null_dbuf;
427
428         ZERO_STRUCT(null_dbuf);
429
430         *msgs_buf = NULL;
431         *total_len = 0;
432
433         kbuf = message_key_pid(pid_to_procid(sys_getpid()));
434
435         if (tdb_chainlock(tdb, kbuf) == -1)
436                 return False;
437
438         dbuf = tdb_fetch(tdb, kbuf);
439         /*
440          * Replace with an empty record to keep the allocated
441          * space in the tdb.
442          */
443         tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
444         tdb_chainunlock(tdb, kbuf);
445
446         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
447                 SAFE_FREE(dbuf.dptr);
448                 return False;
449         }
450
451         *msgs_buf = (char *)dbuf.dptr;
452         *total_len = dbuf.dsize;
453
454         return True;
455 }
456
457 /****************************************************************************
458  Parse out the next message for the current process.
459 ****************************************************************************/
460
461 static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type,
462                          struct process_id *src, char **buf, size_t *len)
463 {
464         struct message_rec rec;
465         char *ret_buf = *buf;
466
467         *buf = NULL;
468         *len = 0;
469
470         if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
471                 return False;
472
473         memcpy(&rec, ret_buf, sizeof(rec));
474         ret_buf += sizeof(rec);
475
476         if (rec.msg_version != MESSAGE_VERSION) {
477                 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
478                 return False;
479         }
480
481         if (rec.len > 0) {
482                 if (total_len - (ret_buf - msgs_buf) < rec.len)
483                         return False;
484         }
485
486         *len = rec.len;
487         *msg_type = rec.msg_type;
488         *src = rec.src;
489         *buf = ret_buf;
490
491         return True;
492 }
493
494 /****************************************************************************
495  Receive and dispatch any messages pending for this process.
496  JRA changed Dec 13 2006. Only one message handler now permitted per type.
497  *NOTE*: Dispatch functions must be able to cope with incoming
498  messages on an *odd* byte boundary.
499 ****************************************************************************/
500
501 void message_dispatch(void)
502 {
503         int msg_type;
504         struct process_id src;
505         char *buf;
506         char *msgs_buf;
507         size_t len, total_len;
508         int n_handled;
509
510         if (!received_signal)
511                 return;
512
513         DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
514
515         received_signal = 0;
516
517         if (!retrieve_all_messages(&msgs_buf, &total_len))
518                 return;
519
520         for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) {
521                 struct dispatch_fns *dfn;
522
523                 DEBUG(10,("message_dispatch: received msg_type=%d "
524                           "src_pid=%u\n", msg_type,
525                           (unsigned int) procid_to_pid(&src)));
526
527                 n_handled = 0;
528                 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
529                         if (dfn->msg_type == msg_type) {
530                                 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
531                                 dfn->fn(msg_type, src,
532                                         len ? (void *)buf : NULL, len,
533                                         dfn->private_data);
534                                 n_handled++;
535                                 break;
536                         }
537                 }
538                 if (!n_handled) {
539                         DEBUG(5,("message_dispatch: warning: no handler registed for "
540                                  "msg_type %d in pid %u\n",
541                                  msg_type, (unsigned int)sys_getpid()));
542                 }
543         }
544         SAFE_FREE(msgs_buf);
545 }
546
547 /****************************************************************************
548  Register/replace a dispatch function for a particular message type.
549  JRA changed Dec 13 2006. Only one message handler now permitted per type.
550  *NOTE*: Dispatch functions must be able to cope with incoming
551  messages on an *odd* byte boundary.
552 ****************************************************************************/
553
554 void message_register(int msg_type, 
555                       void (*fn)(int msg_type, struct process_id pid,
556                                  void *buf, size_t len,
557                                  void *private_data),
558                       void *private_data)
559 {
560         struct dispatch_fns *dfn;
561
562         for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
563                 if (dfn->msg_type == msg_type) {
564                         dfn->fn = fn;
565                         return;
566                 }
567         }
568
569         dfn = SMB_MALLOC_P(struct dispatch_fns);
570
571         if (dfn != NULL) {
572
573                 ZERO_STRUCTPN(dfn);
574
575                 dfn->msg_type = msg_type;
576                 dfn->fn = fn;
577                 dfn->private_data = private_data;
578
579                 DLIST_ADD(dispatch_fns, dfn);
580         }
581         else {
582         
583                 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
584         }
585 }
586
587 /****************************************************************************
588  De-register the function for a particular message type.
589 ****************************************************************************/
590
591 void message_deregister(int msg_type)
592 {
593         struct dispatch_fns *dfn, *next;
594
595         for (dfn = dispatch_fns; dfn; dfn = next) {
596                 next = dfn->next;
597                 if (dfn->msg_type == msg_type) {
598                         DLIST_REMOVE(dispatch_fns, dfn);
599                         SAFE_FREE(dfn);
600                         return;
601                 }
602         }       
603 }
604
605 struct msg_all {
606         int msg_type;
607         uint32 msg_flag;
608         const void *buf;
609         size_t len;
610         BOOL duplicates;
611         int n_sent;
612 };
613
614 /****************************************************************************
615  Send one of the messages for the broadcast.
616 ****************************************************************************/
617
618 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
619 {
620         struct connections_data crec;
621         struct msg_all *msg_all = (struct msg_all *)state;
622         NTSTATUS status;
623
624         if (dbuf.dsize != sizeof(crec))
625                 return 0;
626
627         memcpy(&crec, dbuf.dptr, sizeof(crec));
628
629         if (crec.cnum != -1)
630                 return 0;
631
632         /* Don't send if the receiver hasn't registered an interest. */
633
634         if(!(crec.bcast_msg_flags & msg_all->msg_flag))
635                 return 0;
636
637         /* If the msg send fails because the pid was not found (i.e. smbd died), 
638          * the msg has already been deleted from the messages.tdb.*/
639
640         status = message_send_pid(crec.pid, msg_all->msg_type,
641                                   msg_all->buf, msg_all->len,
642                                   msg_all->duplicates);
643
644         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
645                 
646                 /* If the pid was not found delete the entry from connections.tdb */
647
648                 DEBUG(2,("pid %s doesn't exist - deleting connections %d [%s]\n",
649                          procid_str_static(&crec.pid), crec.cnum, crec.servicename));
650                 tdb_delete(the_tdb, kbuf);
651         }
652         msg_all->n_sent++;
653         return 0;
654 }
655
656 /**
657  * Send a message to all smbd processes.
658  *
659  * It isn't very efficient, but should be OK for the sorts of
660  * applications that use it. When we need efficient broadcast we can add
661  * it.
662  *
663  * @param n_sent Set to the number of messages sent.  This should be
664  * equal to the number of processes, but be careful for races.
665  *
666  * @retval True for success.
667  **/
668 BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type,
669                       const void *buf, size_t len,
670                       BOOL duplicates_allowed,
671                       int *n_sent)
672 {
673         struct msg_all msg_all;
674
675         msg_all.msg_type = msg_type;
676         if (msg_type < 1000)
677                 msg_all.msg_flag = FLAG_MSG_GENERAL;
678         else if (msg_type > 1000 && msg_type < 2000)
679                 msg_all.msg_flag = FLAG_MSG_NMBD;
680         else if (msg_type > 2000 && msg_type < 2100)
681                 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
682         else if (msg_type > 2100 && msg_type < 3000)
683                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
684         else if (msg_type > 3000 && msg_type < 4000)
685                 msg_all.msg_flag = FLAG_MSG_SMBD;
686         else
687                 return False;
688
689         msg_all.buf = buf;
690         msg_all.len = len;
691         msg_all.duplicates = duplicates_allowed;
692         msg_all.n_sent = 0;
693
694         tdb_traverse(conn_tdb, traverse_fn, &msg_all);
695         if (n_sent)
696                 *n_sent = msg_all.n_sent;
697         return True;
698 }
699
700 /*
701  * Block and unblock receiving of messages. Allows removal of race conditions
702  * when doing a fork and changing message disposition.
703  */
704
705 void message_block(void)
706 {
707         BlockSignals(True, SIGUSR1);
708 }
709
710 void message_unblock(void)
711 {
712         BlockSignals(False, SIGUSR1);
713 }
714
715 /*
716  * Samba4 API wrapper around the Samba3 implementation. Yes, I know, we could
717  * import the whole Samba4 thing, but I want notify.c from Samba4 in first.
718  */
719
720 struct messaging_callback {
721         struct messaging_callback *prev, *next;
722         uint32 msg_type;
723         void (*fn)(struct messaging_context *msg, void *private_data, 
724                    uint32_t msg_type, 
725                    struct server_id server_id, DATA_BLOB *data);
726         void *private_data;
727 };
728
729 struct messaging_context {
730         struct server_id id;
731         struct messaging_callback *callbacks;
732 };
733
734 static int messaging_context_destructor(struct messaging_context *ctx)
735 {
736         struct messaging_callback *cb;
737
738         for (cb = ctx->callbacks; cb; cb = cb->next) {
739                 /*
740                  * We unconditionally remove all instances of our callback
741                  * from the tdb basis.
742                  */
743                 message_deregister(cb->msg_type);
744         }
745         return 0;
746 }
747
748 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
749                                          struct server_id server_id, 
750                                          struct event_context *ev)
751 {
752         struct messaging_context *ctx;
753
754         if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) {
755                 return NULL;
756         }
757
758         ctx->id = server_id;
759         talloc_set_destructor(ctx, messaging_context_destructor);
760         return ctx;
761 }
762
763 static void messaging_callback(int msg_type, struct process_id pid,
764                                void *buf, size_t len, void *private_data)
765 {
766         struct messaging_context *ctx = talloc_get_type_abort(
767                 private_data, struct messaging_context);
768         struct messaging_callback *cb, *next;
769
770         for (cb = ctx->callbacks; cb; cb = next) {
771                 /*
772                  * Allow a callback to remove itself
773                  */
774                 next = cb->next;
775
776                 if (msg_type == cb->msg_type) {
777                         DATA_BLOB blob;
778                         struct server_id id;
779
780                         blob.data = (uint8 *)buf;
781                         blob.length = len;
782                         id.id = pid;
783
784                         cb->fn(ctx, cb->private_data, msg_type, id, &blob);
785                 }
786         }
787 }
788
789 /*
790  * Register a dispatch function for a particular message type. Allow multiple
791  * registrants
792 */
793 NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data,
794                             uint32_t msg_type,
795                             void (*fn)(struct messaging_context *msg,
796                                        void *private_data, 
797                                        uint32_t msg_type, 
798                                        struct server_id server_id,
799                                        DATA_BLOB *data))
800 {
801         struct messaging_callback *cb;
802
803         if (!(cb = talloc(ctx, struct messaging_callback))) {
804                 return NT_STATUS_NO_MEMORY;
805         }
806
807         cb->msg_type = msg_type;
808         cb->fn = fn;
809         cb->private_data = private_data;
810
811         DLIST_ADD(ctx->callbacks, cb);
812         message_register(msg_type, messaging_callback, ctx);
813         return NT_STATUS_OK;
814 }
815
816 /*
817   De-register the function for a particular message type.
818 */
819 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
820                           void *private_data)
821 {
822         struct messaging_callback *cb, *next;
823
824         for (cb = ctx->callbacks; cb; cb = next) {
825                 next = cb->next;
826                 if ((cb->msg_type == msg_type)
827                     && (cb->private_data == private_data)) {
828                         DLIST_REMOVE(ctx->callbacks, cb);
829                         TALLOC_FREE(cb);
830                 }
831         }
832 }
833
834 /*
835   Send a message to a particular server
836 */
837 NTSTATUS messaging_send(struct messaging_context *msg,
838                         struct server_id server, 
839                         uint32_t msg_type, DATA_BLOB *data)
840 {
841         return message_send_pid_internal(server.id, msg_type, data->data,
842                                          data->length, True, 0);
843 }
844
845 /** @} **/