r23022: Reformatting
[ira/wip.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27   
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49
50 /* the locking database handle */
51 static TDB_CONTEXT *tdb;
52 static int received_signal;
53
54 /* change the message version with any incompatible changes in the protocol */
55 #define MESSAGE_VERSION 1
56
57 struct message_rec {
58         int msg_version;
59         int msg_type;
60         struct server_id dest;
61         struct server_id src;
62         size_t len;
63 };
64
65 /* we have a linked list of dispatch handlers */
66 static struct dispatch_fns {
67         struct dispatch_fns *next, *prev;
68         int msg_type;
69         void (*fn)(int msg_type, struct server_id pid, void *buf, size_t len,
70                    void *private_data);
71         void *private_data;
72 } *dispatch_fns;
73
74 static void message_register(int msg_type, 
75                              void (*fn)(int msg_type, struct server_id pid,
76                                         void *buf, size_t len,
77                                         void *private_data),
78                              void *private_data);
79
80 /****************************************************************************
81  Free global objects.
82 ****************************************************************************/
83
84 void gfree_messages(void)
85 {
86         struct dispatch_fns *dfn, *next;
87
88         /* delete the dispatch_fns list */
89         dfn = dispatch_fns;
90         while( dfn ) {
91                 next = dfn->next;
92                 DLIST_REMOVE(dispatch_fns, dfn);
93                 SAFE_FREE(dfn);
94                 dfn = next;
95         }
96 }
97
98 /****************************************************************************
99  Notifications come in as signals.
100 ****************************************************************************/
101
102 static void sig_usr1(void)
103 {
104         received_signal = 1;
105         sys_select_signal(SIGUSR1);
106 }
107
108 static NTSTATUS message_send_pid(struct server_id pid, int msg_type,
109                                  const void *buf, size_t len,
110                                  BOOL duplicates_allowed);
111
112 /****************************************************************************
113  A useful function for testing the message system.
114 ****************************************************************************/
115
116 static void ping_message(int msg_type, struct server_id src,
117                          void *buf, size_t len, void *private_data)
118 {
119         const char *msg = buf ? (const char *)buf : "none";
120
121         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
122                  procid_str_static(&src), msg));
123         message_send_pid(src, MSG_PONG, buf, len, True);
124 }
125
126 /****************************************************************************
127  Initialise the messaging functions. 
128 ****************************************************************************/
129
130 static BOOL message_init(struct messaging_context *msg_ctx)
131 {
132         sec_init();
133
134         if (tdb)
135                 return True;
136
137         tdb = tdb_open_log(lock_path("messages.tdb"), 
138                        0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
139                        O_RDWR|O_CREAT,0600);
140
141         if (!tdb) {
142                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
143                 return False;
144         }
145
146         /* Activate the per-hashchain freelist */
147         tdb_set_max_dead(tdb, 5);
148
149         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
150
151         message_register(MSG_PING, ping_message, NULL);
152
153         /* Register some debugging related messages */
154
155         register_msg_pool_usage(msg_ctx);
156         register_dmalloc_msgs(msg_ctx);
157         debug_register_msgs(msg_ctx);
158
159         return True;
160 }
161
162 /*******************************************************************
163  Form a static tdb key from a pid.
164 ******************************************************************/
165
166 static TDB_DATA message_key_pid(struct server_id pid)
167 {
168         static char key[20];
169         TDB_DATA kbuf;
170
171         slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid));
172         
173         kbuf.dptr = (uint8 *)key;
174         kbuf.dsize = strlen(key)+1;
175         return kbuf;
176 }
177
178 /****************************************************************************
179  Notify a process that it has a message. If the process doesn't exist 
180  then delete its record in the database.
181 ****************************************************************************/
182
183 static NTSTATUS message_notify(struct server_id procid)
184 {
185         pid_t pid = procid.pid;
186         int ret;
187         uid_t euid = geteuid();
188
189         /*
190          * Doing kill with a non-positive pid causes messages to be
191          * sent to places we don't want.
192          */
193
194         SMB_ASSERT(pid > 0);
195
196         if (euid != 0) {
197                 /* If we're not root become so to send the message. */
198                 save_re_uid();
199                 set_effective_uid(0);
200         }
201
202         ret = kill(pid, SIGUSR1);
203
204         if (euid != 0) {
205                 /* Go back to who we were. */
206                 int saved_errno = errno;
207                 restore_re_uid_fromroot();
208                 errno = saved_errno;
209         }
210
211         if (ret == -1) {
212                 if (errno == ESRCH) {
213                         DEBUG(2,("pid %d doesn't exist - deleting messages record\n",
214                                  (int)pid));
215                         tdb_delete(tdb, message_key_pid(procid));
216
217                         /*
218                          * INVALID_HANDLE is the closest I can think of -- vl
219                          */
220                         return NT_STATUS_INVALID_HANDLE;
221                 }
222
223                 DEBUG(2,("message to process %d failed - %s\n", (int)pid,
224                          strerror(errno)));
225
226                 /*
227                  * No call to map_nt_error_from_unix -- don't want to link in
228                  * errormap.o into lots of utils.
229                  */
230
231                 if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
232                 if (errno == EPERM)  return NT_STATUS_ACCESS_DENIED;
233                 return NT_STATUS_UNSUCCESSFUL;
234         }
235
236         return NT_STATUS_OK;
237 }
238
239 /****************************************************************************
240  Send a message to a particular pid.
241 ****************************************************************************/
242
243 static NTSTATUS message_send_pid_internal(struct server_id pid, int msg_type,
244                                           const void *buf, size_t len,
245                                           BOOL duplicates_allowed,
246                                           unsigned int timeout)
247 {
248         TDB_DATA kbuf;
249         TDB_DATA dbuf;
250         TDB_DATA old_dbuf;
251         struct message_rec rec;
252         uint8 *ptr;
253         struct message_rec prec;
254
255         /* NULL pointer means implicit length zero. */
256         if (!buf) {
257                 SMB_ASSERT(len == 0);
258         }
259
260         /*
261          * Doing kill with a non-positive pid causes messages to be
262          * sent to places we don't want.
263          */
264
265         SMB_ASSERT(procid_to_pid(&pid) > 0);
266
267         rec.msg_version = MESSAGE_VERSION;
268         rec.msg_type = msg_type;
269         rec.dest = pid;
270         rec.src = procid_self();
271         rec.len = buf ? len : 0;
272
273         kbuf = message_key_pid(pid);
274
275         dbuf.dptr = (uint8 *)SMB_MALLOC(len + sizeof(rec));
276         if (!dbuf.dptr) {
277                 return NT_STATUS_NO_MEMORY;
278         }
279
280         memcpy(dbuf.dptr, &rec, sizeof(rec));
281         if (len > 0 && buf)
282                 memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len);
283
284         dbuf.dsize = len + sizeof(rec);
285
286         if (duplicates_allowed) {
287
288                 /* If duplicates are allowed we can just append the message
289                  * and return. */
290
291                 /* lock the record for the destination */
292                 if (timeout) {
293                         if (tdb_chainlock_with_timeout(tdb, kbuf,
294                                                        timeout) == -1) {
295                                 DEBUG(0,("message_send_pid_internal: failed "
296                                          "to get chainlock with timeout "
297                                          "%ul.\n", timeout));
298                                 return NT_STATUS_IO_TIMEOUT;
299                         }
300                 } else {
301                         if (tdb_chainlock(tdb, kbuf) == -1) {
302                                 DEBUG(0,("message_send_pid_internal: failed "
303                                          "to get chainlock.\n"));
304                                 return NT_STATUS_LOCK_NOT_GRANTED;
305                         }
306                 }       
307                 tdb_append(tdb, kbuf, dbuf);
308                 tdb_chainunlock(tdb, kbuf);
309
310                 SAFE_FREE(dbuf.dptr);
311                 errno = 0;                    /* paranoia */
312                 return message_notify(pid);
313         }
314
315         /* lock the record for the destination */
316         if (timeout) {
317                 if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
318                         DEBUG(0,("message_send_pid_internal: failed to get "
319                                  "chainlock with timeout %ul.\n", timeout));
320                         return NT_STATUS_IO_TIMEOUT;
321                 }
322         } else {
323                 if (tdb_chainlock(tdb, kbuf) == -1) {
324                         DEBUG(0,("message_send_pid_internal: failed to get "
325                                  "chainlock.\n"));
326                         return NT_STATUS_LOCK_NOT_GRANTED;
327                 }
328         }       
329
330         old_dbuf = tdb_fetch(tdb, kbuf);
331
332         if (!old_dbuf.dptr) {
333                 /* its a new record */
334
335                 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
336                 tdb_chainunlock(tdb, kbuf);
337
338                 SAFE_FREE(dbuf.dptr);
339                 errno = 0;                    /* paranoia */
340                 return message_notify(pid);
341         }
342
343         /* Not a new record. Check for duplicates. */
344
345         for(ptr = old_dbuf.dptr; ptr < old_dbuf.dptr + old_dbuf.dsize; ) {
346                 /*
347                  * First check if the message header matches, then, if it's a
348                  * non-zero sized message, check if the data matches. If so
349                  * it's a duplicate and we can discard it. JRA.
350                  */
351
352                 if (!memcmp(ptr, &rec, sizeof(rec))) {
353                         if (!len
354                             || (len
355                                 && !memcmp( ptr + sizeof(rec), buf, len))) {
356                                 tdb_chainunlock(tdb, kbuf);
357                                 DEBUG(10,("message_send_pid_internal: "
358                                           "discarding duplicate message.\n"));
359                                 SAFE_FREE(dbuf.dptr);
360                                 SAFE_FREE(old_dbuf.dptr);
361                                 return NT_STATUS_OK;
362                         }
363                 }
364                 memcpy(&prec, ptr, sizeof(prec));
365                 ptr += sizeof(rec) + prec.len;
366         }
367
368         /* we're adding to an existing entry */
369
370         tdb_append(tdb, kbuf, dbuf);
371         tdb_chainunlock(tdb, kbuf);
372
373         SAFE_FREE(old_dbuf.dptr);
374         SAFE_FREE(dbuf.dptr);
375
376         errno = 0;                    /* paranoia */
377         return message_notify(pid);
378 }
379
380 /****************************************************************************
381  Send a message to a particular pid - no timeout.
382 ****************************************************************************/
383
384 static NTSTATUS message_send_pid(struct server_id pid, int msg_type,
385                                  const void *buf, size_t len,
386                                  BOOL duplicates_allowed)
387 {
388         return message_send_pid_internal(pid, msg_type, buf, len,
389                                          duplicates_allowed, 0);
390 }
391
392 /****************************************************************************
393  Count the messages pending for a particular pid. Expensive....
394 ****************************************************************************/
395
396 unsigned int messages_pending_for_pid(struct server_id pid)
397 {
398         TDB_DATA kbuf;
399         TDB_DATA dbuf;
400         uint8 *buf;
401         unsigned int message_count = 0;
402
403         kbuf = message_key_pid(pid);
404
405         dbuf = tdb_fetch(tdb, kbuf);
406         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
407                 SAFE_FREE(dbuf.dptr);
408                 return 0;
409         }
410
411         for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) {
412                 struct message_rec rec;
413                 memcpy(&rec, buf, sizeof(rec));
414                 buf += (sizeof(rec) + rec.len);
415                 dbuf.dsize -= (sizeof(rec) + rec.len);
416                 message_count++;
417         }
418
419         SAFE_FREE(dbuf.dptr);
420         return message_count;
421 }
422
423 /****************************************************************************
424  Retrieve all messages for the current process.
425 ****************************************************************************/
426
427 static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
428 {
429         TDB_DATA kbuf;
430         TDB_DATA dbuf;
431         TDB_DATA null_dbuf;
432
433         ZERO_STRUCT(null_dbuf);
434
435         *msgs_buf = NULL;
436         *total_len = 0;
437
438         kbuf = message_key_pid(pid_to_procid(sys_getpid()));
439
440         if (tdb_chainlock(tdb, kbuf) == -1)
441                 return False;
442
443         dbuf = tdb_fetch(tdb, kbuf);
444         /*
445          * Replace with an empty record to keep the allocated
446          * space in the tdb.
447          */
448         tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
449         tdb_chainunlock(tdb, kbuf);
450
451         if (dbuf.dptr == NULL || dbuf.dsize == 0) {
452                 SAFE_FREE(dbuf.dptr);
453                 return False;
454         }
455
456         *msgs_buf = (char *)dbuf.dptr;
457         *total_len = dbuf.dsize;
458
459         return True;
460 }
461
462 /****************************************************************************
463  Parse out the next message for the current process.
464 ****************************************************************************/
465
466 static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type,
467                          struct server_id *src, char **buf, size_t *len)
468 {
469         struct message_rec rec;
470         char *ret_buf = *buf;
471
472         *buf = NULL;
473         *len = 0;
474
475         if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
476                 return False;
477
478         memcpy(&rec, ret_buf, sizeof(rec));
479         ret_buf += sizeof(rec);
480
481         if (rec.msg_version != MESSAGE_VERSION) {
482                 DEBUG(0,("message version %d received (expected %d)\n",
483                          rec.msg_version, MESSAGE_VERSION));
484                 return False;
485         }
486
487         if (rec.len > 0) {
488                 if (total_len - (ret_buf - msgs_buf) < rec.len)
489                         return False;
490         }
491
492         *len = rec.len;
493         *msg_type = rec.msg_type;
494         *src = rec.src;
495         *buf = ret_buf;
496
497         return True;
498 }
499
500 /****************************************************************************
501  Receive and dispatch any messages pending for this process.
502  JRA changed Dec 13 2006. Only one message handler now permitted per type.
503  *NOTE*: Dispatch functions must be able to cope with incoming
504  messages on an *odd* byte boundary.
505 ****************************************************************************/
506
507 void message_dispatch(void)
508 {
509         int msg_type;
510         struct server_id src;
511         char *buf;
512         char *msgs_buf;
513         size_t len, total_len;
514         int n_handled;
515
516         if (!received_signal)
517                 return;
518
519         DEBUG(10, ("message_dispatch: received_signal = %d\n",
520                    received_signal));
521
522         received_signal = 0;
523
524         if (!retrieve_all_messages(&msgs_buf, &total_len))
525                 return;
526
527         for (buf = msgs_buf;
528              message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len);
529              buf += len) {
530                 struct dispatch_fns *dfn;
531
532                 DEBUG(10,("message_dispatch: received msg_type=%d "
533                           "src_pid=%u\n", msg_type,
534                           (unsigned int) procid_to_pid(&src)));
535
536                 n_handled = 0;
537                 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
538                         if (dfn->msg_type == msg_type) {
539                                 DEBUG(10,("message_dispatch: processing "
540                                           "message of type %d.\n", msg_type));
541                                 dfn->fn(msg_type, src,
542                                         len ? (void *)buf : NULL, len,
543                                         dfn->private_data);
544                                 n_handled++;
545                                 break;
546                         }
547                 }
548                 if (!n_handled) {
549                         DEBUG(5,("message_dispatch: warning: no handler "
550                                  "registed for msg_type %d in pid %u\n",
551                                  msg_type, (unsigned int)sys_getpid()));
552                 }
553         }
554         SAFE_FREE(msgs_buf);
555 }
556
557 /****************************************************************************
558  Register/replace a dispatch function for a particular message type.
559  JRA changed Dec 13 2006. Only one message handler now permitted per type.
560  *NOTE*: Dispatch functions must be able to cope with incoming
561  messages on an *odd* byte boundary.
562 ****************************************************************************/
563
564 static void message_register(int msg_type, 
565                              void (*fn)(int msg_type, struct server_id pid,
566                                         void *buf, size_t len,
567                                         void *private_data),
568                              void *private_data)
569 {
570         struct dispatch_fns *dfn;
571
572         for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
573                 if (dfn->msg_type == msg_type) {
574                         dfn->fn = fn;
575                         return;
576                 }
577         }
578
579         if (!(dfn = SMB_MALLOC_P(struct dispatch_fns))) {
580                 DEBUG(0,("message_register: Not enough memory. malloc "
581                          "failed!\n"));
582                 return;
583         }
584
585         ZERO_STRUCTPN(dfn);
586
587         dfn->msg_type = msg_type;
588         dfn->fn = fn;
589         dfn->private_data = private_data;
590
591         DLIST_ADD(dispatch_fns, dfn);
592 }
593
594 /****************************************************************************
595  De-register the function for a particular message type.
596 ****************************************************************************/
597
598 static void message_deregister(int msg_type)
599 {
600         struct dispatch_fns *dfn, *next;
601
602         for (dfn = dispatch_fns; dfn; dfn = next) {
603                 next = dfn->next;
604                 if (dfn->msg_type == msg_type) {
605                         DLIST_REMOVE(dispatch_fns, dfn);
606                         SAFE_FREE(dfn);
607                         return;
608                 }
609         }       
610 }
611
612 struct msg_all {
613         int msg_type;
614         uint32 msg_flag;
615         const void *buf;
616         size_t len;
617         BOOL duplicates;
618         int n_sent;
619 };
620
621 /****************************************************************************
622  Send one of the messages for the broadcast.
623 ****************************************************************************/
624
625 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf,
626                        void *state)
627 {
628         struct connections_data crec;
629         struct msg_all *msg_all = (struct msg_all *)state;
630         NTSTATUS status;
631
632         if (dbuf.dsize != sizeof(crec))
633                 return 0;
634
635         memcpy(&crec, dbuf.dptr, sizeof(crec));
636
637         if (crec.cnum != -1)
638                 return 0;
639
640         /* Don't send if the receiver hasn't registered an interest. */
641
642         if(!(crec.bcast_msg_flags & msg_all->msg_flag))
643                 return 0;
644
645         /* If the msg send fails because the pid was not found (i.e. smbd died), 
646          * the msg has already been deleted from the messages.tdb.*/
647
648         status = message_send_pid(crec.pid, msg_all->msg_type,
649                                   msg_all->buf, msg_all->len,
650                                   msg_all->duplicates);
651
652         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
653                 
654                 /* If the pid was not found delete the entry from
655                  * connections.tdb */
656
657                 DEBUG(2,("pid %s doesn't exist - deleting connections "
658                          "%d [%s]\n", procid_str_static(&crec.pid), crec.cnum,
659                          crec.servicename));
660                 tdb_delete(the_tdb, kbuf);
661         }
662         msg_all->n_sent++;
663         return 0;
664 }
665
666 /**
667  * Send a message to all smbd processes.
668  *
669  * It isn't very efficient, but should be OK for the sorts of
670  * applications that use it. When we need efficient broadcast we can add
671  * it.
672  *
673  * @param n_sent Set to the number of messages sent.  This should be
674  * equal to the number of processes, but be careful for races.
675  *
676  * @retval True for success.
677  **/
678 BOOL message_send_all(struct messaging_context *msg_ctx,
679                       int msg_type,
680                       const void *buf, size_t len,
681                       BOOL duplicates_allowed,
682                       int *n_sent)
683 {
684         struct msg_all msg_all;
685
686         msg_all.msg_type = msg_type;
687         if (msg_type < 1000)
688                 msg_all.msg_flag = FLAG_MSG_GENERAL;
689         else if (msg_type > 1000 && msg_type < 2000)
690                 msg_all.msg_flag = FLAG_MSG_NMBD;
691         else if (msg_type > 2000 && msg_type < 2100)
692                 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
693         else if (msg_type > 2100 && msg_type < 3000)
694                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
695         else if (msg_type > 3000 && msg_type < 4000)
696                 msg_all.msg_flag = FLAG_MSG_SMBD;
697         else
698                 return False;
699
700         msg_all.buf = buf;
701         msg_all.len = len;
702         msg_all.duplicates = duplicates_allowed;
703         msg_all.n_sent = 0;
704
705         connections_traverse(traverse_fn, &msg_all);
706         if (n_sent)
707                 *n_sent = msg_all.n_sent;
708         return True;
709 }
710
711 /*
712  * Block and unblock receiving of messages. Allows removal of race conditions
713  * when doing a fork and changing message disposition.
714  */
715
716 void message_block(void)
717 {
718         BlockSignals(True, SIGUSR1);
719 }
720
721 void message_unblock(void)
722 {
723         BlockSignals(False, SIGUSR1);
724 }
725
726 /*
727  * Samba4 API wrapper around the Samba3 implementation. Yes, I know, we could
728  * import the whole Samba4 thing, but I want notify.c from Samba4 in first.
729  */
730
731 struct messaging_callback {
732         struct messaging_callback *prev, *next;
733         uint32 msg_type;
734         void (*fn)(struct messaging_context *msg, void *private_data, 
735                    uint32_t msg_type, 
736                    struct server_id server_id, DATA_BLOB *data);
737         void *private_data;
738 };
739
740 struct messaging_context {
741         struct server_id id;
742         struct event_context *event_ctx;
743         struct messaging_callback *callbacks;
744 };
745
746 static int messaging_context_destructor(struct messaging_context *ctx)
747 {
748         struct messaging_callback *cb;
749
750         for (cb = ctx->callbacks; cb; cb = cb->next) {
751                 /*
752                  * We unconditionally remove all instances of our callback
753                  * from the tdb basis.
754                  */
755                 message_deregister(cb->msg_type);
756         }
757         return 0;
758 }
759
760 struct event_context *messaging_event_context(struct messaging_context *msg_ctx)
761 {
762         return msg_ctx->event_ctx;
763 }
764
765 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
766                                          struct server_id server_id, 
767                                          struct event_context *ev)
768 {
769         struct messaging_context *ctx;
770
771         if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) {
772                 return NULL;
773         }
774
775         ctx->id = server_id;
776         ctx->event_ctx = ev;
777         talloc_set_destructor(ctx, messaging_context_destructor);
778
779         if (!message_init(ctx)) {
780                 DEBUG(0, ("message_init failed: %s\n", strerror(errno)));
781                 TALLOC_FREE(ctx);
782         }
783
784         return ctx;
785 }
786
787 static void messaging_callback(int msg_type, struct server_id pid,
788                                void *buf, size_t len, void *private_data)
789 {
790         struct messaging_context *ctx = talloc_get_type_abort(
791                 private_data, struct messaging_context);
792         struct messaging_callback *cb, *next;
793
794         for (cb = ctx->callbacks; cb; cb = next) {
795                 /*
796                  * Allow a callback to remove itself
797                  */
798                 next = cb->next;
799
800                 if (msg_type == cb->msg_type) {
801                         DATA_BLOB blob;
802
803                         blob.data = (uint8 *)buf;
804                         blob.length = len;
805
806                         cb->fn(ctx, cb->private_data, msg_type, pid, &blob);
807                 }
808         }
809 }
810
811 /*
812  * Register a dispatch function for a particular message type. Allow multiple
813  * registrants
814 */
815 NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data,
816                             uint32_t msg_type,
817                             void (*fn)(struct messaging_context *msg,
818                                        void *private_data, 
819                                        uint32_t msg_type, 
820                                        struct server_id server_id,
821                                        DATA_BLOB *data))
822 {
823         struct messaging_callback *cb;
824
825         if (!(cb = talloc(ctx, struct messaging_callback))) {
826                 return NT_STATUS_NO_MEMORY;
827         }
828
829         cb->msg_type = msg_type;
830         cb->fn = fn;
831         cb->private_data = private_data;
832
833         DLIST_ADD(ctx->callbacks, cb);
834         message_register(msg_type, messaging_callback, ctx);
835         return NT_STATUS_OK;
836 }
837
838 /*
839   De-register the function for a particular message type.
840 */
841 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
842                           void *private_data)
843 {
844         struct messaging_callback *cb, *next;
845
846         for (cb = ctx->callbacks; cb; cb = next) {
847                 next = cb->next;
848                 if ((cb->msg_type == msg_type)
849                     && (cb->private_data == private_data)) {
850                         DLIST_REMOVE(ctx->callbacks, cb);
851                         TALLOC_FREE(cb);
852                 }
853         }
854 }
855
856 /*
857   Send a message to a particular server
858 */
859 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
860                         struct server_id server, 
861                         uint32_t msg_type, const DATA_BLOB *data)
862 {
863         return message_send_pid_internal(server, msg_type, data->data,
864                                          data->length, True, 0);
865 }
866
867 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
868                             struct server_id server, uint32_t msg_type,
869                             const uint8 *buf, size_t len)
870 {
871         DATA_BLOB blob = data_blob_const(buf, len);
872         return messaging_send(msg_ctx, server, msg_type, &blob);
873 }
874
875 NTSTATUS messaging_send_buf_with_timeout(struct messaging_context *msg_ctx,
876                                          struct server_id server,
877                                          uint32_t msg_type,
878                                          const uint8 *buf, size_t len,
879                                          int timeout)
880 {
881         return message_send_pid_internal(server, msg_type, buf, len,
882                                          True, timeout);
883 }
884
885 /** @} **/