r23015: Make message_(de)register static to messages.c
[bbaumbach/samba-autobuild/.git] / source / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27   
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49
50 /* the locking database handle */
51 static TDB_CONTEXT *tdb;
52 static int received_signal;
53
54 /* change the message version with any incompatible changes in the protocol */
55 #define MESSAGE_VERSION 1
56
57 struct message_rec {
58         int msg_version;
59         int msg_type;
60         struct server_id dest;
61         struct server_id src;
62         size_t len;
63 };
64
65 /* we have a linked list of dispatch handlers */
66 static struct dispatch_fns {
67         struct dispatch_fns *next, *prev;
68         int msg_type;
69         void (*fn)(int msg_type, struct server_id pid, void *buf, size_t len,
70                    void *private_data);
71         void *private_data;
72 } *dispatch_fns;
73
74 static void message_register(int msg_type, 
75                              void (*fn)(int msg_type, struct server_id pid,
76                                         void *buf, size_t len,
77                                         void *private_data),
78                              void *private_data);
79
80 /****************************************************************************
81  Free global objects.
82 ****************************************************************************/
83
84 void gfree_messages(void)
85 {
86         struct dispatch_fns *dfn, *next;
87
88         /* delete the dispatch_fns list */
89         dfn = dispatch_fns;
90         while( dfn ) {
91                 next = dfn->next;
92                 DLIST_REMOVE(dispatch_fns, dfn);
93                 SAFE_FREE(dfn);
94                 dfn = next;
95         }
96 }
97
98 /****************************************************************************
99  Notifications come in as signals.
100 ****************************************************************************/
101
102 static void sig_usr1(void)
103 {
104         received_signal = 1;
105         sys_select_signal(SIGUSR1);
106 }
107
108 static NTSTATUS message_send_pid(struct server_id pid, int msg_type,
109                                  const void *buf, size_t len,
110                                  BOOL duplicates_allowed);
111
112 /****************************************************************************
113  A useful function for testing the message system.
114 ****************************************************************************/
115
116 static void ping_message(int msg_type, struct server_id src,
117                          void *buf, size_t len, void *private_data)
118 {
119         const char *msg = buf ? (const char *)buf : "none";
120
121         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
122                  procid_str_static(&src), msg));
123         message_send_pid(src, MSG_PONG, buf, len, True);
124 }
125
126 /****************************************************************************
127  Initialise the messaging functions. 
128 ****************************************************************************/
129
130 static BOOL message_init(struct messaging_context *msg_ctx)
131 {
132         sec_init();
133
134         if (tdb)
135                 return True;
136
137         tdb = tdb_open_log(lock_path("messages.tdb"), 
138                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
139                        O_RDWR|O_CREAT,0600);
140
141         if (!tdb) {
142                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
143                 return False;
144         }
145
146         /* Activate the per-hashchain freelist */
147         tdb_set_max_dead(tdb, 5);
148
149         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
150
151         message_register(MSG_PING, ping_message, NULL);
152
153         /* Register some debugging related messages */
154
155         register_msg_pool_usage(msg_ctx);
156         register_dmalloc_msgs(msg_ctx);
157         debug_register_msgs(msg_ctx);
158
159         return True;
160 }
161
162 /*******************************************************************
163  Form a static tdb key from a pid.
164 ******************************************************************/
165
166 static TDB_DATA message_key_pid(struct server_id pid)
167 {
168         static char key[20];
169         TDB_DATA kbuf;
170
171         slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid));
172         
173         kbuf.dptr = (uint8 *)key;
174         kbuf.dsize = strlen(key)+1;
175         return kbuf;
176 }
177
178 /****************************************************************************
179  Notify a process that it has a message. If the process doesn't exist 
180  then delete its record in the database.
181 ****************************************************************************/
182
183 static NTSTATUS message_notify(struct server_id procid)
184 {
185         pid_t pid = procid.pid;
186         int ret;
187         uid_t euid = geteuid();
188
189         /*
190          * Doing kill with a non-positive pid causes messages to be
191          * sent to places we don't want.
192          */
193
194         SMB_ASSERT(pid > 0);
195
196         if (euid != 0) {
197                 /* If we're not root become so to send the message. */
198                 save_re_uid();
199                 set_effective_uid(0);
200         }
201
202         ret = kill(pid, SIGUSR1);
203
204         if (euid != 0) {
205                 /* Go back to who we were. */
206                 int saved_errno = errno;
207                 restore_re_uid_fromroot();
208                 errno = saved_errno;
209         }
210
211         if (ret == -1) {
212                 if (errno == ESRCH) {
213                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n",
214                                  (int)pid));
215                         tdb_delete(tdb, message_key_pid(procid));
216
217                         /*
218                          * INVALID_HANDLE is the closest I can think of -- vl
219                          */
220                         return NT_STATUS_INVALID_HANDLE;
221                 }
222
223                 DEBUG(2,("message to process %d failed - %s\n", (int)pid,
224                          strerror(errno)));
225
226                 /*
227                  * No call to map_nt_error_from_unix -- don't want to link in
228                  * errormap.o into lots of utils.
229                  */
230
231                 if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
232                 if (errno == EPERM)  return NT_STATUS_ACCESS_DENIED;
233                 return NT_STATUS_UNSUCCESSFUL;
234         }
235
236         return NT_STATUS_OK;
237 }
238
239 /****************************************************************************
240  Send a message to a particular pid.
241 ****************************************************************************/
242
243 static NTSTATUS message_send_pid_internal(struct server_id pid, int msg_type,
244                                           const void *buf, size_t len,
245                                           BOOL duplicates_allowed,
246                                           unsigned int timeout)
247 {
248         TDB_DATA kbuf;
249         TDB_DATA dbuf;
250         TDB_DATA old_dbuf;
251         struct message_rec rec;
252         uint8 *ptr;
253         struct message_rec prec;
254
255         /* NULL pointer means implicit length zero. */
256         if (!buf) {
257                 SMB_ASSERT(len == 0);
258         }
259
260         /*
261          * Doing kill with a non-positive pid causes messages to be
262          * sent to places we don't want.
263          */
264
265         SMB_ASSERT(procid_to_pid(&pid) > 0);
266
267         rec.msg_version = MESSAGE_VERSION;
268         rec.msg_type = msg_type;
269         rec.dest = pid;
270         rec.src = procid_self();
271         rec.len = buf ? len : 0;
272
273         kbuf = message_key_pid(pid);
274
275         dbuf.dptr = (uint8 *)SMB_MALLOC(len + sizeof(rec));
276         if (!dbuf.dptr) {
277                 return NT_STATUS_NO_MEMORY;
278         }
279
280         memcpy(dbuf.dptr, &rec, sizeof(rec));
281         if (len > 0 && buf)
282                 memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len);
283
284         dbuf.dsize = len + sizeof(rec);
285
286         if (duplicates_allowed) {
287
288                 /* If duplicates are allowed we can just append the message and return. */
289
290                 /* lock the record for the destination */
291                 if (timeout) {
292                         if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
293                                 DEBUG(0,("message_send_pid_internal: failed to get "
294                                          "chainlock with timeout %ul.\n", timeout));
295                                 return NT_STATUS_IO_TIMEOUT;
296                         }
297                 } else {
298                         if (tdb_chainlock(tdb, kbuf) == -1) {
299                                 DEBUG(0,("message_send_pid_internal: failed to get "
300                                          "chainlock.\n"));
301                                 return NT_STATUS_LOCK_NOT_GRANTED;
302                         }
303                 }       
304                 tdb_append(tdb, kbuf, dbuf);
305                 tdb_chainunlock(tdb, kbuf);
306
307                 SAFE_FREE(dbuf.dptr);
308                 errno = 0;                    /* paranoia */
309                 return message_notify(pid);
310         }
311
312         /* lock the record for the destination */
313         if (timeout) {
314                 if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
315                         DEBUG(0,("message_send_pid_internal: failed to get chainlock "
316                                  "with timeout %ul.\n", timeout));
317                         return NT_STATUS_IO_TIMEOUT;
318                 }
319         } else {
320                 if (tdb_chainlock(tdb, kbuf) == -1) {
321                         DEBUG(0,("message_send_pid_internal: failed to get "
322                                  "chainlock.\n"));
323                         return NT_STATUS_LOCK_NOT_GRANTED;
324                 }
325         }       
326
327         old_dbuf = tdb_fetch(tdb, kbuf);
328
329         if (!old_dbuf.dptr) {
330                 /* its a new record */
331
332                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
333                 tdb_chainunlock(tdb, kbuf);
334
335                 SAFE_FREE(dbuf.dptr);
336                 errno = 0;                    /* paranoia */
337                 return message_notify(pid);
338         }
339
340         /* Not a new record. Check for duplicates. */
341
342         for(ptr = old_dbuf.dptr; ptr < old_dbuf.dptr + old_dbuf.dsize; ) {
343                 /*
344                  * First check if the message header matches, then, if it's a non-zero
345                  * sized message, check if the data matches. If so it's a duplicate and
346                  * we can discard it. JRA.
347                  */
348
349                 if (!memcmp(ptr, &rec, sizeof(rec))) {
350                         if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
351                                 tdb_chainunlock(tdb, kbuf);
352                                 DEBUG(10,("message_send_pid_internal: discarding "
353                                           "duplicate message.\n"));
354                                 SAFE_FREE(dbuf.dptr);
355                                 SAFE_FREE(old_dbuf.dptr);
356                                 return NT_STATUS_OK;
357                         }
358                 }
359                 memcpy(&prec, ptr, sizeof(prec));
360                 ptr += sizeof(rec) + prec.len;
361         }
362
363         /* we're adding to an existing entry */
364
365         tdb_append(tdb, kbuf, dbuf);
366         tdb_chainunlock(tdb, kbuf);
367
368         SAFE_FREE(old_dbuf.dptr);
369         SAFE_FREE(dbuf.dptr);
370
371         errno = 0;                    /* paranoia */
372         return message_notify(pid);
373 }
374
375 /****************************************************************************
376  Send a message to a particular pid - no timeout.
377 ****************************************************************************/
378
379 static NTSTATUS message_send_pid(struct server_id pid, int msg_type,
380                                  const void *buf, size_t len,
381                                  BOOL duplicates_allowed)
382 {
383         return message_send_pid_internal(pid, msg_type, buf, len,
384                                          duplicates_allowed, 0);
385 }
386
387 /****************************************************************************
388  Count the messages pending for a particular pid. Expensive....
389 ****************************************************************************/
390
391 unsigned int messages_pending_for_pid(struct server_id pid)
392 {
393         TDB_DATA kbuf;
394         TDB_DATA dbuf;
395         uint8 *buf;
396         unsigned int message_count = 0;
397
398         kbuf = message_key_pid(pid);
399
400         dbuf = tdb_fetch(tdb, kbuf);
401         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
402                 SAFE_FREE(dbuf.dptr);
403                 return 0;
404         }
405
406         for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) {
407                 struct message_rec rec;
408                 memcpy(&rec, buf, sizeof(rec));
409                 buf += (sizeof(rec) + rec.len);
410                 dbuf.dsize -= (sizeof(rec) + rec.len);
411                 message_count++;
412         }
413
414         SAFE_FREE(dbuf.dptr);
415         return message_count;
416 }
417
418 /****************************************************************************
419  Retrieve all messages for the current process.
420 ****************************************************************************/
421
422 static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
423 {
424         TDB_DATA kbuf;
425         TDB_DATA dbuf;
426         TDB_DATA null_dbuf;
427
428         ZERO_STRUCT(null_dbuf);
429
430         *msgs_buf = NULL;
431         *total_len = 0;
432
433         kbuf = message_key_pid(pid_to_procid(sys_getpid()));
434
435         if (tdb_chainlock(tdb, kbuf) == -1)
436                 return False;
437
438         dbuf = tdb_fetch(tdb, kbuf);
439         /*
440          * Replace with an empty record to keep the allocated
441          * space in the tdb.
442          */
443         tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
444         tdb_chainunlock(tdb, kbuf);
445
446         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
447                 SAFE_FREE(dbuf.dptr);
448                 return False;
449         }
450
451         *msgs_buf = (char *)dbuf.dptr;
452         *total_len = dbuf.dsize;
453
454         return True;
455 }
456
457 /****************************************************************************
458  Parse out the next message for the current process.
459 ****************************************************************************/
460
461 static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type,
462                          struct server_id *src, char **buf, size_t *len)
463 {
464         struct message_rec rec;
465         char *ret_buf = *buf;
466
467         *buf = NULL;
468         *len = 0;
469
470         if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
471                 return False;
472
473         memcpy(&rec, ret_buf, sizeof(rec));
474         ret_buf += sizeof(rec);
475
476         if (rec.msg_version != MESSAGE_VERSION) {
477                 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
478                 return False;
479         }
480
481         if (rec.len > 0) {
482                 if (total_len - (ret_buf - msgs_buf) < rec.len)
483                         return False;
484         }
485
486         *len = rec.len;
487         *msg_type = rec.msg_type;
488         *src = rec.src;
489         *buf = ret_buf;
490
491         return True;
492 }
493
494 /****************************************************************************
495  Receive and dispatch any messages pending for this process.
496  JRA changed Dec 13 2006. Only one message handler now permitted per type.
497  *NOTE*: Dispatch functions must be able to cope with incoming
498  messages on an *odd* byte boundary.
499 ****************************************************************************/
500
501 void message_dispatch(void)
502 {
503         int msg_type;
504         struct server_id src;
505         char *buf;
506         char *msgs_buf;
507         size_t len, total_len;
508         int n_handled;
509
510         if (!received_signal)
511                 return;
512
513         DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
514
515         received_signal = 0;
516
517         if (!retrieve_all_messages(&msgs_buf, &total_len))
518                 return;
519
520         for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) {
521                 struct dispatch_fns *dfn;
522
523                 DEBUG(10,("message_dispatch: received msg_type=%d "
524                           "src_pid=%u\n", msg_type,
525                           (unsigned int) procid_to_pid(&src)));
526
527                 n_handled = 0;
528                 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
529                         if (dfn->msg_type == msg_type) {
530                                 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
531                                 dfn->fn(msg_type, src,
532                                         len ? (void *)buf : NULL, len,
533                                         dfn->private_data);
534                                 n_handled++;
535                                 break;
536                         }
537                 }
538                 if (!n_handled) {
539                         DEBUG(5,("message_dispatch: warning: no handler registed for "
540                                  "msg_type %d in pid %u\n",
541                                  msg_type, (unsigned int)sys_getpid()));
542                 }
543         }
544         SAFE_FREE(msgs_buf);
545 }
546
547 /****************************************************************************
548  Register/replace a dispatch function for a particular message type.
549  JRA changed Dec 13 2006. Only one message handler now permitted per type.
550  *NOTE*: Dispatch functions must be able to cope with incoming
551  messages on an *odd* byte boundary.
552 ****************************************************************************/
553
554 static void message_register(int msg_type, 
555                              void (*fn)(int msg_type, struct server_id pid,
556                                         void *buf, size_t len,
557                                         void *private_data),
558                              void *private_data)
559 {
560         struct dispatch_fns *dfn;
561
562         for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
563                 if (dfn->msg_type == msg_type) {
564                         dfn->fn = fn;
565                         return;
566                 }
567         }
568
569         dfn = SMB_MALLOC_P(struct dispatch_fns);
570
571         if (dfn != NULL) {
572
573                 ZERO_STRUCTPN(dfn);
574
575                 dfn->msg_type = msg_type;
576                 dfn->fn = fn;
577                 dfn->private_data = private_data;
578
579                 DLIST_ADD(dispatch_fns, dfn);
580         }
581         else {
582         
583                 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
584         }
585 }
586
587 /****************************************************************************
588  De-register the function for a particular message type.
589 ****************************************************************************/
590
591 static void message_deregister(int msg_type)
592 {
593         struct dispatch_fns *dfn, *next;
594
595         for (dfn = dispatch_fns; dfn; dfn = next) {
596                 next = dfn->next;
597                 if (dfn->msg_type == msg_type) {
598                         DLIST_REMOVE(dispatch_fns, dfn);
599                         SAFE_FREE(dfn);
600                         return;
601                 }
602         }       
603 }
604
605 struct msg_all {
606         int msg_type;
607         uint32 msg_flag;
608         const void *buf;
609         size_t len;
610         BOOL duplicates;
611         int n_sent;
612 };
613
614 /****************************************************************************
615  Send one of the messages for the broadcast.
616 ****************************************************************************/
617
618 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
619 {
620         struct connections_data crec;
621         struct msg_all *msg_all = (struct msg_all *)state;
622         NTSTATUS status;
623
624         if (dbuf.dsize != sizeof(crec))
625                 return 0;
626
627         memcpy(&crec, dbuf.dptr, sizeof(crec));
628
629         if (crec.cnum != -1)
630                 return 0;
631
632         /* Don't send if the receiver hasn't registered an interest. */
633
634         if(!(crec.bcast_msg_flags & msg_all->msg_flag))
635                 return 0;
636
637         /* If the msg send fails because the pid was not found (i.e. smbd died), 
638          * the msg has already been deleted from the messages.tdb.*/
639
640         status = message_send_pid(crec.pid, msg_all->msg_type,
641                                   msg_all->buf, msg_all->len,
642                                   msg_all->duplicates);
643
644         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
645                 
646                 /* If the pid was not found delete the entry from connections.tdb */
647
648                 DEBUG(2,("pid %s doesn't exist - deleting connections %d [%s]\n",
649                          procid_str_static(&crec.pid), crec.cnum, crec.servicename));
650                 tdb_delete(the_tdb, kbuf);
651         }
652         msg_all->n_sent++;
653         return 0;
654 }
655
656 /**
657  * Send a message to all smbd processes.
658  *
659  * It isn't very efficient, but should be OK for the sorts of
660  * applications that use it. When we need efficient broadcast we can add
661  * it.
662  *
663  * @param n_sent Set to the number of messages sent.  This should be
664  * equal to the number of processes, but be careful for races.
665  *
666  * @retval True for success.
667  **/
668 BOOL message_send_all(struct messaging_context *msg_ctx,
669                       int msg_type,
670                       const void *buf, size_t len,
671                       BOOL duplicates_allowed,
672                       int *n_sent)
673 {
674         struct msg_all msg_all;
675
676         msg_all.msg_type = msg_type;
677         if (msg_type < 1000)
678                 msg_all.msg_flag = FLAG_MSG_GENERAL;
679         else if (msg_type > 1000 && msg_type < 2000)
680                 msg_all.msg_flag = FLAG_MSG_NMBD;
681         else if (msg_type > 2000 && msg_type < 2100)
682                 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
683         else if (msg_type > 2100 && msg_type < 3000)
684                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
685         else if (msg_type > 3000 && msg_type < 4000)
686                 msg_all.msg_flag = FLAG_MSG_SMBD;
687         else
688                 return False;
689
690         msg_all.buf = buf;
691         msg_all.len = len;
692         msg_all.duplicates = duplicates_allowed;
693         msg_all.n_sent = 0;
694
695         connections_traverse(traverse_fn, &msg_all);
696         if (n_sent)
697                 *n_sent = msg_all.n_sent;
698         return True;
699 }
700
701 /*
702  * Block and unblock receiving of messages. Allows removal of race conditions
703  * when doing a fork and changing message disposition.
704  */
705
706 void message_block(void)
707 {
708         BlockSignals(True, SIGUSR1);
709 }
710
711 void message_unblock(void)
712 {
713         BlockSignals(False, SIGUSR1);
714 }
715
716 /*
717  * Samba4 API wrapper around the Samba3 implementation. Yes, I know, we could
718  * import the whole Samba4 thing, but I want notify.c from Samba4 in first.
719  */
720
721 struct messaging_callback {
722         struct messaging_callback *prev, *next;
723         uint32 msg_type;
724         void (*fn)(struct messaging_context *msg, void *private_data, 
725                    uint32_t msg_type, 
726                    struct server_id server_id, DATA_BLOB *data);
727         void *private_data;
728 };
729
730 struct messaging_context {
731         struct server_id id;
732         struct event_context *event_ctx;
733         struct messaging_callback *callbacks;
734 };
735
736 static int messaging_context_destructor(struct messaging_context *ctx)
737 {
738         struct messaging_callback *cb;
739
740         for (cb = ctx->callbacks; cb; cb = cb->next) {
741                 /*
742                  * We unconditionally remove all instances of our callback
743                  * from the tdb basis.
744                  */
745                 message_deregister(cb->msg_type);
746         }
747         return 0;
748 }
749
750 struct event_context *messaging_event_context(struct messaging_context *msg_ctx)
751 {
752         return msg_ctx->event_ctx;
753 }
754
755 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
756                                          struct server_id server_id, 
757                                          struct event_context *ev)
758 {
759         struct messaging_context *ctx;
760
761         if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) {
762                 return NULL;
763         }
764
765         ctx->id = server_id;
766         ctx->event_ctx = ev;
767         talloc_set_destructor(ctx, messaging_context_destructor);
768
769         if (!message_init(ctx)) {
770                 DEBUG(0, ("message_init failed: %s\n", strerror(errno)));
771                 TALLOC_FREE(ctx);
772         }
773
774         return ctx;
775 }
776
777 static void messaging_callback(int msg_type, struct server_id pid,
778                                void *buf, size_t len, void *private_data)
779 {
780         struct messaging_context *ctx = talloc_get_type_abort(
781                 private_data, struct messaging_context);
782         struct messaging_callback *cb, *next;
783
784         for (cb = ctx->callbacks; cb; cb = next) {
785                 /*
786                  * Allow a callback to remove itself
787                  */
788                 next = cb->next;
789
790                 if (msg_type == cb->msg_type) {
791                         DATA_BLOB blob;
792
793                         blob.data = (uint8 *)buf;
794                         blob.length = len;
795
796                         cb->fn(ctx, cb->private_data, msg_type, pid, &blob);
797                 }
798         }
799 }
800
801 /*
802  * Register a dispatch function for a particular message type. Allow multiple
803  * registrants
804 */
805 NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data,
806                             uint32_t msg_type,
807                             void (*fn)(struct messaging_context *msg,
808                                        void *private_data, 
809                                        uint32_t msg_type, 
810                                        struct server_id server_id,
811                                        DATA_BLOB *data))
812 {
813         struct messaging_callback *cb;
814
815         if (!(cb = talloc(ctx, struct messaging_callback))) {
816                 return NT_STATUS_NO_MEMORY;
817         }
818
819         cb->msg_type = msg_type;
820         cb->fn = fn;
821         cb->private_data = private_data;
822
823         DLIST_ADD(ctx->callbacks, cb);
824         message_register(msg_type, messaging_callback, ctx);
825         return NT_STATUS_OK;
826 }
827
828 /*
829   De-register the function for a particular message type.
830 */
831 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
832                           void *private_data)
833 {
834         struct messaging_callback *cb, *next;
835
836         for (cb = ctx->callbacks; cb; cb = next) {
837                 next = cb->next;
838                 if ((cb->msg_type == msg_type)
839                     && (cb->private_data == private_data)) {
840                         DLIST_REMOVE(ctx->callbacks, cb);
841                         TALLOC_FREE(cb);
842                 }
843         }
844 }
845
846 /*
847   Send a message to a particular server
848 */
849 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
850                         struct server_id server, 
851                         uint32_t msg_type, const DATA_BLOB *data)
852 {
853         return message_send_pid_internal(server, msg_type, data->data,
854                                          data->length, True, 0);
855 }
856
857 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
858                             struct server_id server, uint32_t msg_type,
859                             const uint8 *buf, size_t len)
860 {
861         DATA_BLOB blob = data_blob_const(buf, len);
862         return messaging_send(msg_ctx, server, msg_type, &blob);
863 }
864
865 NTSTATUS messaging_send_buf_with_timeout(struct messaging_context *msg_ctx,
866                                          struct server_id server,
867                                          uint32_t msg_type,
868                                          const uint8 *buf, size_t len,
869                                          int timeout)
870 {
871         return message_send_pid_internal(server, msg_type, buf, len,
872                                          True, timeout);
873 }
874
875 /** @} **/