s4-messaging ensure we do not segfault on a NULL msg context in cleanup
[kai/samba-autobuild/.git] / source4 / lib / messaging / messaging.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    Samba internal messaging functions
5
6    Copyright (C) Andrew Tridgell 2004
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include "includes.h"
23 #include "lib/events/events.h"
24 #include "system/filesys.h"
25 #include "messaging/messaging.h"
26 #include "../lib/util/dlinklist.h"
27 #include "lib/socket/socket.h"
28 #include "librpc/gen_ndr/ndr_irpc.h"
29 #include "lib/messaging/irpc.h"
30 #include "lib/util/tdb_wrap.h"
31 #include "../lib/util/unix_privs.h"
32 #include "librpc/rpc/dcerpc.h"
33 #include "../lib/tdb_compat/tdb_compat.h"
34 #include "../lib/util/util_tdb.h"
35 #include "cluster/cluster.h"
36 #include "../lib/util/tevent_ntstatus.h"
37
38 /* change the message version with any incompatible changes in the protocol */
39 #define IMESSAGING_VERSION 1
40
41 /*
42   a pending irpc call
43 */
44 struct irpc_request {
45         struct imessaging_context *msg_ctx;
46         int callid;
47         struct {
48                 void (*handler)(struct irpc_request *irpc, struct irpc_message *m);
49                 void *private_data;
50         } incoming;
51 };
52
53 struct imessaging_context {
54         struct server_id server_id;
55         struct socket_context *sock;
56         const char *base_path;
57         const char *path;
58         struct dispatch_fn **dispatch;
59         uint32_t num_types;
60         struct idr_context *dispatch_tree;
61         struct imessaging_rec *pending;
62         struct imessaging_rec *retry_queue;
63         struct irpc_list *irpc;
64         struct idr_context *idr;
65         const char **names;
66         struct timeval start_time;
67         struct tevent_timer *retry_te;
68         struct {
69                 struct tevent_context *ev;
70                 struct tevent_fd *fde;
71         } event;
72 };
73
74 /* we have a linked list of dispatch handlers for each msg_type that
75    this messaging server can deal with */
76 struct dispatch_fn {
77         struct dispatch_fn *next, *prev;
78         uint32_t msg_type;
79         void *private_data;
80         msg_callback_t fn;
81 };
82
83 /* an individual message */
84 struct imessaging_rec {
85         struct imessaging_rec *next, *prev;
86         struct imessaging_context *msg;
87         const char *path;
88
89         struct imessaging_header {
90                 uint32_t version;
91                 uint32_t msg_type;
92                 struct server_id from;
93                 struct server_id to;
94                 uint32_t length;
95         } *header;
96
97         DATA_BLOB packet;
98         uint32_t retries;
99 };
100
101
102 static void irpc_handler(struct imessaging_context *, void *,
103                          uint32_t, struct server_id, DATA_BLOB *);
104
105
106 /*
107  A useful function for testing the message system.
108 */
109 static void ping_message(struct imessaging_context *msg, void *private_data,
110                          uint32_t msg_type, struct server_id src, DATA_BLOB *data)
111 {
112         char *task_id = server_id_str(NULL, &src);
113         DEBUG(1,("INFO: Received PING message from server %s [%.*s]\n",
114                  task_id, (int)data->length,
115                  data->data?(const char *)data->data:""));
116         talloc_free(task_id);
117         imessaging_send(msg, src, MSG_PONG, data);
118 }
119
120 /*
121   return uptime of messaging server via irpc
122 */
123 static NTSTATUS irpc_uptime(struct irpc_message *msg, 
124                             struct irpc_uptime *r)
125 {
126         struct imessaging_context *ctx = talloc_get_type(msg->private_data, struct imessaging_context);
127         *r->out.start_time = timeval_to_nttime(&ctx->start_time);
128         return NT_STATUS_OK;
129 }
130
131 /* 
132    return the path to a messaging socket
133 */
134 static char *imessaging_path(struct imessaging_context *msg, struct server_id server_id)
135 {
136         TALLOC_CTX *tmp_ctx = talloc_new(msg);
137         const char *id = server_id_str(tmp_ctx, &server_id);
138         char *s;
139         if (id == NULL) {
140                 return NULL;
141         }
142         s = talloc_asprintf(msg, "%s/msg.%s", msg->base_path, id);
143         talloc_steal(s, tmp_ctx);
144         return s;
145 }
146
147 /*
148   dispatch a fully received message
149
150   note that this deliberately can match more than one message handler
151   per message. That allows a single messasging context to register
152   (for example) a debug handler for more than one piece of code
153 */
154 static void imessaging_dispatch(struct imessaging_context *msg, struct imessaging_rec *rec)
155 {
156         struct dispatch_fn *d, *next;
157
158         /* temporary IDs use an idtree, the rest use a array of pointers */
159         if (rec->header->msg_type >= MSG_TMP_BASE) {
160                 d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
161                                                    rec->header->msg_type);
162         } else if (rec->header->msg_type < msg->num_types) {
163                 d = msg->dispatch[rec->header->msg_type];
164         } else {
165                 d = NULL;
166         }
167
168         for (; d; d = next) {
169                 DATA_BLOB data;
170                 next = d->next;
171                 data.data = rec->packet.data + sizeof(*rec->header);
172                 data.length = rec->header->length;
173                 d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data);
174         }
175         rec->header->length = 0;
176 }
177
178 /*
179   handler for messages that arrive from other nodes in the cluster
180 */
181 static void cluster_message_handler(struct imessaging_context *msg, DATA_BLOB packet)
182 {
183         struct imessaging_rec *rec;
184
185         rec = talloc(msg, struct imessaging_rec);
186         if (rec == NULL) {
187                 smb_panic("Unable to allocate imessaging_rec");
188         }
189
190         rec->msg           = msg;
191         rec->path          = msg->path;
192         rec->header        = (struct imessaging_header *)packet.data;
193         rec->packet        = packet;
194         rec->retries       = 0;
195
196         if (packet.length != sizeof(*rec->header) + rec->header->length) {
197                 DEBUG(0,("messaging: bad message header size %d should be %d\n", 
198                          rec->header->length, (int)(packet.length - sizeof(*rec->header))));
199                 talloc_free(rec);
200                 return;
201         }
202
203         imessaging_dispatch(msg, rec);
204         talloc_free(rec);
205 }
206
207
208
209 /*
210   try to send the message
211 */
212 static NTSTATUS try_send(struct imessaging_rec *rec)
213 {
214         struct imessaging_context *msg = rec->msg;
215         size_t nsent;
216         void *priv;
217         NTSTATUS status;
218         struct socket_address *path;
219
220         /* rec->path is the path of the *other* socket, where we want
221          * this to end up */
222         path = socket_address_from_strings(msg, msg->sock->backend_name, 
223                                            rec->path, 0);
224         if (!path) {
225                 return NT_STATUS_NO_MEMORY;
226         }
227
228         /* we send with privileges so messages work from any context */
229         priv = root_privileges();
230         status = socket_sendto(msg->sock, &rec->packet, &nsent, path);
231         talloc_free(path);
232         talloc_free(priv);
233
234         return status;
235 }
236
237 /*
238   retry backed off messages
239 */
240 static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te, 
241                             struct timeval t, void *private_data)
242 {
243         struct imessaging_context *msg = talloc_get_type(private_data,
244                                                         struct imessaging_context);
245         msg->retry_te = NULL;
246
247         /* put the messages back on the main queue */
248         while (msg->retry_queue) {
249                 struct imessaging_rec *rec = msg->retry_queue;
250                 DLIST_REMOVE(msg->retry_queue, rec);
251                 DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
252         }
253
254         TEVENT_FD_WRITEABLE(msg->event.fde);
255 }
256
257 /*
258   handle a socket write event
259 */
260 static void imessaging_send_handler(struct imessaging_context *msg)
261 {
262         while (msg->pending) {
263                 struct imessaging_rec *rec = msg->pending;
264                 NTSTATUS status;
265                 status = try_send(rec);
266                 if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
267                         rec->retries++;
268                         if (rec->retries > 3) {
269                                 /* we're getting continuous write errors -
270                                    backoff this record */
271                                 DLIST_REMOVE(msg->pending, rec);
272                                 DLIST_ADD_END(msg->retry_queue, rec, 
273                                               struct imessaging_rec *);
274                                 if (msg->retry_te == NULL) {
275                                         msg->retry_te = 
276                                                 tevent_add_timer(msg->event.ev, msg,
277                                                                 timeval_current_ofs(1, 0), 
278                                                                 msg_retry_timer, msg);
279                                 }
280                         }
281                         break;
282                 }
283                 rec->retries = 0;
284                 if (!NT_STATUS_IS_OK(status)) {
285                         TALLOC_CTX *tmp_ctx = talloc_new(msg);
286                         DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", 
287                                  server_id_str(tmp_ctx, &rec->header->from),
288                                  server_id_str(tmp_ctx, &rec->header->to),
289                                  rec->header->msg_type, 
290                                  nt_errstr(status)));
291                         talloc_free(tmp_ctx);
292                 }
293                 DLIST_REMOVE(msg->pending, rec);
294                 talloc_free(rec);
295         }
296         if (msg->pending == NULL) {
297                 TEVENT_FD_NOT_WRITEABLE(msg->event.fde);
298         }
299 }
300
301 /*
302   handle a new incoming packet
303 */
304 static void imessaging_recv_handler(struct imessaging_context *msg)
305 {
306         struct imessaging_rec *rec;
307         NTSTATUS status;
308         DATA_BLOB packet;
309         size_t msize;
310
311         /* see how many bytes are in the next packet */
312         status = socket_pending(msg->sock, &msize);
313         if (!NT_STATUS_IS_OK(status)) {
314                 DEBUG(0,("socket_pending failed in messaging - %s\n", 
315                          nt_errstr(status)));
316                 return;
317         }
318         
319         packet = data_blob_talloc(msg, NULL, msize);
320         if (packet.data == NULL) {
321                 /* assume this is temporary and retry */
322                 return;
323         }
324             
325         status = socket_recv(msg->sock, packet.data, msize, &msize);
326         if (!NT_STATUS_IS_OK(status)) {
327                 data_blob_free(&packet);
328                 return;
329         }
330
331         if (msize < sizeof(*rec->header)) {
332                 DEBUG(0,("messaging: bad message of size %d\n", (int)msize));
333                 data_blob_free(&packet);
334                 return;
335         }
336
337         rec = talloc(msg, struct imessaging_rec);
338         if (rec == NULL) {
339                 smb_panic("Unable to allocate imessaging_rec");
340         }
341
342         talloc_steal(rec, packet.data);
343         rec->msg           = msg;
344         rec->path          = msg->path;
345         rec->header        = (struct imessaging_header *)packet.data;
346         rec->packet        = packet;
347         rec->retries       = 0;
348
349         if (msize != sizeof(*rec->header) + rec->header->length) {
350                 DEBUG(0,("messaging: bad message header size %d should be %d\n", 
351                          rec->header->length, (int)(msize - sizeof(*rec->header))));
352                 talloc_free(rec);
353                 return;
354         }
355
356         imessaging_dispatch(msg, rec);
357         talloc_free(rec);
358 }
359
360
361 /*
362   handle a socket event
363 */
364 static void imessaging_handler(struct tevent_context *ev, struct tevent_fd *fde,
365                               uint16_t flags, void *private_data)
366 {
367         struct imessaging_context *msg = talloc_get_type(private_data,
368                                                         struct imessaging_context);
369         if (flags & TEVENT_FD_WRITE) {
370                 imessaging_send_handler(msg);
371         }
372         if (flags & TEVENT_FD_READ) {
373                 imessaging_recv_handler(msg);
374         }
375 }
376
377
378 /*
379   Register a dispatch function for a particular message type.
380 */
381 NTSTATUS imessaging_register(struct imessaging_context *msg, void *private_data,
382                             uint32_t msg_type, msg_callback_t fn)
383 {
384         struct dispatch_fn *d;
385
386         /* possibly expand dispatch array */
387         if (msg_type >= msg->num_types) {
388                 struct dispatch_fn **dp;
389                 int i;
390                 dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1);
391                 NT_STATUS_HAVE_NO_MEMORY(dp);
392                 msg->dispatch = dp;
393                 for (i=msg->num_types;i<=msg_type;i++) {
394                         msg->dispatch[i] = NULL;
395                 }
396                 msg->num_types = msg_type+1;
397         }
398
399         d = talloc_zero(msg->dispatch, struct dispatch_fn);
400         NT_STATUS_HAVE_NO_MEMORY(d);
401         d->msg_type = msg_type;
402         d->private_data = private_data;
403         d->fn = fn;
404
405         DLIST_ADD(msg->dispatch[msg_type], d);
406
407         return NT_STATUS_OK;
408 }
409
410 /*
411   register a temporary message handler. The msg_type is allocated
412   above MSG_TMP_BASE
413 */
414 NTSTATUS imessaging_register_tmp(struct imessaging_context *msg, void *private_data,
415                                 msg_callback_t fn, uint32_t *msg_type)
416 {
417         struct dispatch_fn *d;
418         int id;
419
420         d = talloc_zero(msg->dispatch, struct dispatch_fn);
421         NT_STATUS_HAVE_NO_MEMORY(d);
422         d->private_data = private_data;
423         d->fn = fn;
424
425         id = idr_get_new_above(msg->dispatch_tree, d, MSG_TMP_BASE, UINT16_MAX);
426         if (id == -1) {
427                 talloc_free(d);
428                 return NT_STATUS_TOO_MANY_CONTEXT_IDS;
429         }
430
431         d->msg_type = (uint32_t)id;
432         (*msg_type) = d->msg_type;
433
434         return NT_STATUS_OK;
435 }
436
437 /*
438   De-register the function for a particular message type.
439 */
440 void imessaging_deregister(struct imessaging_context *msg, uint32_t msg_type, void *private_data)
441 {
442         struct dispatch_fn *d, *next;
443
444         if (msg_type >= msg->num_types) {
445                 d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
446                                                    msg_type);
447                 if (!d) return;
448                 idr_remove(msg->dispatch_tree, msg_type);
449                 talloc_free(d);
450                 return;
451         }
452
453         for (d = msg->dispatch[msg_type]; d; d = next) {
454                 next = d->next;
455                 if (d->private_data == private_data) {
456                         DLIST_REMOVE(msg->dispatch[msg_type], d);
457                         talloc_free(d);
458                 }
459         }
460 }
461
462 /*
463   Send a message to a particular server
464 */
465 NTSTATUS imessaging_send(struct imessaging_context *msg, struct server_id server,
466                         uint32_t msg_type, const DATA_BLOB *data)
467 {
468         struct imessaging_rec *rec;
469         NTSTATUS status;
470         size_t dlength = data?data->length:0;
471
472         rec = talloc(msg, struct imessaging_rec);
473         if (rec == NULL) {
474                 return NT_STATUS_NO_MEMORY;
475         }
476
477         rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength);
478         if (rec->packet.data == NULL) {
479                 talloc_free(rec);
480                 return NT_STATUS_NO_MEMORY;
481         }
482
483         rec->retries       = 0;
484         rec->msg              = msg;
485         rec->header           = (struct imessaging_header *)rec->packet.data;
486         /* zero padding */
487         ZERO_STRUCTP(rec->header);
488         rec->header->version  = IMESSAGING_VERSION;
489         rec->header->msg_type = msg_type;
490         rec->header->from     = msg->server_id;
491         rec->header->to       = server;
492         rec->header->length   = dlength;
493         if (dlength != 0) {
494                 memcpy(rec->packet.data + sizeof(*rec->header), 
495                        data->data, dlength);
496         }
497
498         if (!cluster_node_equal(&msg->server_id, &server)) {
499                 /* the destination is on another node - dispatch via
500                    the cluster layer */
501                 status = cluster_message_send(server, &rec->packet);
502                 talloc_free(rec);
503                 return status;
504         }
505
506         rec->path = imessaging_path(msg, server);
507         talloc_steal(rec, rec->path);
508
509         if (msg->pending != NULL) {
510                 status = STATUS_MORE_ENTRIES;
511         } else {
512                 status = try_send(rec);
513         }
514
515         if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
516                 if (msg->pending == NULL) {
517                         TEVENT_FD_WRITEABLE(msg->event.fde);
518                 }
519                 DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
520                 return NT_STATUS_OK;
521         }
522
523         talloc_free(rec);
524
525         return status;
526 }
527
528 /*
529   Send a message to a particular server, with the message containing a single pointer
530 */
531 NTSTATUS imessaging_send_ptr(struct imessaging_context *msg, struct server_id server,
532                             uint32_t msg_type, void *ptr)
533 {
534         DATA_BLOB blob;
535
536         blob.data = (uint8_t *)&ptr;
537         blob.length = sizeof(void *);
538
539         return imessaging_send(msg, server, msg_type, &blob);
540 }
541
542
543 /*
544   remove our messaging socket and database entry
545 */
546 int imessaging_cleanup(struct imessaging_context *msg)
547 {
548         if (!msg) {
549                 return 0;
550         }
551
552         DEBUG(5,("imessaging: cleaning up %s\n", msg->path));
553         unlink(msg->path);
554         while (msg->names && msg->names[0]) {
555                 irpc_remove_name(msg, msg->names[0]);
556         }
557         return 0;
558 }
559
560 /*
561   create the listening socket and setup the dispatcher
562
563   use temporary=true when you want a destructor to remove the
564   associated messaging socket and database entry on talloc free. Don't
565   use this in processes that may fork and a child may talloc free this
566   memory
567 */
568 struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
569                                            const char *dir,
570                                            struct server_id server_id,
571                                            struct tevent_context *ev,
572                                            bool auto_remove)
573 {
574         struct imessaging_context *msg;
575         NTSTATUS status;
576         struct socket_address *path;
577
578         if (ev == NULL) {
579                 return NULL;
580         }
581
582         msg = talloc_zero(mem_ctx, struct imessaging_context);
583         if (msg == NULL) {
584                 return NULL;
585         }
586
587         /* setup a handler for messages from other cluster nodes, if appropriate */
588         status = cluster_message_init(msg, server_id, cluster_message_handler);
589         if (!NT_STATUS_IS_OK(status)) {
590                 talloc_free(msg);
591                 return NULL;
592         }
593
594         /* create the messaging directory if needed */
595         mkdir(dir, 0700);
596
597         msg->base_path     = talloc_reference(msg, dir);
598         msg->path          = imessaging_path(msg, server_id);
599         msg->server_id     = server_id;
600         msg->idr           = idr_init(msg);
601         msg->dispatch_tree = idr_init(msg);
602         msg->start_time    = timeval_current();
603
604         status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
605         if (!NT_STATUS_IS_OK(status)) {
606                 talloc_free(msg);
607                 return NULL;
608         }
609
610         /* by stealing here we ensure that the socket is cleaned up (and even 
611            deleted) on exit */
612         talloc_steal(msg, msg->sock);
613
614         path = socket_address_from_strings(msg, msg->sock->backend_name, 
615                                            msg->path, 0);
616         if (!path) {
617                 talloc_free(msg);
618                 return NULL;
619         }
620
621         status = socket_listen(msg->sock, path, 50, 0);
622         if (!NT_STATUS_IS_OK(status)) {
623                 DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status)));
624                 talloc_free(msg);
625                 return NULL;
626         }
627
628         /* it needs to be non blocking for sends */
629         set_blocking(socket_get_fd(msg->sock), false);
630
631         msg->event.ev   = ev;
632         msg->event.fde  = tevent_add_fd(ev, msg, socket_get_fd(msg->sock),
633                                         TEVENT_FD_READ, imessaging_handler, msg);
634         tevent_fd_set_auto_close(msg->event.fde);
635
636         if (auto_remove) {
637                 talloc_set_destructor(msg, imessaging_cleanup);
638         }
639         
640         imessaging_register(msg, NULL, MSG_PING, ping_message);
641         imessaging_register(msg, NULL, MSG_IRPC, irpc_handler);
642         IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg);
643
644         return msg;
645 }
646
647 /* 
648    A hack, for the short term until we get 'client only' messaging in place 
649 */
650 struct imessaging_context *imessaging_client_init(TALLOC_CTX *mem_ctx,
651                                                 const char *dir,
652                                                 struct tevent_context *ev)
653 {
654         struct server_id id;
655         ZERO_STRUCT(id);
656         id.pid = random() % 0x10000000;
657         return imessaging_init(mem_ctx, dir, id, ev, true);
658 }
659 /*
660   a list of registered irpc server functions
661 */
662 struct irpc_list {
663         struct irpc_list *next, *prev;
664         struct GUID uuid;
665         const struct ndr_interface_table *table;
666         int callnum;
667         irpc_function_t fn;
668         void *private_data;
669 };
670
671
672 /*
673   register a irpc server function
674 */
675 NTSTATUS irpc_register(struct imessaging_context *msg_ctx,
676                        const struct ndr_interface_table *table, 
677                        int callnum, irpc_function_t fn, void *private_data)
678 {
679         struct irpc_list *irpc;
680
681         /* override an existing handler, if any */
682         for (irpc=msg_ctx->irpc; irpc; irpc=irpc->next) {
683                 if (irpc->table == table && irpc->callnum == callnum) {
684                         break;
685                 }
686         }
687         if (irpc == NULL) {
688                 irpc = talloc(msg_ctx, struct irpc_list);
689                 NT_STATUS_HAVE_NO_MEMORY(irpc);
690                 DLIST_ADD(msg_ctx->irpc, irpc);
691         }
692
693         irpc->table   = table;
694         irpc->callnum = callnum;
695         irpc->fn      = fn;
696         irpc->private_data = private_data;
697         irpc->uuid = irpc->table->syntax_id.uuid;
698
699         return NT_STATUS_OK;
700 }
701
702
703 /*
704   handle an incoming irpc reply message
705 */
706 static void irpc_handler_reply(struct imessaging_context *msg_ctx, struct irpc_message *m)
707 {
708         struct irpc_request *irpc;
709
710         irpc = (struct irpc_request *)idr_find(msg_ctx->idr, m->header.callid);
711         if (irpc == NULL) return;
712
713         irpc->incoming.handler(irpc, m);
714 }
715
716 /*
717   send a irpc reply
718 */
719 NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status)
720 {
721         struct ndr_push *push;
722         DATA_BLOB packet;
723         enum ndr_err_code ndr_err;
724
725         m->header.status = status;
726
727         /* setup the reply */
728         push = ndr_push_init_ctx(m->ndr);
729         if (push == NULL) {
730                 status = NT_STATUS_NO_MEMORY;
731                 goto failed;
732         }
733
734         m->header.flags |= IRPC_FLAG_REPLY;
735         m->header.creds.token= NULL;
736
737         /* construct the packet */
738         ndr_err = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header);
739         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
740                 status = ndr_map_error2ntstatus(ndr_err);
741                 goto failed;
742         }
743
744         ndr_err = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data);
745         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
746                 status = ndr_map_error2ntstatus(ndr_err);
747                 goto failed;
748         }
749
750         /* send the reply message */
751         packet = ndr_push_blob(push);
752         status = imessaging_send(m->msg_ctx, m->from, MSG_IRPC, &packet);
753         if (!NT_STATUS_IS_OK(status)) goto failed;
754
755 failed:
756         talloc_free(m);
757         return status;
758 }
759
760 /*
761   handle an incoming irpc request message
762 */
763 static void irpc_handler_request(struct imessaging_context *msg_ctx,
764                                  struct irpc_message *m)
765 {
766         struct irpc_list *i;
767         void *r;
768         enum ndr_err_code ndr_err;
769
770         for (i=msg_ctx->irpc; i; i=i->next) {
771                 if (GUID_equal(&i->uuid, &m->header.uuid) &&
772                     i->table->syntax_id.if_version == m->header.if_version &&
773                     i->callnum == m->header.callnum) {
774                         break;
775                 }
776         }
777
778         if (i == NULL) {
779                 /* no registered handler for this message */
780                 talloc_free(m);
781                 return;
782         }
783
784         /* allocate space for the structure */
785         r = talloc_zero_size(m->ndr, i->table->calls[m->header.callnum].struct_size);
786         if (r == NULL) goto failed;
787
788         m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
789
790         /* parse the request data */
791         ndr_err = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r);
792         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
793
794         /* make the call */
795         m->private_data= i->private_data;
796         m->defer_reply = false;
797         m->no_reply    = false;
798         m->msg_ctx     = msg_ctx;
799         m->irpc        = i;
800         m->data        = r;
801         m->ev          = msg_ctx->event.ev;
802
803         m->header.status = i->fn(m, r);
804
805         if (m->no_reply) {
806                 /* the server function won't ever be replying to this request */
807                 talloc_free(m);
808                 return;
809         }
810
811         if (m->defer_reply) {
812                 /* the server function has asked to defer the reply to later */
813                 talloc_steal(msg_ctx, m);
814                 return;
815         }
816
817         irpc_send_reply(m, m->header.status);
818         return;
819
820 failed:
821         talloc_free(m);
822 }
823
824 /*
825   handle an incoming irpc message
826 */
827 static void irpc_handler(struct imessaging_context *msg_ctx, void *private_data,
828                          uint32_t msg_type, struct server_id src, DATA_BLOB *packet)
829 {
830         struct irpc_message *m;
831         enum ndr_err_code ndr_err;
832
833         m = talloc(msg_ctx, struct irpc_message);
834         if (m == NULL) goto failed;
835
836         m->from = src;
837
838         m->ndr = ndr_pull_init_blob(packet, m);
839         if (m->ndr == NULL) goto failed;
840
841         m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
842
843         ndr_err = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header);
844         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
845
846         if (m->header.flags & IRPC_FLAG_REPLY) {
847                 irpc_handler_reply(msg_ctx, m);
848         } else {
849                 irpc_handler_request(msg_ctx, m);
850         }
851         return;
852
853 failed:
854         talloc_free(m);
855 }
856
857
858 /*
859   destroy a irpc request
860 */
861 static int irpc_destructor(struct irpc_request *irpc)
862 {
863         if (irpc->callid != -1) {
864                 idr_remove(irpc->msg_ctx->idr, irpc->callid);
865                 irpc->callid = -1;
866         }
867
868         return 0;
869 }
870
871 /*
872   open the naming database
873 */
874 static struct tdb_wrap *irpc_namedb_open(struct imessaging_context *msg_ctx)
875 {
876         struct tdb_wrap *t;
877         char *path = talloc_asprintf(msg_ctx, "%s/names.tdb", msg_ctx->base_path);
878         if (path == NULL) {
879                 return NULL;
880         }
881         t = tdb_wrap_open(msg_ctx, path, 0, 0, O_RDWR|O_CREAT, 0660);
882         talloc_free(path);
883         return t;
884 }
885         
886
887 /*
888   add a string name that this irpc server can be called on
889 */
890 NTSTATUS irpc_add_name(struct imessaging_context *msg_ctx, const char *name)
891 {
892         struct tdb_wrap *t;
893         TDB_DATA rec;
894         int count;
895         NTSTATUS status = NT_STATUS_OK;
896
897         t = irpc_namedb_open(msg_ctx);
898         NT_STATUS_HAVE_NO_MEMORY(t);
899
900         if (tdb_lock_bystring(t->tdb, name) != 0) {
901                 talloc_free(t);
902                 return NT_STATUS_LOCK_NOT_GRANTED;
903         }
904         rec = tdb_fetch_bystring(t->tdb, name);
905         count = rec.dsize / sizeof(struct server_id);
906         rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1);
907         rec.dsize += sizeof(struct server_id);
908         if (rec.dptr == NULL) {
909                 tdb_unlock_bystring(t->tdb, name);
910                 talloc_free(t);
911                 return NT_STATUS_NO_MEMORY;
912         }
913         ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id;
914         if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) {
915                 status = NT_STATUS_INTERNAL_ERROR;
916         }
917         free(rec.dptr);
918         tdb_unlock_bystring(t->tdb, name);
919         talloc_free(t);
920
921         msg_ctx->names = str_list_add(msg_ctx->names, name);
922         talloc_steal(msg_ctx, msg_ctx->names);
923
924         return status;
925 }
926
927 /*
928   return a list of server ids for a server name
929 */
930 struct server_id *irpc_servers_byname(struct imessaging_context *msg_ctx,
931                                       TALLOC_CTX *mem_ctx,
932                                       const char *name)
933 {
934         struct tdb_wrap *t;
935         TDB_DATA rec;
936         int count, i;
937         struct server_id *ret;
938
939         t = irpc_namedb_open(msg_ctx);
940         if (t == NULL) {
941                 return NULL;
942         }
943
944         if (tdb_lock_bystring(t->tdb, name) != 0) {
945                 talloc_free(t);
946                 return NULL;
947         }
948         rec = tdb_fetch_bystring(t->tdb, name);
949         if (rec.dptr == NULL) {
950                 tdb_unlock_bystring(t->tdb, name);
951                 talloc_free(t);
952                 return NULL;
953         }
954         count = rec.dsize / sizeof(struct server_id);
955         ret = talloc_array(mem_ctx, struct server_id, count+1);
956         if (ret == NULL) {
957                 tdb_unlock_bystring(t->tdb, name);
958                 talloc_free(t);
959                 return NULL;
960         }
961         for (i=0;i<count;i++) {
962                 ret[i] = ((struct server_id *)rec.dptr)[i];
963         }
964         ret[i] = cluster_id(0, 0);
965         free(rec.dptr);
966         tdb_unlock_bystring(t->tdb, name);
967         talloc_free(t);
968
969         return ret;
970 }
971
972 /*
973   remove a name from a messaging context
974 */
975 void irpc_remove_name(struct imessaging_context *msg_ctx, const char *name)
976 {
977         struct tdb_wrap *t;
978         TDB_DATA rec;
979         int count, i;
980         struct server_id *ids;
981
982         str_list_remove(msg_ctx->names, name);
983
984         t = irpc_namedb_open(msg_ctx);
985         if (t == NULL) {
986                 return;
987         }
988
989         if (tdb_lock_bystring(t->tdb, name) != 0) {
990                 talloc_free(t);
991                 return;
992         }
993         rec = tdb_fetch_bystring(t->tdb, name);
994         if (rec.dptr == NULL) {
995                 tdb_unlock_bystring(t->tdb, name);
996                 talloc_free(t);
997                 return;
998         }
999         count = rec.dsize / sizeof(struct server_id);
1000         if (count == 0) {
1001                 free(rec.dptr);
1002                 tdb_unlock_bystring(t->tdb, name);
1003                 talloc_free(t);
1004                 return;
1005         }
1006         ids = (struct server_id *)rec.dptr;
1007         for (i=0;i<count;i++) {
1008                 if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) {
1009                         if (i < count-1) {
1010                                 memmove(ids+i, ids+i+1, 
1011                                         sizeof(struct server_id) * (count-(i+1)));
1012                         }
1013                         rec.dsize -= sizeof(struct server_id);
1014                         break;
1015                 }
1016         }
1017         tdb_store_bystring(t->tdb, name, rec, 0);
1018         free(rec.dptr);
1019         tdb_unlock_bystring(t->tdb, name);
1020         talloc_free(t);
1021 }
1022
1023 struct server_id imessaging_get_server_id(struct imessaging_context *msg_ctx)
1024 {
1025         return msg_ctx->server_id;
1026 }
1027
1028 struct irpc_bh_state {
1029         struct imessaging_context *msg_ctx;
1030         struct server_id server_id;
1031         const struct ndr_interface_table *table;
1032         uint32_t timeout;
1033         struct security_token *token;
1034 };
1035
1036 static bool irpc_bh_is_connected(struct dcerpc_binding_handle *h)
1037 {
1038         struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1039                                    struct irpc_bh_state);
1040
1041         if (!hs->msg_ctx) {
1042                 return false;
1043         }
1044
1045         return true;
1046 }
1047
1048 static uint32_t irpc_bh_set_timeout(struct dcerpc_binding_handle *h,
1049                                     uint32_t timeout)
1050 {
1051         struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1052                                    struct irpc_bh_state);
1053         uint32_t old = hs->timeout;
1054
1055         hs->timeout = timeout;
1056
1057         return old;
1058 }
1059
1060 struct irpc_bh_raw_call_state {
1061         struct irpc_request *irpc;
1062         uint32_t opnum;
1063         DATA_BLOB in_data;
1064         DATA_BLOB in_packet;
1065         DATA_BLOB out_data;
1066 };
1067
1068 static void irpc_bh_raw_call_incoming_handler(struct irpc_request *irpc,
1069                                               struct irpc_message *m);
1070
1071 static struct tevent_req *irpc_bh_raw_call_send(TALLOC_CTX *mem_ctx,
1072                                                 struct tevent_context *ev,
1073                                                 struct dcerpc_binding_handle *h,
1074                                                 const struct GUID *object,
1075                                                 uint32_t opnum,
1076                                                 uint32_t in_flags,
1077                                                 const uint8_t *in_data,
1078                                                 size_t in_length)
1079 {
1080         struct irpc_bh_state *hs =
1081                 dcerpc_binding_handle_data(h,
1082                 struct irpc_bh_state);
1083         struct tevent_req *req;
1084         struct irpc_bh_raw_call_state *state;
1085         bool ok;
1086         struct irpc_header header;
1087         struct ndr_push *ndr;
1088         NTSTATUS status;
1089         enum ndr_err_code ndr_err;
1090
1091         req = tevent_req_create(mem_ctx, &state,
1092                                 struct irpc_bh_raw_call_state);
1093         if (req == NULL) {
1094                 return NULL;
1095         }
1096         state->opnum = opnum;
1097         state->in_data.data = discard_const_p(uint8_t, in_data);
1098         state->in_data.length = in_length;
1099
1100         ok = irpc_bh_is_connected(h);
1101         if (!ok) {
1102                 tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
1103                 return tevent_req_post(req, ev);
1104         }
1105
1106         state->irpc = talloc_zero(state, struct irpc_request);
1107         if (tevent_req_nomem(state->irpc, req)) {
1108                 return tevent_req_post(req, ev);
1109         }
1110
1111         state->irpc->msg_ctx  = hs->msg_ctx;
1112         state->irpc->callid   = idr_get_new(hs->msg_ctx->idr,
1113                                             state->irpc, UINT16_MAX);
1114         if (state->irpc->callid == -1) {
1115                 tevent_req_nterror(req, NT_STATUS_INSUFFICIENT_RESOURCES);
1116                 return tevent_req_post(req, ev);
1117         }
1118         state->irpc->incoming.handler = irpc_bh_raw_call_incoming_handler;
1119         state->irpc->incoming.private_data = req;
1120
1121         talloc_set_destructor(state->irpc, irpc_destructor);
1122
1123         /* setup the header */
1124         header.uuid = hs->table->syntax_id.uuid;
1125
1126         header.if_version = hs->table->syntax_id.if_version;
1127         header.callid     = state->irpc->callid;
1128         header.callnum    = state->opnum;
1129         header.flags      = 0;
1130         header.status     = NT_STATUS_OK;
1131         header.creds.token= hs->token;
1132
1133         /* construct the irpc packet */
1134         ndr = ndr_push_init_ctx(state->irpc);
1135         if (tevent_req_nomem(ndr, req)) {
1136                 return tevent_req_post(req, ev);
1137         }
1138
1139         ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
1140         status = ndr_map_error2ntstatus(ndr_err);
1141         if (!NT_STATUS_IS_OK(status)) {
1142                 tevent_req_nterror(req, status);
1143                 return tevent_req_post(req, ev);
1144         }
1145
1146         ndr_err = ndr_push_bytes(ndr, in_data, in_length);
1147         status = ndr_map_error2ntstatus(ndr_err);
1148         if (!NT_STATUS_IS_OK(status)) {
1149                 tevent_req_nterror(req, status);
1150                 return tevent_req_post(req, ev);
1151         }
1152
1153         /* and send it */
1154         state->in_packet = ndr_push_blob(ndr);
1155         status = imessaging_send(hs->msg_ctx, hs->server_id,
1156                                 MSG_IRPC, &state->in_packet);
1157         if (!NT_STATUS_IS_OK(status)) {
1158                 tevent_req_nterror(req, status);
1159                 return tevent_req_post(req, ev);
1160         }
1161
1162         if (hs->timeout != IRPC_CALL_TIMEOUT_INF) {
1163                 /* set timeout-callback in case caller wants that */
1164                 ok = tevent_req_set_endtime(req, ev, timeval_current_ofs(hs->timeout, 0));
1165                 if (!ok) {
1166                         return tevent_req_post(req, ev);
1167                 }
1168         }
1169
1170         return req;
1171 }
1172
1173 static void irpc_bh_raw_call_incoming_handler(struct irpc_request *irpc,
1174                                               struct irpc_message *m)
1175 {
1176         struct tevent_req *req =
1177                 talloc_get_type_abort(irpc->incoming.private_data,
1178                 struct tevent_req);
1179         struct irpc_bh_raw_call_state *state =
1180                 tevent_req_data(req,
1181                 struct irpc_bh_raw_call_state);
1182
1183         talloc_steal(state, m);
1184
1185         if (!NT_STATUS_IS_OK(m->header.status)) {
1186                 tevent_req_nterror(req, m->header.status);
1187                 return;
1188         }
1189
1190         state->out_data = data_blob_talloc(state,
1191                 m->ndr->data + m->ndr->offset,
1192                 m->ndr->data_size - m->ndr->offset);
1193         if ((m->ndr->data_size - m->ndr->offset) > 0 && !state->out_data.data) {
1194                 tevent_req_oom(req);
1195                 return;
1196         }
1197
1198         tevent_req_done(req);
1199 }
1200
1201 static NTSTATUS irpc_bh_raw_call_recv(struct tevent_req *req,
1202                                         TALLOC_CTX *mem_ctx,
1203                                         uint8_t **out_data,
1204                                         size_t *out_length,
1205                                         uint32_t *out_flags)
1206 {
1207         struct irpc_bh_raw_call_state *state =
1208                 tevent_req_data(req,
1209                 struct irpc_bh_raw_call_state);
1210         NTSTATUS status;
1211
1212         if (tevent_req_is_nterror(req, &status)) {
1213                 tevent_req_received(req);
1214                 return status;
1215         }
1216
1217         *out_data = talloc_move(mem_ctx, &state->out_data.data);
1218         *out_length = state->out_data.length;
1219         *out_flags = 0;
1220         tevent_req_received(req);
1221         return NT_STATUS_OK;
1222 }
1223
1224 struct irpc_bh_disconnect_state {
1225         uint8_t _dummy;
1226 };
1227
1228 static struct tevent_req *irpc_bh_disconnect_send(TALLOC_CTX *mem_ctx,
1229                                                 struct tevent_context *ev,
1230                                                 struct dcerpc_binding_handle *h)
1231 {
1232         struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1233                                      struct irpc_bh_state);
1234         struct tevent_req *req;
1235         struct irpc_bh_disconnect_state *state;
1236         bool ok;
1237
1238         req = tevent_req_create(mem_ctx, &state,
1239                                 struct irpc_bh_disconnect_state);
1240         if (req == NULL) {
1241                 return NULL;
1242         }
1243
1244         ok = irpc_bh_is_connected(h);
1245         if (!ok) {
1246                 tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
1247                 return tevent_req_post(req, ev);
1248         }
1249
1250         hs->msg_ctx = NULL;
1251
1252         tevent_req_done(req);
1253         return tevent_req_post(req, ev);
1254 }
1255
1256 static NTSTATUS irpc_bh_disconnect_recv(struct tevent_req *req)
1257 {
1258         NTSTATUS status;
1259
1260         if (tevent_req_is_nterror(req, &status)) {
1261                 tevent_req_received(req);
1262                 return status;
1263         }
1264
1265         tevent_req_received(req);
1266         return NT_STATUS_OK;
1267 }
1268
1269 static bool irpc_bh_ref_alloc(struct dcerpc_binding_handle *h)
1270 {
1271         return true;
1272 }
1273
1274 static const struct dcerpc_binding_handle_ops irpc_bh_ops = {
1275         .name                   = "wbint",
1276         .is_connected           = irpc_bh_is_connected,
1277         .set_timeout            = irpc_bh_set_timeout,
1278         .raw_call_send          = irpc_bh_raw_call_send,
1279         .raw_call_recv          = irpc_bh_raw_call_recv,
1280         .disconnect_send        = irpc_bh_disconnect_send,
1281         .disconnect_recv        = irpc_bh_disconnect_recv,
1282
1283         .ref_alloc              = irpc_bh_ref_alloc,
1284 };
1285
1286 /* initialise a irpc binding handle */
1287 struct dcerpc_binding_handle *irpc_binding_handle(TALLOC_CTX *mem_ctx,
1288                                         struct imessaging_context *msg_ctx,
1289                                         struct server_id server_id,
1290                                         const struct ndr_interface_table *table)
1291 {
1292         struct dcerpc_binding_handle *h;
1293         struct irpc_bh_state *hs;
1294
1295         h = dcerpc_binding_handle_create(mem_ctx,
1296                                          &irpc_bh_ops,
1297                                          NULL,
1298                                          table,
1299                                          &hs,
1300                                          struct irpc_bh_state,
1301                                          __location__);
1302         if (h == NULL) {
1303                 return NULL;
1304         }
1305         hs->msg_ctx = msg_ctx;
1306         hs->server_id = server_id;
1307         hs->table = table;
1308         hs->timeout = IRPC_CALL_TIMEOUT;
1309
1310         dcerpc_binding_handle_set_sync_ev(h, msg_ctx->event.ev);
1311
1312         return h;
1313 }
1314
1315 struct dcerpc_binding_handle *irpc_binding_handle_by_name(TALLOC_CTX *mem_ctx,
1316                                         struct imessaging_context *msg_ctx,
1317                                         const char *dest_task,
1318                                         const struct ndr_interface_table *table)
1319 {
1320         struct dcerpc_binding_handle *h;
1321         struct server_id *sids;
1322         struct server_id sid;
1323
1324         /* find the server task */
1325         sids = irpc_servers_byname(msg_ctx, mem_ctx, dest_task);
1326         if (sids == NULL) {
1327                 errno = EADDRNOTAVAIL;
1328                 return NULL;
1329         }
1330         if (sids[0].pid == 0) {
1331                 talloc_free(sids);
1332                 errno = EADDRNOTAVAIL;
1333                 return NULL;
1334         }
1335         sid = sids[0];
1336         talloc_free(sids);
1337
1338         h = irpc_binding_handle(mem_ctx, msg_ctx,
1339                                 sid, table);
1340         if (h == NULL) {
1341                 return NULL;
1342         }
1343
1344         return h;
1345 }
1346
1347 void irpc_binding_handle_add_security_token(struct dcerpc_binding_handle *h,
1348                                             struct security_token *token)
1349 {
1350         struct irpc_bh_state *hs =
1351                 dcerpc_binding_handle_data(h,
1352                 struct irpc_bh_state);
1353
1354         hs->token = token;
1355 }