r21076: Two pieces of infrastructure from Samba4: An API-compatible messaging wrapper
[sfrench/samba-autobuild/.git] / source / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27   
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49
50 /* the locking database handle */
51 static TDB_CONTEXT *tdb;
52 static int received_signal;
53
54 /* change the message version with any incompatible changes in the protocol */
55 #define MESSAGE_VERSION 1
56
57 struct message_rec {
58         int msg_version;
59         int msg_type;
60         struct process_id dest;
61         struct process_id src;
62         size_t len;
63 };
64
65 /* we have a linked list of dispatch handlers */
66 static struct dispatch_fns {
67         struct dispatch_fns *next, *prev;
68         int msg_type;
69         void (*fn)(int msg_type, struct process_id pid, void *buf, size_t len,
70                    void *private_data);
71         void *private_data;
72 } *dispatch_fns;
73
74 /****************************************************************************
75  Free global objects.
76 ****************************************************************************/
77
78 void gfree_messages(void)
79 {
80         struct dispatch_fns *dfn, *next;
81
82         /* delete the dispatch_fns list */
83         dfn = dispatch_fns;
84         while( dfn ) {
85                 next = dfn->next;
86                 DLIST_REMOVE(dispatch_fns, dfn);
87                 SAFE_FREE(dfn);
88                 dfn = next;
89         }
90 }
91
92 /****************************************************************************
93  Notifications come in as signals.
94 ****************************************************************************/
95
96 static void sig_usr1(void)
97 {
98         received_signal = 1;
99         sys_select_signal(SIGUSR1);
100 }
101
102 /****************************************************************************
103  A useful function for testing the message system.
104 ****************************************************************************/
105
106 static void ping_message(int msg_type, struct process_id src,
107                          void *buf, size_t len, void *private_data)
108 {
109         const char *msg = buf ? (const char *)buf : "none";
110
111         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
112                  procid_str_static(&src), msg));
113         message_send_pid(src, MSG_PONG, buf, len, True);
114 }
115
116 /****************************************************************************
117  Initialise the messaging functions. 
118 ****************************************************************************/
119
120 BOOL message_init(void)
121 {
122         sec_init();
123
124         if (tdb)
125                 return True;
126
127         tdb = tdb_open_log(lock_path("messages.tdb"), 
128                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
129                        O_RDWR|O_CREAT,0600);
130
131         if (!tdb) {
132                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
133                 return False;
134         }
135
136         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
137
138         message_register(MSG_PING, ping_message, NULL);
139
140         /* Register some debugging related messages */
141
142         register_msg_pool_usage();
143         register_dmalloc_msgs();
144
145         return True;
146 }
147
148 /*******************************************************************
149  Form a static tdb key from a pid.
150 ******************************************************************/
151
152 static TDB_DATA message_key_pid(struct process_id pid)
153 {
154         static char key[20];
155         TDB_DATA kbuf;
156
157         slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid));
158         
159         kbuf.dptr = (char *)key;
160         kbuf.dsize = strlen(key)+1;
161         return kbuf;
162 }
163
164 /****************************************************************************
165  Notify a process that it has a message. If the process doesn't exist 
166  then delete its record in the database.
167 ****************************************************************************/
168
169 static BOOL message_notify(struct process_id procid)
170 {
171         pid_t pid = procid.pid;
172         int ret;
173         uid_t euid = geteuid();
174
175         /*
176          * Doing kill with a non-positive pid causes messages to be
177          * sent to places we don't want.
178          */
179
180         SMB_ASSERT(pid > 0);
181
182         if (euid != 0) {
183                 become_root_uid_only();
184         }
185
186         ret = kill(pid, SIGUSR1);
187
188         if (euid != 0) {
189                 unbecome_root_uid_only();
190         }
191
192         if (ret == -1) {
193                 if (errno == ESRCH) {
194                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid));
195                         tdb_delete(tdb, message_key_pid(procid));
196                 } else {
197                         DEBUG(2,("message to process %d failed - %s\n", (int)pid, strerror(errno)));
198                 }
199                 return False;
200         }
201
202         return True;
203 }
204
205 /****************************************************************************
206  Send a message to a particular pid.
207 ****************************************************************************/
208
209 static BOOL message_send_pid_internal(struct process_id pid, int msg_type,
210                                       const void *buf, size_t len,
211                                       BOOL duplicates_allowed,
212                                       unsigned int timeout)
213 {
214         TDB_DATA kbuf;
215         TDB_DATA dbuf;
216         TDB_DATA old_dbuf;
217         struct message_rec rec;
218         char *ptr;
219         struct message_rec prec;
220
221         /* NULL pointer means implicit length zero. */
222         if (!buf) {
223                 SMB_ASSERT(len == 0);
224         }
225
226         /*
227          * Doing kill with a non-positive pid causes messages to be
228          * sent to places we don't want.
229          */
230
231         SMB_ASSERT(procid_to_pid(&pid) > 0);
232
233         rec.msg_version = MESSAGE_VERSION;
234         rec.msg_type = msg_type;
235         rec.dest = pid;
236         rec.src = procid_self();
237         rec.len = buf ? len : 0;
238
239         kbuf = message_key_pid(pid);
240
241         dbuf.dptr = (char *)SMB_MALLOC(len + sizeof(rec));
242         if (!dbuf.dptr)
243                 return False;
244
245         memcpy(dbuf.dptr, &rec, sizeof(rec));
246         if (len > 0 && buf)
247                 memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len);
248
249         dbuf.dsize = len + sizeof(rec);
250
251         if (duplicates_allowed) {
252
253                 /* If duplicates are allowed we can just append the message and return. */
254
255                 /* lock the record for the destination */
256                 if (timeout) {
257                         if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
258                                 DEBUG(0,("message_send_pid_internal: failed to get chainlock with timeout %ul.\n", timeout));
259                                 return False;
260                         }
261                 } else {
262                         if (tdb_chainlock(tdb, kbuf) == -1) {
263                                 DEBUG(0,("message_send_pid_internal: failed to get chainlock.\n"));
264                                 return False;
265                         }
266                 }       
267                 tdb_append(tdb, kbuf, dbuf);
268                 tdb_chainunlock(tdb, kbuf);
269
270                 SAFE_FREE(dbuf.dptr);
271                 errno = 0;                    /* paranoia */
272                 return message_notify(pid);
273         }
274
275         /* lock the record for the destination */
276         if (timeout) {
277                 if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
278                         DEBUG(0,("message_send_pid_internal: failed to get chainlock with timeout %ul.\n", timeout));
279                         return False;
280                 }
281         } else {
282                 if (tdb_chainlock(tdb, kbuf) == -1) {
283                         DEBUG(0,("message_send_pid_internal: failed to get chainlock.\n"));
284                         return False;
285                 }
286         }       
287
288         old_dbuf = tdb_fetch(tdb, kbuf);
289
290         if (!old_dbuf.dptr) {
291                 /* its a new record */
292
293                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
294                 tdb_chainunlock(tdb, kbuf);
295
296                 SAFE_FREE(dbuf.dptr);
297                 errno = 0;                    /* paranoia */
298                 return message_notify(pid);
299         }
300
301         /* Not a new record. Check for duplicates. */
302
303         for(ptr = (char *)old_dbuf.dptr; ptr < old_dbuf.dptr + old_dbuf.dsize; ) {
304                 /*
305                  * First check if the message header matches, then, if it's a non-zero
306                  * sized message, check if the data matches. If so it's a duplicate and
307                  * we can discard it. JRA.
308                  */
309
310                 if (!memcmp(ptr, &rec, sizeof(rec))) {
311                         if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
312                                 tdb_chainunlock(tdb, kbuf);
313                                 DEBUG(10,("message_send_pid_internal: discarding duplicate message.\n"));
314                                 SAFE_FREE(dbuf.dptr);
315                                 SAFE_FREE(old_dbuf.dptr);
316                                 return True;
317                         }
318                 }
319                 memcpy(&prec, ptr, sizeof(prec));
320                 ptr += sizeof(rec) + prec.len;
321         }
322
323         /* we're adding to an existing entry */
324
325         tdb_append(tdb, kbuf, dbuf);
326         tdb_chainunlock(tdb, kbuf);
327
328         SAFE_FREE(old_dbuf.dptr);
329         SAFE_FREE(dbuf.dptr);
330
331         errno = 0;                    /* paranoia */
332         return message_notify(pid);
333 }
334
335 /****************************************************************************
336  Send a message to a particular pid - no timeout.
337 ****************************************************************************/
338
339 BOOL message_send_pid(struct process_id pid, int msg_type, const void *buf, size_t len, BOOL duplicates_allowed)
340 {
341         return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed, 0);
342 }
343
344 /****************************************************************************
345  Send a message to a particular pid, with timeout in seconds.
346 ****************************************************************************/
347
348 BOOL message_send_pid_with_timeout(struct process_id pid, int msg_type, const void *buf, size_t len,
349                 BOOL duplicates_allowed, unsigned int timeout)
350 {
351         return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed, timeout);
352 }
353
354 /****************************************************************************
355  Count the messages pending for a particular pid. Expensive....
356 ****************************************************************************/
357
358 unsigned int messages_pending_for_pid(struct process_id pid)
359 {
360         TDB_DATA kbuf;
361         TDB_DATA dbuf;
362         char *buf;
363         unsigned int message_count = 0;
364
365         kbuf = message_key_pid(pid);
366
367         dbuf = tdb_fetch(tdb, kbuf);
368         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
369                 SAFE_FREE(dbuf.dptr);
370                 return 0;
371         }
372
373         for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) {
374                 struct message_rec rec;
375                 memcpy(&rec, buf, sizeof(rec));
376                 buf += (sizeof(rec) + rec.len);
377                 dbuf.dsize -= (sizeof(rec) + rec.len);
378                 message_count++;
379         }
380
381         SAFE_FREE(dbuf.dptr);
382         return message_count;
383 }
384
385 /****************************************************************************
386  Retrieve all messages for the current process.
387 ****************************************************************************/
388
389 static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
390 {
391         TDB_DATA kbuf;
392         TDB_DATA dbuf;
393         TDB_DATA null_dbuf;
394
395         ZERO_STRUCT(null_dbuf);
396
397         *msgs_buf = NULL;
398         *total_len = 0;
399
400         kbuf = message_key_pid(pid_to_procid(sys_getpid()));
401
402         if (tdb_chainlock(tdb, kbuf) == -1)
403                 return False;
404
405         dbuf = tdb_fetch(tdb, kbuf);
406         /*
407          * Replace with an empty record to keep the allocated
408          * space in the tdb.
409          */
410         tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
411         tdb_chainunlock(tdb, kbuf);
412
413         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
414                 SAFE_FREE(dbuf.dptr);
415                 return False;
416         }
417
418         *msgs_buf = dbuf.dptr;
419         *total_len = dbuf.dsize;
420
421         return True;
422 }
423
424 /****************************************************************************
425  Parse out the next message for the current process.
426 ****************************************************************************/
427
428 static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type,
429                          struct process_id *src, char **buf, size_t *len)
430 {
431         struct message_rec rec;
432         char *ret_buf = *buf;
433
434         *buf = NULL;
435         *len = 0;
436
437         if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
438                 return False;
439
440         memcpy(&rec, ret_buf, sizeof(rec));
441         ret_buf += sizeof(rec);
442
443         if (rec.msg_version != MESSAGE_VERSION) {
444                 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
445                 return False;
446         }
447
448         if (rec.len > 0) {
449                 if (total_len - (ret_buf - msgs_buf) < rec.len)
450                         return False;
451         }
452
453         *len = rec.len;
454         *msg_type = rec.msg_type;
455         *src = rec.src;
456         *buf = ret_buf;
457
458         return True;
459 }
460
461 /****************************************************************************
462  Receive and dispatch any messages pending for this process.
463  JRA changed Dec 13 2006. Only one message handler now permitted per type.
464  *NOTE*: Dispatch functions must be able to cope with incoming
465  messages on an *odd* byte boundary.
466 ****************************************************************************/
467
468 void message_dispatch(void)
469 {
470         int msg_type;
471         struct process_id src;
472         char *buf;
473         char *msgs_buf;
474         size_t len, total_len;
475         int n_handled;
476
477         if (!received_signal)
478                 return;
479
480         DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
481
482         received_signal = 0;
483
484         if (!retrieve_all_messages(&msgs_buf, &total_len))
485                 return;
486
487         for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) {
488                 struct dispatch_fns *dfn;
489
490                 DEBUG(10,("message_dispatch: received msg_type=%d "
491                           "src_pid=%u\n", msg_type,
492                           (unsigned int) procid_to_pid(&src)));
493
494                 n_handled = 0;
495                 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
496                         if (dfn->msg_type == msg_type) {
497                                 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
498                                 dfn->fn(msg_type, src,
499                                         len ? (void *)buf : NULL, len,
500                                         dfn->private_data);
501                                 n_handled++;
502                                 break;
503                         }
504                 }
505                 if (!n_handled) {
506                         DEBUG(5,("message_dispatch: warning: no handler registed for "
507                                  "msg_type %d in pid %u\n",
508                                  msg_type, (unsigned int)sys_getpid()));
509                 }
510         }
511         SAFE_FREE(msgs_buf);
512 }
513
514 /****************************************************************************
515  Register/replace a dispatch function for a particular message type.
516  JRA changed Dec 13 2006. Only one message handler now permitted per type.
517  *NOTE*: Dispatch functions must be able to cope with incoming
518  messages on an *odd* byte boundary.
519 ****************************************************************************/
520
521 void message_register(int msg_type, 
522                       void (*fn)(int msg_type, struct process_id pid,
523                                  void *buf, size_t len,
524                                  void *private_data),
525                       void *private_data)
526 {
527         struct dispatch_fns *dfn;
528
529         for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
530                 if (dfn->msg_type == msg_type) {
531                         dfn->fn = fn;
532                         return;
533                 }
534         }
535
536         dfn = SMB_MALLOC_P(struct dispatch_fns);
537
538         if (dfn != NULL) {
539
540                 ZERO_STRUCTPN(dfn);
541
542                 dfn->msg_type = msg_type;
543                 dfn->fn = fn;
544                 dfn->private_data = private_data;
545
546                 DLIST_ADD(dispatch_fns, dfn);
547         }
548         else {
549         
550                 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
551         }
552 }
553
554 /****************************************************************************
555  De-register the function for a particular message type.
556 ****************************************************************************/
557
558 void message_deregister(int msg_type)
559 {
560         struct dispatch_fns *dfn, *next;
561
562         for (dfn = dispatch_fns; dfn; dfn = next) {
563                 next = dfn->next;
564                 if (dfn->msg_type == msg_type) {
565                         DLIST_REMOVE(dispatch_fns, dfn);
566                         SAFE_FREE(dfn);
567                         return;
568                 }
569         }       
570 }
571
572 struct msg_all {
573         int msg_type;
574         uint32 msg_flag;
575         const void *buf;
576         size_t len;
577         BOOL duplicates;
578         int n_sent;
579 };
580
581 /****************************************************************************
582  Send one of the messages for the broadcast.
583 ****************************************************************************/
584
585 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
586 {
587         struct connections_data crec;
588         struct msg_all *msg_all = (struct msg_all *)state;
589
590         if (dbuf.dsize != sizeof(crec))
591                 return 0;
592
593         memcpy(&crec, dbuf.dptr, sizeof(crec));
594
595         if (crec.cnum != -1)
596                 return 0;
597
598         /* Don't send if the receiver hasn't registered an interest. */
599
600         if(!(crec.bcast_msg_flags & msg_all->msg_flag))
601                 return 0;
602
603         /* If the msg send fails because the pid was not found (i.e. smbd died), 
604          * the msg has already been deleted from the messages.tdb.*/
605
606         if (!message_send_pid(crec.pid, msg_all->msg_type,
607                               msg_all->buf, msg_all->len,
608                               msg_all->duplicates)) {
609                 
610                 /* If the pid was not found delete the entry from connections.tdb */
611
612                 if (errno == ESRCH) {
613                         DEBUG(2,("pid %s doesn't exist - deleting connections %d [%s]\n",
614                                  procid_str_static(&crec.pid),
615                                  crec.cnum, crec.name));
616                         tdb_delete(the_tdb, kbuf);
617                 }
618         }
619         msg_all->n_sent++;
620         return 0;
621 }
622
623 /**
624  * Send a message to all smbd processes.
625  *
626  * It isn't very efficient, but should be OK for the sorts of
627  * applications that use it. When we need efficient broadcast we can add
628  * it.
629  *
630  * @param n_sent Set to the number of messages sent.  This should be
631  * equal to the number of processes, but be careful for races.
632  *
633  * @retval True for success.
634  **/
635 BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type,
636                       const void *buf, size_t len,
637                       BOOL duplicates_allowed,
638                       int *n_sent)
639 {
640         struct msg_all msg_all;
641
642         msg_all.msg_type = msg_type;
643         if (msg_type < 1000)
644                 msg_all.msg_flag = FLAG_MSG_GENERAL;
645         else if (msg_type > 1000 && msg_type < 2000)
646                 msg_all.msg_flag = FLAG_MSG_NMBD;
647         else if (msg_type > 2000 && msg_type < 2100)
648                 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
649         else if (msg_type > 2100 && msg_type < 3000)
650                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
651         else if (msg_type > 3000 && msg_type < 4000)
652                 msg_all.msg_flag = FLAG_MSG_SMBD;
653         else
654                 return False;
655
656         msg_all.buf = buf;
657         msg_all.len = len;
658         msg_all.duplicates = duplicates_allowed;
659         msg_all.n_sent = 0;
660
661         tdb_traverse(conn_tdb, traverse_fn, &msg_all);
662         if (n_sent)
663                 *n_sent = msg_all.n_sent;
664         return True;
665 }
666
667 /*
668  * Block and unblock receiving of messages. Allows removal of race conditions
669  * when doing a fork and changing message disposition.
670  */
671
672 void message_block(void)
673 {
674         BlockSignals(True, SIGUSR1);
675 }
676
677 void message_unblock(void)
678 {
679         BlockSignals(False, SIGUSR1);
680 }
681
682 /*
683  * Samba4 API wrapper around the Samba3 implementation. Yes, I know, we could
684  * import the whole Samba4 thing, but I want notify.c from Samba4 in first.
685  */
686
687 struct messaging_callback {
688         struct messaging_callback *prev, *next;
689         uint32 msg_type;
690         void (*fn)(struct messaging_context *msg, void *private_data, 
691                    uint32_t msg_type, 
692                    struct server_id server_id, DATA_BLOB *data);
693         void *private_data;
694 };
695
696 struct messaging_context {
697         struct server_id id;
698         struct messaging_callback *callbacks;
699 };
700
701 static int messaging_context_destructor(struct messaging_context *ctx)
702 {
703         struct messaging_callback *cb;
704
705         for (cb = ctx->callbacks; cb; cb = cb->next) {
706                 /*
707                  * We unconditionally remove all instances of our callback
708                  * from the tdb basis.
709                  */
710                 message_deregister(cb->msg_type);
711         }
712         return 0;
713 }
714
715 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
716                                          struct server_id server_id, 
717                                          struct event_context *ev)
718 {
719         struct messaging_context *ctx;
720
721         if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) {
722                 return NULL;
723         }
724
725         ctx->id = server_id;
726         talloc_set_destructor(ctx, messaging_context_destructor);
727         return ctx;
728 }
729
730 static void messaging_callback(int msg_type, struct process_id pid,
731                                void *buf, size_t len, void *private_data)
732 {
733         struct messaging_context *ctx = talloc_get_type_abort(
734                 private_data, struct messaging_context);
735         struct messaging_callback *cb, *next;
736
737         for (cb = ctx->callbacks; cb; cb = next) {
738                 /*
739                  * Allow a callback to remove itself
740                  */
741                 next = cb->next;
742
743                 if (msg_type == cb->msg_type) {
744                         DATA_BLOB blob;
745                         struct server_id id;
746
747                         blob.data = (uint8 *)buf;
748                         blob.length = len;
749                         id.id = pid;
750
751                         cb->fn(ctx, cb->private_data, msg_type, id, &blob);
752                 }
753         }
754 }
755
756 /*
757  * Register a dispatch function for a particular message type. Allow multiple
758  * registrants
759 */
760 NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data,
761                             uint32_t msg_type,
762                             void (*fn)(struct messaging_context *msg,
763                                        void *private_data, 
764                                        uint32_t msg_type, 
765                                        struct server_id server_id,
766                                        DATA_BLOB *data))
767 {
768         struct messaging_callback *cb;
769
770         if (!(cb = talloc(ctx, struct messaging_callback))) {
771                 return NT_STATUS_NO_MEMORY;
772         }
773
774         cb->msg_type = msg_type;
775         cb->fn = fn;
776         cb->private_data = private_data;
777
778         DLIST_ADD(ctx->callbacks, cb);
779         message_register(msg_type, messaging_callback, ctx);
780         return NT_STATUS_OK;
781 }
782
783 /*
784   De-register the function for a particular message type.
785 */
786 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
787                           void *private_data)
788 {
789         struct messaging_callback *cb, *next;
790
791         for (cb = ctx->callbacks; cb; cb = next) {
792                 next = cb->next;
793                 if ((cb->msg_type == msg_type)
794                     && (cb->private_data == private_data)) {
795                         DLIST_REMOVE(ctx->callbacks, cb);
796                         TALLOC_FREE(cb);
797                 }
798         }
799 }
800
801
802 /** @} **/