Merge 2610c05b5b95cc7036b3d6dfb894c6cfbdb68483 as Samba-4.0alpha16
[kai/samba-autobuild/.git] / source4 / lib / messaging / messaging.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    Samba internal messaging functions
5
6    Copyright (C) Andrew Tridgell 2004
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include "includes.h"
23 #include "lib/events/events.h"
24 #include "system/filesys.h"
25 #include "messaging/messaging.h"
26 #include "../lib/util/dlinklist.h"
27 #include "lib/socket/socket.h"
28 #include "librpc/gen_ndr/ndr_irpc.h"
29 #include "lib/messaging/irpc.h"
30 #include "lib/util/tdb_wrap.h"
31 #include "../lib/util/unix_privs.h"
32 #include "librpc/rpc/dcerpc.h"
33 #include "../lib/tdb_compat/tdb_compat.h"
34 #include "../lib/util/util_tdb.h"
35 #include "cluster/cluster.h"
36 #include "../lib/util/tevent_ntstatus.h"
37
38 /* change the message version with any incompatible changes in the protocol */
39 #define IMESSAGING_VERSION 1
40
41 /*
42   a pending irpc call
43 */
44 struct irpc_request {
45         struct imessaging_context *msg_ctx;
46         int callid;
47         struct {
48                 void (*handler)(struct irpc_request *irpc, struct irpc_message *m);
49                 void *private_data;
50         } incoming;
51 };
52
53 struct imessaging_context {
54         struct server_id server_id;
55         struct socket_context *sock;
56         const char *base_path;
57         const char *path;
58         struct dispatch_fn **dispatch;
59         uint32_t num_types;
60         struct idr_context *dispatch_tree;
61         struct imessaging_rec *pending;
62         struct imessaging_rec *retry_queue;
63         struct irpc_list *irpc;
64         struct idr_context *idr;
65         const char **names;
66         struct timeval start_time;
67         struct tevent_timer *retry_te;
68         struct {
69                 struct tevent_context *ev;
70                 struct tevent_fd *fde;
71         } event;
72 };
73
74 /* we have a linked list of dispatch handlers for each msg_type that
75    this messaging server can deal with */
76 struct dispatch_fn {
77         struct dispatch_fn *next, *prev;
78         uint32_t msg_type;
79         void *private_data;
80         msg_callback_t fn;
81 };
82
83 /* an individual message */
84 struct imessaging_rec {
85         struct imessaging_rec *next, *prev;
86         struct imessaging_context *msg;
87         const char *path;
88
89         struct imessaging_header {
90                 uint32_t version;
91                 uint32_t msg_type;
92                 struct server_id from;
93                 struct server_id to;
94                 uint32_t length;
95         } *header;
96
97         DATA_BLOB packet;
98         uint32_t retries;
99 };
100
101
102 static void irpc_handler(struct imessaging_context *, void *,
103                          uint32_t, struct server_id, DATA_BLOB *);
104
105
106 /*
107  A useful function for testing the message system.
108 */
109 static void ping_message(struct imessaging_context *msg, void *private_data,
110                          uint32_t msg_type, struct server_id src, DATA_BLOB *data)
111 {
112         char *task_id = server_id_str(NULL, &src);
113         DEBUG(1,("INFO: Received PING message from server %s [%.*s]\n",
114                  task_id, (int)data->length,
115                  data->data?(const char *)data->data:""));
116         talloc_free(task_id);
117         imessaging_send(msg, src, MSG_PONG, data);
118 }
119
120 /*
121   return uptime of messaging server via irpc
122 */
123 static NTSTATUS irpc_uptime(struct irpc_message *msg, 
124                             struct irpc_uptime *r)
125 {
126         struct imessaging_context *ctx = talloc_get_type(msg->private_data, struct imessaging_context);
127         *r->out.start_time = timeval_to_nttime(&ctx->start_time);
128         return NT_STATUS_OK;
129 }
130
131 /* 
132    return the path to a messaging socket
133 */
134 static char *imessaging_path(struct imessaging_context *msg, struct server_id server_id)
135 {
136         TALLOC_CTX *tmp_ctx = talloc_new(msg);
137         const char *id = server_id_str(tmp_ctx, &server_id);
138         char *s;
139         if (id == NULL) {
140                 return NULL;
141         }
142         s = talloc_asprintf(msg, "%s/msg.%s", msg->base_path, id);
143         talloc_steal(s, tmp_ctx);
144         return s;
145 }
146
147 /*
148   dispatch a fully received message
149
150   note that this deliberately can match more than one message handler
151   per message. That allows a single messasging context to register
152   (for example) a debug handler for more than one piece of code
153 */
154 static void imessaging_dispatch(struct imessaging_context *msg, struct imessaging_rec *rec)
155 {
156         struct dispatch_fn *d, *next;
157
158         /* temporary IDs use an idtree, the rest use a array of pointers */
159         if (rec->header->msg_type >= MSG_TMP_BASE) {
160                 d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
161                                                    rec->header->msg_type);
162         } else if (rec->header->msg_type < msg->num_types) {
163                 d = msg->dispatch[rec->header->msg_type];
164         } else {
165                 d = NULL;
166         }
167
168         for (; d; d = next) {
169                 DATA_BLOB data;
170                 next = d->next;
171                 data.data = rec->packet.data + sizeof(*rec->header);
172                 data.length = rec->header->length;
173                 d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data);
174         }
175         rec->header->length = 0;
176 }
177
178 /*
179   handler for messages that arrive from other nodes in the cluster
180 */
181 static void cluster_message_handler(struct imessaging_context *msg, DATA_BLOB packet)
182 {
183         struct imessaging_rec *rec;
184
185         rec = talloc(msg, struct imessaging_rec);
186         if (rec == NULL) {
187                 smb_panic("Unable to allocate imessaging_rec");
188         }
189
190         rec->msg           = msg;
191         rec->path          = msg->path;
192         rec->header        = (struct imessaging_header *)packet.data;
193         rec->packet        = packet;
194         rec->retries       = 0;
195
196         if (packet.length != sizeof(*rec->header) + rec->header->length) {
197                 DEBUG(0,("messaging: bad message header size %d should be %d\n", 
198                          rec->header->length, (int)(packet.length - sizeof(*rec->header))));
199                 talloc_free(rec);
200                 return;
201         }
202
203         imessaging_dispatch(msg, rec);
204         talloc_free(rec);
205 }
206
207
208
209 /*
210   try to send the message
211 */
212 static NTSTATUS try_send(struct imessaging_rec *rec)
213 {
214         struct imessaging_context *msg = rec->msg;
215         size_t nsent;
216         void *priv;
217         NTSTATUS status;
218         struct socket_address *path;
219
220         /* rec->path is the path of the *other* socket, where we want
221          * this to end up */
222         path = socket_address_from_strings(msg, msg->sock->backend_name, 
223                                            rec->path, 0);
224         if (!path) {
225                 return NT_STATUS_NO_MEMORY;
226         }
227
228         /* we send with privileges so messages work from any context */
229         priv = root_privileges();
230         status = socket_sendto(msg->sock, &rec->packet, &nsent, path);
231         talloc_free(path);
232         talloc_free(priv);
233
234         return status;
235 }
236
237 /*
238   retry backed off messages
239 */
240 static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te, 
241                             struct timeval t, void *private_data)
242 {
243         struct imessaging_context *msg = talloc_get_type(private_data,
244                                                         struct imessaging_context);
245         msg->retry_te = NULL;
246
247         /* put the messages back on the main queue */
248         while (msg->retry_queue) {
249                 struct imessaging_rec *rec = msg->retry_queue;
250                 DLIST_REMOVE(msg->retry_queue, rec);
251                 DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
252         }
253
254         EVENT_FD_WRITEABLE(msg->event.fde);     
255 }
256
257 /*
258   handle a socket write event
259 */
260 static void imessaging_send_handler(struct imessaging_context *msg)
261 {
262         while (msg->pending) {
263                 struct imessaging_rec *rec = msg->pending;
264                 NTSTATUS status;
265                 status = try_send(rec);
266                 if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
267                         rec->retries++;
268                         if (rec->retries > 3) {
269                                 /* we're getting continuous write errors -
270                                    backoff this record */
271                                 DLIST_REMOVE(msg->pending, rec);
272                                 DLIST_ADD_END(msg->retry_queue, rec, 
273                                               struct imessaging_rec *);
274                                 if (msg->retry_te == NULL) {
275                                         msg->retry_te = 
276                                                 event_add_timed(msg->event.ev, msg, 
277                                                                 timeval_current_ofs(1, 0), 
278                                                                 msg_retry_timer, msg);
279                                 }
280                         }
281                         break;
282                 }
283                 rec->retries = 0;
284                 if (!NT_STATUS_IS_OK(status)) {
285                         TALLOC_CTX *tmp_ctx = talloc_new(msg);
286                         DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", 
287                                  server_id_str(tmp_ctx, &rec->header->from),
288                                  server_id_str(tmp_ctx, &rec->header->to),
289                                  rec->header->msg_type, 
290                                  nt_errstr(status)));
291                         talloc_free(tmp_ctx);
292                 }
293                 DLIST_REMOVE(msg->pending, rec);
294                 talloc_free(rec);
295         }
296         if (msg->pending == NULL) {
297                 EVENT_FD_NOT_WRITEABLE(msg->event.fde);
298         }
299 }
300
301 /*
302   handle a new incoming packet
303 */
304 static void imessaging_recv_handler(struct imessaging_context *msg)
305 {
306         struct imessaging_rec *rec;
307         NTSTATUS status;
308         DATA_BLOB packet;
309         size_t msize;
310
311         /* see how many bytes are in the next packet */
312         status = socket_pending(msg->sock, &msize);
313         if (!NT_STATUS_IS_OK(status)) {
314                 DEBUG(0,("socket_pending failed in messaging - %s\n", 
315                          nt_errstr(status)));
316                 return;
317         }
318         
319         packet = data_blob_talloc(msg, NULL, msize);
320         if (packet.data == NULL) {
321                 /* assume this is temporary and retry */
322                 return;
323         }
324             
325         status = socket_recv(msg->sock, packet.data, msize, &msize);
326         if (!NT_STATUS_IS_OK(status)) {
327                 data_blob_free(&packet);
328                 return;
329         }
330
331         if (msize < sizeof(*rec->header)) {
332                 DEBUG(0,("messaging: bad message of size %d\n", (int)msize));
333                 data_blob_free(&packet);
334                 return;
335         }
336
337         rec = talloc(msg, struct imessaging_rec);
338         if (rec == NULL) {
339                 smb_panic("Unable to allocate imessaging_rec");
340         }
341
342         talloc_steal(rec, packet.data);
343         rec->msg           = msg;
344         rec->path          = msg->path;
345         rec->header        = (struct imessaging_header *)packet.data;
346         rec->packet        = packet;
347         rec->retries       = 0;
348
349         if (msize != sizeof(*rec->header) + rec->header->length) {
350                 DEBUG(0,("messaging: bad message header size %d should be %d\n", 
351                          rec->header->length, (int)(msize - sizeof(*rec->header))));
352                 talloc_free(rec);
353                 return;
354         }
355
356         imessaging_dispatch(msg, rec);
357         talloc_free(rec);
358 }
359
360
361 /*
362   handle a socket event
363 */
364 static void imessaging_handler(struct tevent_context *ev, struct tevent_fd *fde,
365                               uint16_t flags, void *private_data)
366 {
367         struct imessaging_context *msg = talloc_get_type(private_data,
368                                                         struct imessaging_context);
369         if (flags & EVENT_FD_WRITE) {
370                 imessaging_send_handler(msg);
371         }
372         if (flags & EVENT_FD_READ) {
373                 imessaging_recv_handler(msg);
374         }
375 }
376
377
378 /*
379   Register a dispatch function for a particular message type.
380 */
381 NTSTATUS imessaging_register(struct imessaging_context *msg, void *private_data,
382                             uint32_t msg_type, msg_callback_t fn)
383 {
384         struct dispatch_fn *d;
385
386         /* possibly expand dispatch array */
387         if (msg_type >= msg->num_types) {
388                 struct dispatch_fn **dp;
389                 int i;
390                 dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1);
391                 NT_STATUS_HAVE_NO_MEMORY(dp);
392                 msg->dispatch = dp;
393                 for (i=msg->num_types;i<=msg_type;i++) {
394                         msg->dispatch[i] = NULL;
395                 }
396                 msg->num_types = msg_type+1;
397         }
398
399         d = talloc_zero(msg->dispatch, struct dispatch_fn);
400         NT_STATUS_HAVE_NO_MEMORY(d);
401         d->msg_type = msg_type;
402         d->private_data = private_data;
403         d->fn = fn;
404
405         DLIST_ADD(msg->dispatch[msg_type], d);
406
407         return NT_STATUS_OK;
408 }
409
410 /*
411   register a temporary message handler. The msg_type is allocated
412   above MSG_TMP_BASE
413 */
414 NTSTATUS imessaging_register_tmp(struct imessaging_context *msg, void *private_data,
415                                 msg_callback_t fn, uint32_t *msg_type)
416 {
417         struct dispatch_fn *d;
418         int id;
419
420         d = talloc_zero(msg->dispatch, struct dispatch_fn);
421         NT_STATUS_HAVE_NO_MEMORY(d);
422         d->private_data = private_data;
423         d->fn = fn;
424
425         id = idr_get_new_above(msg->dispatch_tree, d, MSG_TMP_BASE, UINT16_MAX);
426         if (id == -1) {
427                 talloc_free(d);
428                 return NT_STATUS_TOO_MANY_CONTEXT_IDS;
429         }
430
431         d->msg_type = (uint32_t)id;
432         (*msg_type) = d->msg_type;
433
434         return NT_STATUS_OK;
435 }
436
437 /*
438   De-register the function for a particular message type.
439 */
440 void imessaging_deregister(struct imessaging_context *msg, uint32_t msg_type, void *private_data)
441 {
442         struct dispatch_fn *d, *next;
443
444         if (msg_type >= msg->num_types) {
445                 d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
446                                                    msg_type);
447                 if (!d) return;
448                 idr_remove(msg->dispatch_tree, msg_type);
449                 talloc_free(d);
450                 return;
451         }
452
453         for (d = msg->dispatch[msg_type]; d; d = next) {
454                 next = d->next;
455                 if (d->private_data == private_data) {
456                         DLIST_REMOVE(msg->dispatch[msg_type], d);
457                         talloc_free(d);
458                 }
459         }
460 }
461
462 /*
463   Send a message to a particular server
464 */
465 NTSTATUS imessaging_send(struct imessaging_context *msg, struct server_id server,
466                         uint32_t msg_type, const DATA_BLOB *data)
467 {
468         struct imessaging_rec *rec;
469         NTSTATUS status;
470         size_t dlength = data?data->length:0;
471
472         rec = talloc(msg, struct imessaging_rec);
473         if (rec == NULL) {
474                 return NT_STATUS_NO_MEMORY;
475         }
476
477         rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength);
478         if (rec->packet.data == NULL) {
479                 talloc_free(rec);
480                 return NT_STATUS_NO_MEMORY;
481         }
482
483         rec->retries       = 0;
484         rec->msg              = msg;
485         rec->header           = (struct imessaging_header *)rec->packet.data;
486         /* zero padding */
487         ZERO_STRUCTP(rec->header);
488         rec->header->version  = IMESSAGING_VERSION;
489         rec->header->msg_type = msg_type;
490         rec->header->from     = msg->server_id;
491         rec->header->to       = server;
492         rec->header->length   = dlength;
493         if (dlength != 0) {
494                 memcpy(rec->packet.data + sizeof(*rec->header), 
495                        data->data, dlength);
496         }
497
498         if (!cluster_node_equal(&msg->server_id, &server)) {
499                 /* the destination is on another node - dispatch via
500                    the cluster layer */
501                 status = cluster_message_send(server, &rec->packet);
502                 talloc_free(rec);
503                 return status;
504         }
505
506         rec->path = imessaging_path(msg, server);
507         talloc_steal(rec, rec->path);
508
509         if (msg->pending != NULL) {
510                 status = STATUS_MORE_ENTRIES;
511         } else {
512                 status = try_send(rec);
513         }
514
515         if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
516                 if (msg->pending == NULL) {
517                         EVENT_FD_WRITEABLE(msg->event.fde);
518                 }
519                 DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
520                 return NT_STATUS_OK;
521         }
522
523         talloc_free(rec);
524
525         return status;
526 }
527
528 /*
529   Send a message to a particular server, with the message containing a single pointer
530 */
531 NTSTATUS imessaging_send_ptr(struct imessaging_context *msg, struct server_id server,
532                             uint32_t msg_type, void *ptr)
533 {
534         DATA_BLOB blob;
535
536         blob.data = (uint8_t *)&ptr;
537         blob.length = sizeof(void *);
538
539         return imessaging_send(msg, server, msg_type, &blob);
540 }
541
542
543 /*
544   destroy the messaging context
545 */
546 static int imessaging_destructor(struct imessaging_context *msg)
547 {
548         unlink(msg->path);
549         while (msg->names && msg->names[0]) {
550                 irpc_remove_name(msg, msg->names[0]);
551         }
552         return 0;
553 }
554
555 /*
556   create the listening socket and setup the dispatcher
557 */
558 struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
559                                          const char *dir,
560                                          struct server_id server_id, 
561                                          struct tevent_context *ev)
562 {
563         struct imessaging_context *msg;
564         NTSTATUS status;
565         struct socket_address *path;
566
567         if (ev == NULL) {
568                 return NULL;
569         }
570
571         msg = talloc_zero(mem_ctx, struct imessaging_context);
572         if (msg == NULL) {
573                 return NULL;
574         }
575
576         /* setup a handler for messages from other cluster nodes, if appropriate */
577         status = cluster_message_init(msg, server_id, cluster_message_handler);
578         if (!NT_STATUS_IS_OK(status)) {
579                 talloc_free(msg);
580                 return NULL;
581         }
582
583         /* create the messaging directory if needed */
584         mkdir(dir, 0700);
585
586         msg->base_path     = talloc_reference(msg, dir);
587         msg->path          = imessaging_path(msg, server_id);
588         msg->server_id     = server_id;
589         msg->idr           = idr_init(msg);
590         msg->dispatch_tree = idr_init(msg);
591         msg->start_time    = timeval_current();
592
593         status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
594         if (!NT_STATUS_IS_OK(status)) {
595                 talloc_free(msg);
596                 return NULL;
597         }
598
599         /* by stealing here we ensure that the socket is cleaned up (and even 
600            deleted) on exit */
601         talloc_steal(msg, msg->sock);
602
603         path = socket_address_from_strings(msg, msg->sock->backend_name, 
604                                            msg->path, 0);
605         if (!path) {
606                 talloc_free(msg);
607                 return NULL;
608         }
609
610         status = socket_listen(msg->sock, path, 50, 0);
611         if (!NT_STATUS_IS_OK(status)) {
612                 DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status)));
613                 talloc_free(msg);
614                 return NULL;
615         }
616
617         /* it needs to be non blocking for sends */
618         set_blocking(socket_get_fd(msg->sock), false);
619
620         msg->event.ev   = ev;
621         msg->event.fde  = event_add_fd(ev, msg, socket_get_fd(msg->sock), 
622                                        EVENT_FD_READ, imessaging_handler, msg);
623         tevent_fd_set_auto_close(msg->event.fde);
624
625         talloc_set_destructor(msg, imessaging_destructor);
626         
627         imessaging_register(msg, NULL, MSG_PING, ping_message);
628         imessaging_register(msg, NULL, MSG_IRPC, irpc_handler);
629         IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg);
630
631         return msg;
632 }
633
634 /* 
635    A hack, for the short term until we get 'client only' messaging in place 
636 */
637 struct imessaging_context *imessaging_client_init(TALLOC_CTX *mem_ctx,
638                                                 const char *dir,
639                                                 struct tevent_context *ev)
640 {
641         struct server_id id;
642         ZERO_STRUCT(id);
643         id.pid = random() % 0x10000000;
644         return imessaging_init(mem_ctx, dir, id, ev);
645 }
646 /*
647   a list of registered irpc server functions
648 */
649 struct irpc_list {
650         struct irpc_list *next, *prev;
651         struct GUID uuid;
652         const struct ndr_interface_table *table;
653         int callnum;
654         irpc_function_t fn;
655         void *private_data;
656 };
657
658
659 /*
660   register a irpc server function
661 */
662 NTSTATUS irpc_register(struct imessaging_context *msg_ctx,
663                        const struct ndr_interface_table *table, 
664                        int callnum, irpc_function_t fn, void *private_data)
665 {
666         struct irpc_list *irpc;
667
668         /* override an existing handler, if any */
669         for (irpc=msg_ctx->irpc; irpc; irpc=irpc->next) {
670                 if (irpc->table == table && irpc->callnum == callnum) {
671                         break;
672                 }
673         }
674         if (irpc == NULL) {
675                 irpc = talloc(msg_ctx, struct irpc_list);
676                 NT_STATUS_HAVE_NO_MEMORY(irpc);
677                 DLIST_ADD(msg_ctx->irpc, irpc);
678         }
679
680         irpc->table   = table;
681         irpc->callnum = callnum;
682         irpc->fn      = fn;
683         irpc->private_data = private_data;
684         irpc->uuid = irpc->table->syntax_id.uuid;
685
686         return NT_STATUS_OK;
687 }
688
689
690 /*
691   handle an incoming irpc reply message
692 */
693 static void irpc_handler_reply(struct imessaging_context *msg_ctx, struct irpc_message *m)
694 {
695         struct irpc_request *irpc;
696
697         irpc = (struct irpc_request *)idr_find(msg_ctx->idr, m->header.callid);
698         if (irpc == NULL) return;
699
700         irpc->incoming.handler(irpc, m);
701 }
702
703 /*
704   send a irpc reply
705 */
706 NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status)
707 {
708         struct ndr_push *push;
709         DATA_BLOB packet;
710         enum ndr_err_code ndr_err;
711
712         m->header.status = status;
713
714         /* setup the reply */
715         push = ndr_push_init_ctx(m->ndr);
716         if (push == NULL) {
717                 status = NT_STATUS_NO_MEMORY;
718                 goto failed;
719         }
720
721         m->header.flags |= IRPC_FLAG_REPLY;
722         m->header.creds.token= NULL;
723
724         /* construct the packet */
725         ndr_err = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header);
726         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
727                 status = ndr_map_error2ntstatus(ndr_err);
728                 goto failed;
729         }
730
731         ndr_err = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data);
732         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
733                 status = ndr_map_error2ntstatus(ndr_err);
734                 goto failed;
735         }
736
737         /* send the reply message */
738         packet = ndr_push_blob(push);
739         status = imessaging_send(m->msg_ctx, m->from, MSG_IRPC, &packet);
740         if (!NT_STATUS_IS_OK(status)) goto failed;
741
742 failed:
743         talloc_free(m);
744         return status;
745 }
746
747 /*
748   handle an incoming irpc request message
749 */
750 static void irpc_handler_request(struct imessaging_context *msg_ctx,
751                                  struct irpc_message *m)
752 {
753         struct irpc_list *i;
754         void *r;
755         enum ndr_err_code ndr_err;
756
757         for (i=msg_ctx->irpc; i; i=i->next) {
758                 if (GUID_equal(&i->uuid, &m->header.uuid) &&
759                     i->table->syntax_id.if_version == m->header.if_version &&
760                     i->callnum == m->header.callnum) {
761                         break;
762                 }
763         }
764
765         if (i == NULL) {
766                 /* no registered handler for this message */
767                 talloc_free(m);
768                 return;
769         }
770
771         /* allocate space for the structure */
772         r = talloc_zero_size(m->ndr, i->table->calls[m->header.callnum].struct_size);
773         if (r == NULL) goto failed;
774
775         m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
776
777         /* parse the request data */
778         ndr_err = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r);
779         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
780
781         /* make the call */
782         m->private_data= i->private_data;
783         m->defer_reply = false;
784         m->no_reply    = false;
785         m->msg_ctx     = msg_ctx;
786         m->irpc        = i;
787         m->data        = r;
788         m->ev          = msg_ctx->event.ev;
789
790         m->header.status = i->fn(m, r);
791
792         if (m->no_reply) {
793                 /* the server function won't ever be replying to this request */
794                 talloc_free(m);
795                 return;
796         }
797
798         if (m->defer_reply) {
799                 /* the server function has asked to defer the reply to later */
800                 talloc_steal(msg_ctx, m);
801                 return;
802         }
803
804         irpc_send_reply(m, m->header.status);
805         return;
806
807 failed:
808         talloc_free(m);
809 }
810
811 /*
812   handle an incoming irpc message
813 */
814 static void irpc_handler(struct imessaging_context *msg_ctx, void *private_data,
815                          uint32_t msg_type, struct server_id src, DATA_BLOB *packet)
816 {
817         struct irpc_message *m;
818         enum ndr_err_code ndr_err;
819
820         m = talloc(msg_ctx, struct irpc_message);
821         if (m == NULL) goto failed;
822
823         m->from = src;
824
825         m->ndr = ndr_pull_init_blob(packet, m);
826         if (m->ndr == NULL) goto failed;
827
828         m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
829
830         ndr_err = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header);
831         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
832
833         if (m->header.flags & IRPC_FLAG_REPLY) {
834                 irpc_handler_reply(msg_ctx, m);
835         } else {
836                 irpc_handler_request(msg_ctx, m);
837         }
838         return;
839
840 failed:
841         talloc_free(m);
842 }
843
844
845 /*
846   destroy a irpc request
847 */
848 static int irpc_destructor(struct irpc_request *irpc)
849 {
850         if (irpc->callid != -1) {
851                 idr_remove(irpc->msg_ctx->idr, irpc->callid);
852                 irpc->callid = -1;
853         }
854
855         return 0;
856 }
857
858 /*
859   open the naming database
860 */
861 static struct tdb_wrap *irpc_namedb_open(struct imessaging_context *msg_ctx)
862 {
863         struct tdb_wrap *t;
864         char *path = talloc_asprintf(msg_ctx, "%s/names.tdb", msg_ctx->base_path);
865         if (path == NULL) {
866                 return NULL;
867         }
868         t = tdb_wrap_open(msg_ctx, path, 0, 0, O_RDWR|O_CREAT, 0660);
869         talloc_free(path);
870         return t;
871 }
872         
873
874 /*
875   add a string name that this irpc server can be called on
876 */
877 NTSTATUS irpc_add_name(struct imessaging_context *msg_ctx, const char *name)
878 {
879         struct tdb_wrap *t;
880         TDB_DATA rec;
881         int count;
882         NTSTATUS status = NT_STATUS_OK;
883
884         t = irpc_namedb_open(msg_ctx);
885         NT_STATUS_HAVE_NO_MEMORY(t);
886
887         if (tdb_lock_bystring(t->tdb, name) != 0) {
888                 talloc_free(t);
889                 return NT_STATUS_LOCK_NOT_GRANTED;
890         }
891         rec = tdb_fetch_bystring(t->tdb, name);
892         count = rec.dsize / sizeof(struct server_id);
893         rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1);
894         rec.dsize += sizeof(struct server_id);
895         if (rec.dptr == NULL) {
896                 tdb_unlock_bystring(t->tdb, name);
897                 talloc_free(t);
898                 return NT_STATUS_NO_MEMORY;
899         }
900         ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id;
901         if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) {
902                 status = NT_STATUS_INTERNAL_ERROR;
903         }
904         free(rec.dptr);
905         tdb_unlock_bystring(t->tdb, name);
906         talloc_free(t);
907
908         msg_ctx->names = str_list_add(msg_ctx->names, name);
909         talloc_steal(msg_ctx, msg_ctx->names);
910
911         return status;
912 }
913
914 /*
915   return a list of server ids for a server name
916 */
917 struct server_id *irpc_servers_byname(struct imessaging_context *msg_ctx,
918                                       TALLOC_CTX *mem_ctx,
919                                       const char *name)
920 {
921         struct tdb_wrap *t;
922         TDB_DATA rec;
923         int count, i;
924         struct server_id *ret;
925
926         t = irpc_namedb_open(msg_ctx);
927         if (t == NULL) {
928                 return NULL;
929         }
930
931         if (tdb_lock_bystring(t->tdb, name) != 0) {
932                 talloc_free(t);
933                 return NULL;
934         }
935         rec = tdb_fetch_bystring(t->tdb, name);
936         if (rec.dptr == NULL) {
937                 tdb_unlock_bystring(t->tdb, name);
938                 talloc_free(t);
939                 return NULL;
940         }
941         count = rec.dsize / sizeof(struct server_id);
942         ret = talloc_array(mem_ctx, struct server_id, count+1);
943         if (ret == NULL) {
944                 tdb_unlock_bystring(t->tdb, name);
945                 talloc_free(t);
946                 return NULL;
947         }
948         for (i=0;i<count;i++) {
949                 ret[i] = ((struct server_id *)rec.dptr)[i];
950         }
951         ret[i] = cluster_id(0, 0);
952         free(rec.dptr);
953         tdb_unlock_bystring(t->tdb, name);
954         talloc_free(t);
955
956         return ret;
957 }
958
959 /*
960   remove a name from a messaging context
961 */
962 void irpc_remove_name(struct imessaging_context *msg_ctx, const char *name)
963 {
964         struct tdb_wrap *t;
965         TDB_DATA rec;
966         int count, i;
967         struct server_id *ids;
968
969         str_list_remove(msg_ctx->names, name);
970
971         t = irpc_namedb_open(msg_ctx);
972         if (t == NULL) {
973                 return;
974         }
975
976         if (tdb_lock_bystring(t->tdb, name) != 0) {
977                 talloc_free(t);
978                 return;
979         }
980         rec = tdb_fetch_bystring(t->tdb, name);
981         if (rec.dptr == NULL) {
982                 tdb_unlock_bystring(t->tdb, name);
983                 talloc_free(t);
984                 return;
985         }
986         count = rec.dsize / sizeof(struct server_id);
987         if (count == 0) {
988                 free(rec.dptr);
989                 tdb_unlock_bystring(t->tdb, name);
990                 talloc_free(t);
991                 return;
992         }
993         ids = (struct server_id *)rec.dptr;
994         for (i=0;i<count;i++) {
995                 if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) {
996                         if (i < count-1) {
997                                 memmove(ids+i, ids+i+1, 
998                                         sizeof(struct server_id) * (count-(i+1)));
999                         }
1000                         rec.dsize -= sizeof(struct server_id);
1001                         break;
1002                 }
1003         }
1004         tdb_store_bystring(t->tdb, name, rec, 0);
1005         free(rec.dptr);
1006         tdb_unlock_bystring(t->tdb, name);
1007         talloc_free(t);
1008 }
1009
1010 struct server_id imessaging_get_server_id(struct imessaging_context *msg_ctx)
1011 {
1012         return msg_ctx->server_id;
1013 }
1014
1015 struct irpc_bh_state {
1016         struct imessaging_context *msg_ctx;
1017         struct server_id server_id;
1018         const struct ndr_interface_table *table;
1019         uint32_t timeout;
1020         struct security_token *token;
1021 };
1022
1023 static bool irpc_bh_is_connected(struct dcerpc_binding_handle *h)
1024 {
1025         struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1026                                    struct irpc_bh_state);
1027
1028         if (!hs->msg_ctx) {
1029                 return false;
1030         }
1031
1032         return true;
1033 }
1034
1035 static uint32_t irpc_bh_set_timeout(struct dcerpc_binding_handle *h,
1036                                     uint32_t timeout)
1037 {
1038         struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1039                                    struct irpc_bh_state);
1040         uint32_t old = hs->timeout;
1041
1042         hs->timeout = timeout;
1043
1044         return old;
1045 }
1046
1047 struct irpc_bh_raw_call_state {
1048         struct irpc_request *irpc;
1049         uint32_t opnum;
1050         DATA_BLOB in_data;
1051         DATA_BLOB in_packet;
1052         DATA_BLOB out_data;
1053 };
1054
1055 static void irpc_bh_raw_call_incoming_handler(struct irpc_request *irpc,
1056                                               struct irpc_message *m);
1057
1058 static struct tevent_req *irpc_bh_raw_call_send(TALLOC_CTX *mem_ctx,
1059                                                 struct tevent_context *ev,
1060                                                 struct dcerpc_binding_handle *h,
1061                                                 const struct GUID *object,
1062                                                 uint32_t opnum,
1063                                                 uint32_t in_flags,
1064                                                 const uint8_t *in_data,
1065                                                 size_t in_length)
1066 {
1067         struct irpc_bh_state *hs =
1068                 dcerpc_binding_handle_data(h,
1069                 struct irpc_bh_state);
1070         struct tevent_req *req;
1071         struct irpc_bh_raw_call_state *state;
1072         bool ok;
1073         struct irpc_header header;
1074         struct ndr_push *ndr;
1075         NTSTATUS status;
1076         enum ndr_err_code ndr_err;
1077
1078         req = tevent_req_create(mem_ctx, &state,
1079                                 struct irpc_bh_raw_call_state);
1080         if (req == NULL) {
1081                 return NULL;
1082         }
1083         state->opnum = opnum;
1084         state->in_data.data = discard_const_p(uint8_t, in_data);
1085         state->in_data.length = in_length;
1086
1087         ok = irpc_bh_is_connected(h);
1088         if (!ok) {
1089                 tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
1090                 return tevent_req_post(req, ev);
1091         }
1092
1093         state->irpc = talloc_zero(state, struct irpc_request);
1094         if (tevent_req_nomem(state->irpc, req)) {
1095                 return tevent_req_post(req, ev);
1096         }
1097
1098         state->irpc->msg_ctx  = hs->msg_ctx;
1099         state->irpc->callid   = idr_get_new(hs->msg_ctx->idr,
1100                                             state->irpc, UINT16_MAX);
1101         if (state->irpc->callid == -1) {
1102                 tevent_req_nterror(req, NT_STATUS_INSUFFICIENT_RESOURCES);
1103                 return tevent_req_post(req, ev);
1104         }
1105         state->irpc->incoming.handler = irpc_bh_raw_call_incoming_handler;
1106         state->irpc->incoming.private_data = req;
1107
1108         talloc_set_destructor(state->irpc, irpc_destructor);
1109
1110         /* setup the header */
1111         header.uuid = hs->table->syntax_id.uuid;
1112
1113         header.if_version = hs->table->syntax_id.if_version;
1114         header.callid     = state->irpc->callid;
1115         header.callnum    = state->opnum;
1116         header.flags      = 0;
1117         header.status     = NT_STATUS_OK;
1118         header.creds.token= hs->token;
1119
1120         /* construct the irpc packet */
1121         ndr = ndr_push_init_ctx(state->irpc);
1122         if (tevent_req_nomem(ndr, req)) {
1123                 return tevent_req_post(req, ev);
1124         }
1125
1126         ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
1127         status = ndr_map_error2ntstatus(ndr_err);
1128         if (!NT_STATUS_IS_OK(status)) {
1129                 tevent_req_nterror(req, status);
1130                 return tevent_req_post(req, ev);
1131         }
1132
1133         ndr_err = ndr_push_bytes(ndr, in_data, in_length);
1134         status = ndr_map_error2ntstatus(ndr_err);
1135         if (!NT_STATUS_IS_OK(status)) {
1136                 tevent_req_nterror(req, status);
1137                 return tevent_req_post(req, ev);
1138         }
1139
1140         /* and send it */
1141         state->in_packet = ndr_push_blob(ndr);
1142         status = imessaging_send(hs->msg_ctx, hs->server_id,
1143                                 MSG_IRPC, &state->in_packet);
1144         if (!NT_STATUS_IS_OK(status)) {
1145                 tevent_req_nterror(req, status);
1146                 return tevent_req_post(req, ev);
1147         }
1148
1149         if (hs->timeout != IRPC_CALL_TIMEOUT_INF) {
1150                 /* set timeout-callback in case caller wants that */
1151                 ok = tevent_req_set_endtime(req, ev, timeval_current_ofs(hs->timeout, 0));
1152                 if (!ok) {
1153                         return tevent_req_post(req, ev);
1154                 }
1155         }
1156
1157         return req;
1158 }
1159
1160 static void irpc_bh_raw_call_incoming_handler(struct irpc_request *irpc,
1161                                               struct irpc_message *m)
1162 {
1163         struct tevent_req *req =
1164                 talloc_get_type_abort(irpc->incoming.private_data,
1165                 struct tevent_req);
1166         struct irpc_bh_raw_call_state *state =
1167                 tevent_req_data(req,
1168                 struct irpc_bh_raw_call_state);
1169
1170         talloc_steal(state, m);
1171
1172         if (!NT_STATUS_IS_OK(m->header.status)) {
1173                 tevent_req_nterror(req, m->header.status);
1174                 return;
1175         }
1176
1177         state->out_data = data_blob_talloc(state,
1178                 m->ndr->data + m->ndr->offset,
1179                 m->ndr->data_size - m->ndr->offset);
1180         if ((m->ndr->data_size - m->ndr->offset) > 0 && !state->out_data.data) {
1181                 tevent_req_oom(req);
1182                 return;
1183         }
1184
1185         tevent_req_done(req);
1186 }
1187
1188 static NTSTATUS irpc_bh_raw_call_recv(struct tevent_req *req,
1189                                         TALLOC_CTX *mem_ctx,
1190                                         uint8_t **out_data,
1191                                         size_t *out_length,
1192                                         uint32_t *out_flags)
1193 {
1194         struct irpc_bh_raw_call_state *state =
1195                 tevent_req_data(req,
1196                 struct irpc_bh_raw_call_state);
1197         NTSTATUS status;
1198
1199         if (tevent_req_is_nterror(req, &status)) {
1200                 tevent_req_received(req);
1201                 return status;
1202         }
1203
1204         *out_data = talloc_move(mem_ctx, &state->out_data.data);
1205         *out_length = state->out_data.length;
1206         *out_flags = 0;
1207         tevent_req_received(req);
1208         return NT_STATUS_OK;
1209 }
1210
1211 struct irpc_bh_disconnect_state {
1212         uint8_t _dummy;
1213 };
1214
1215 static struct tevent_req *irpc_bh_disconnect_send(TALLOC_CTX *mem_ctx,
1216                                                 struct tevent_context *ev,
1217                                                 struct dcerpc_binding_handle *h)
1218 {
1219         struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1220                                      struct irpc_bh_state);
1221         struct tevent_req *req;
1222         struct irpc_bh_disconnect_state *state;
1223         bool ok;
1224
1225         req = tevent_req_create(mem_ctx, &state,
1226                                 struct irpc_bh_disconnect_state);
1227         if (req == NULL) {
1228                 return NULL;
1229         }
1230
1231         ok = irpc_bh_is_connected(h);
1232         if (!ok) {
1233                 tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
1234                 return tevent_req_post(req, ev);
1235         }
1236
1237         hs->msg_ctx = NULL;
1238
1239         tevent_req_done(req);
1240         return tevent_req_post(req, ev);
1241 }
1242
1243 static NTSTATUS irpc_bh_disconnect_recv(struct tevent_req *req)
1244 {
1245         NTSTATUS status;
1246
1247         if (tevent_req_is_nterror(req, &status)) {
1248                 tevent_req_received(req);
1249                 return status;
1250         }
1251
1252         tevent_req_received(req);
1253         return NT_STATUS_OK;
1254 }
1255
1256 static bool irpc_bh_ref_alloc(struct dcerpc_binding_handle *h)
1257 {
1258         return true;
1259 }
1260
1261 static const struct dcerpc_binding_handle_ops irpc_bh_ops = {
1262         .name                   = "wbint",
1263         .is_connected           = irpc_bh_is_connected,
1264         .set_timeout            = irpc_bh_set_timeout,
1265         .raw_call_send          = irpc_bh_raw_call_send,
1266         .raw_call_recv          = irpc_bh_raw_call_recv,
1267         .disconnect_send        = irpc_bh_disconnect_send,
1268         .disconnect_recv        = irpc_bh_disconnect_recv,
1269
1270         .ref_alloc              = irpc_bh_ref_alloc,
1271 };
1272
1273 /* initialise a irpc binding handle */
1274 struct dcerpc_binding_handle *irpc_binding_handle(TALLOC_CTX *mem_ctx,
1275                                         struct imessaging_context *msg_ctx,
1276                                         struct server_id server_id,
1277                                         const struct ndr_interface_table *table)
1278 {
1279         struct dcerpc_binding_handle *h;
1280         struct irpc_bh_state *hs;
1281
1282         h = dcerpc_binding_handle_create(mem_ctx,
1283                                          &irpc_bh_ops,
1284                                          NULL,
1285                                          table,
1286                                          &hs,
1287                                          struct irpc_bh_state,
1288                                          __location__);
1289         if (h == NULL) {
1290                 return NULL;
1291         }
1292         hs->msg_ctx = msg_ctx;
1293         hs->server_id = server_id;
1294         hs->table = table;
1295         hs->timeout = IRPC_CALL_TIMEOUT;
1296
1297         dcerpc_binding_handle_set_sync_ev(h, msg_ctx->event.ev);
1298
1299         return h;
1300 }
1301
1302 struct dcerpc_binding_handle *irpc_binding_handle_by_name(TALLOC_CTX *mem_ctx,
1303                                         struct imessaging_context *msg_ctx,
1304                                         const char *dest_task,
1305                                         const struct ndr_interface_table *table)
1306 {
1307         struct dcerpc_binding_handle *h;
1308         struct server_id *sids;
1309         struct server_id sid;
1310
1311         /* find the server task */
1312         sids = irpc_servers_byname(msg_ctx, mem_ctx, dest_task);
1313         if (sids == NULL) {
1314                 errno = EADDRNOTAVAIL;
1315                 return NULL;
1316         }
1317         if (sids[0].pid == 0) {
1318                 talloc_free(sids);
1319                 errno = EADDRNOTAVAIL;
1320                 return NULL;
1321         }
1322         sid = sids[0];
1323         talloc_free(sids);
1324
1325         h = irpc_binding_handle(mem_ctx, msg_ctx,
1326                                 sid, table);
1327         if (h == NULL) {
1328                 return NULL;
1329         }
1330
1331         return h;
1332 }
1333
1334 void irpc_binding_handle_add_security_token(struct dcerpc_binding_handle *h,
1335                                             struct security_token *token)
1336 {
1337         struct irpc_bh_state *hs =
1338                 dcerpc_binding_handle_data(h,
1339                 struct irpc_bh_state);
1340
1341         hs->token = token;
1342 }