s4:lib: use tevent_ fns names instead of legcay event_ ones
[kai/samba-autobuild/.git] / source4 / lib / messaging / messaging.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    Samba internal messaging functions
5
6    Copyright (C) Andrew Tridgell 2004
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include "includes.h"
23 #include "lib/events/events.h"
24 #include "system/filesys.h"
25 #include "messaging/messaging.h"
26 #include "../lib/util/dlinklist.h"
27 #include "lib/socket/socket.h"
28 #include "librpc/gen_ndr/ndr_irpc.h"
29 #include "lib/messaging/irpc.h"
30 #include "lib/util/tdb_wrap.h"
31 #include "../lib/util/unix_privs.h"
32 #include "librpc/rpc/dcerpc.h"
33 #include "../lib/tdb_compat/tdb_compat.h"
34 #include "../lib/util/util_tdb.h"
35 #include "cluster/cluster.h"
36 #include "../lib/util/tevent_ntstatus.h"
37
38 /* change the message version with any incompatible changes in the protocol */
39 #define IMESSAGING_VERSION 1
40
41 /*
42   a pending irpc call
43 */
44 struct irpc_request {
45         struct imessaging_context *msg_ctx;
46         int callid;
47         struct {
48                 void (*handler)(struct irpc_request *irpc, struct irpc_message *m);
49                 void *private_data;
50         } incoming;
51 };
52
53 struct imessaging_context {
54         struct server_id server_id;
55         struct socket_context *sock;
56         const char *base_path;
57         const char *path;
58         struct dispatch_fn **dispatch;
59         uint32_t num_types;
60         struct idr_context *dispatch_tree;
61         struct imessaging_rec *pending;
62         struct imessaging_rec *retry_queue;
63         struct irpc_list *irpc;
64         struct idr_context *idr;
65         const char **names;
66         struct timeval start_time;
67         struct tevent_timer *retry_te;
68         struct {
69                 struct tevent_context *ev;
70                 struct tevent_fd *fde;
71         } event;
72 };
73
74 /* we have a linked list of dispatch handlers for each msg_type that
75    this messaging server can deal with */
76 struct dispatch_fn {
77         struct dispatch_fn *next, *prev;
78         uint32_t msg_type;
79         void *private_data;
80         msg_callback_t fn;
81 };
82
83 /* an individual message */
84 struct imessaging_rec {
85         struct imessaging_rec *next, *prev;
86         struct imessaging_context *msg;
87         const char *path;
88
89         struct imessaging_header {
90                 uint32_t version;
91                 uint32_t msg_type;
92                 struct server_id from;
93                 struct server_id to;
94                 uint32_t length;
95         } *header;
96
97         DATA_BLOB packet;
98         uint32_t retries;
99 };
100
101
102 static void irpc_handler(struct imessaging_context *, void *,
103                          uint32_t, struct server_id, DATA_BLOB *);
104
105
106 /*
107  A useful function for testing the message system.
108 */
109 static void ping_message(struct imessaging_context *msg, void *private_data,
110                          uint32_t msg_type, struct server_id src, DATA_BLOB *data)
111 {
112         char *task_id = server_id_str(NULL, &src);
113         DEBUG(1,("INFO: Received PING message from server %s [%.*s]\n",
114                  task_id, (int)data->length,
115                  data->data?(const char *)data->data:""));
116         talloc_free(task_id);
117         imessaging_send(msg, src, MSG_PONG, data);
118 }
119
120 /*
121   return uptime of messaging server via irpc
122 */
123 static NTSTATUS irpc_uptime(struct irpc_message *msg, 
124                             struct irpc_uptime *r)
125 {
126         struct imessaging_context *ctx = talloc_get_type(msg->private_data, struct imessaging_context);
127         *r->out.start_time = timeval_to_nttime(&ctx->start_time);
128         return NT_STATUS_OK;
129 }
130
131 /* 
132    return the path to a messaging socket
133 */
134 static char *imessaging_path(struct imessaging_context *msg, struct server_id server_id)
135 {
136         TALLOC_CTX *tmp_ctx = talloc_new(msg);
137         const char *id = server_id_str(tmp_ctx, &server_id);
138         char *s;
139         if (id == NULL) {
140                 return NULL;
141         }
142         s = talloc_asprintf(msg, "%s/msg.%s", msg->base_path, id);
143         talloc_steal(s, tmp_ctx);
144         return s;
145 }
146
147 /*
148   dispatch a fully received message
149
150   note that this deliberately can match more than one message handler
151   per message. That allows a single messasging context to register
152   (for example) a debug handler for more than one piece of code
153 */
154 static void imessaging_dispatch(struct imessaging_context *msg, struct imessaging_rec *rec)
155 {
156         struct dispatch_fn *d, *next;
157
158         /* temporary IDs use an idtree, the rest use a array of pointers */
159         if (rec->header->msg_type >= MSG_TMP_BASE) {
160                 d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
161                                                    rec->header->msg_type);
162         } else if (rec->header->msg_type < msg->num_types) {
163                 d = msg->dispatch[rec->header->msg_type];
164         } else {
165                 d = NULL;
166         }
167
168         for (; d; d = next) {
169                 DATA_BLOB data;
170                 next = d->next;
171                 data.data = rec->packet.data + sizeof(*rec->header);
172                 data.length = rec->header->length;
173                 d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data);
174         }
175         rec->header->length = 0;
176 }
177
178 /*
179   handler for messages that arrive from other nodes in the cluster
180 */
181 static void cluster_message_handler(struct imessaging_context *msg, DATA_BLOB packet)
182 {
183         struct imessaging_rec *rec;
184
185         rec = talloc(msg, struct imessaging_rec);
186         if (rec == NULL) {
187                 smb_panic("Unable to allocate imessaging_rec");
188         }
189
190         rec->msg           = msg;
191         rec->path          = msg->path;
192         rec->header        = (struct imessaging_header *)packet.data;
193         rec->packet        = packet;
194         rec->retries       = 0;
195
196         if (packet.length != sizeof(*rec->header) + rec->header->length) {
197                 DEBUG(0,("messaging: bad message header size %d should be %d\n", 
198                          rec->header->length, (int)(packet.length - sizeof(*rec->header))));
199                 talloc_free(rec);
200                 return;
201         }
202
203         imessaging_dispatch(msg, rec);
204         talloc_free(rec);
205 }
206
207
208
209 /*
210   try to send the message
211 */
212 static NTSTATUS try_send(struct imessaging_rec *rec)
213 {
214         struct imessaging_context *msg = rec->msg;
215         size_t nsent;
216         void *priv;
217         NTSTATUS status;
218         struct socket_address *path;
219
220         /* rec->path is the path of the *other* socket, where we want
221          * this to end up */
222         path = socket_address_from_strings(msg, msg->sock->backend_name, 
223                                            rec->path, 0);
224         if (!path) {
225                 return NT_STATUS_NO_MEMORY;
226         }
227
228         /* we send with privileges so messages work from any context */
229         priv = root_privileges();
230         status = socket_sendto(msg->sock, &rec->packet, &nsent, path);
231         talloc_free(path);
232         talloc_free(priv);
233
234         return status;
235 }
236
237 /*
238   retry backed off messages
239 */
240 static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te, 
241                             struct timeval t, void *private_data)
242 {
243         struct imessaging_context *msg = talloc_get_type(private_data,
244                                                         struct imessaging_context);
245         msg->retry_te = NULL;
246
247         /* put the messages back on the main queue */
248         while (msg->retry_queue) {
249                 struct imessaging_rec *rec = msg->retry_queue;
250                 DLIST_REMOVE(msg->retry_queue, rec);
251                 DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
252         }
253
254         TEVENT_FD_WRITEABLE(msg->event.fde);
255 }
256
257 /*
258   handle a socket write event
259 */
260 static void imessaging_send_handler(struct imessaging_context *msg)
261 {
262         while (msg->pending) {
263                 struct imessaging_rec *rec = msg->pending;
264                 NTSTATUS status;
265                 status = try_send(rec);
266                 if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
267                         rec->retries++;
268                         if (rec->retries > 3) {
269                                 /* we're getting continuous write errors -
270                                    backoff this record */
271                                 DLIST_REMOVE(msg->pending, rec);
272                                 DLIST_ADD_END(msg->retry_queue, rec, 
273                                               struct imessaging_rec *);
274                                 if (msg->retry_te == NULL) {
275                                         msg->retry_te = 
276                                                 tevent_add_timer(msg->event.ev, msg,
277                                                                 timeval_current_ofs(1, 0), 
278                                                                 msg_retry_timer, msg);
279                                 }
280                         }
281                         break;
282                 }
283                 rec->retries = 0;
284                 if (!NT_STATUS_IS_OK(status)) {
285                         TALLOC_CTX *tmp_ctx = talloc_new(msg);
286                         DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", 
287                                  server_id_str(tmp_ctx, &rec->header->from),
288                                  server_id_str(tmp_ctx, &rec->header->to),
289                                  rec->header->msg_type, 
290                                  nt_errstr(status)));
291                         talloc_free(tmp_ctx);
292                 }
293                 DLIST_REMOVE(msg->pending, rec);
294                 talloc_free(rec);
295         }
296         if (msg->pending == NULL) {
297                 TEVENT_FD_NOT_WRITEABLE(msg->event.fde);
298         }
299 }
300
301 /*
302   handle a new incoming packet
303 */
304 static void imessaging_recv_handler(struct imessaging_context *msg)
305 {
306         struct imessaging_rec *rec;
307         NTSTATUS status;
308         DATA_BLOB packet;
309         size_t msize;
310
311         /* see how many bytes are in the next packet */
312         status = socket_pending(msg->sock, &msize);
313         if (!NT_STATUS_IS_OK(status)) {
314                 DEBUG(0,("socket_pending failed in messaging - %s\n", 
315                          nt_errstr(status)));
316                 return;
317         }
318         
319         packet = data_blob_talloc(msg, NULL, msize);
320         if (packet.data == NULL) {
321                 /* assume this is temporary and retry */
322                 return;
323         }
324             
325         status = socket_recv(msg->sock, packet.data, msize, &msize);
326         if (!NT_STATUS_IS_OK(status)) {
327                 data_blob_free(&packet);
328                 return;
329         }
330
331         if (msize < sizeof(*rec->header)) {
332                 DEBUG(0,("messaging: bad message of size %d\n", (int)msize));
333                 data_blob_free(&packet);
334                 return;
335         }
336
337         rec = talloc(msg, struct imessaging_rec);
338         if (rec == NULL) {
339                 smb_panic("Unable to allocate imessaging_rec");
340         }
341
342         talloc_steal(rec, packet.data);
343         rec->msg           = msg;
344         rec->path          = msg->path;
345         rec->header        = (struct imessaging_header *)packet.data;
346         rec->packet        = packet;
347         rec->retries       = 0;
348
349         if (msize != sizeof(*rec->header) + rec->header->length) {
350                 DEBUG(0,("messaging: bad message header size %d should be %d\n", 
351                          rec->header->length, (int)(msize - sizeof(*rec->header))));
352                 talloc_free(rec);
353                 return;
354         }
355
356         imessaging_dispatch(msg, rec);
357         talloc_free(rec);
358 }
359
360
361 /*
362   handle a socket event
363 */
364 static void imessaging_handler(struct tevent_context *ev, struct tevent_fd *fde,
365                               uint16_t flags, void *private_data)
366 {
367         struct imessaging_context *msg = talloc_get_type(private_data,
368                                                         struct imessaging_context);
369         if (flags & TEVENT_FD_WRITE) {
370                 imessaging_send_handler(msg);
371         }
372         if (flags & TEVENT_FD_READ) {
373                 imessaging_recv_handler(msg);
374         }
375 }
376
377
378 /*
379   Register a dispatch function for a particular message type.
380 */
381 NTSTATUS imessaging_register(struct imessaging_context *msg, void *private_data,
382                             uint32_t msg_type, msg_callback_t fn)
383 {
384         struct dispatch_fn *d;
385
386         /* possibly expand dispatch array */
387         if (msg_type >= msg->num_types) {
388                 struct dispatch_fn **dp;
389                 int i;
390                 dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1);
391                 NT_STATUS_HAVE_NO_MEMORY(dp);
392                 msg->dispatch = dp;
393                 for (i=msg->num_types;i<=msg_type;i++) {
394                         msg->dispatch[i] = NULL;
395                 }
396                 msg->num_types = msg_type+1;
397         }
398
399         d = talloc_zero(msg->dispatch, struct dispatch_fn);
400         NT_STATUS_HAVE_NO_MEMORY(d);
401         d->msg_type = msg_type;
402         d->private_data = private_data;
403         d->fn = fn;
404
405         DLIST_ADD(msg->dispatch[msg_type], d);
406
407         return NT_STATUS_OK;
408 }
409
410 /*
411   register a temporary message handler. The msg_type is allocated
412   above MSG_TMP_BASE
413 */
414 NTSTATUS imessaging_register_tmp(struct imessaging_context *msg, void *private_data,
415                                 msg_callback_t fn, uint32_t *msg_type)
416 {
417         struct dispatch_fn *d;
418         int id;
419
420         d = talloc_zero(msg->dispatch, struct dispatch_fn);
421         NT_STATUS_HAVE_NO_MEMORY(d);
422         d->private_data = private_data;
423         d->fn = fn;
424
425         id = idr_get_new_above(msg->dispatch_tree, d, MSG_TMP_BASE, UINT16_MAX);
426         if (id == -1) {
427                 talloc_free(d);
428                 return NT_STATUS_TOO_MANY_CONTEXT_IDS;
429         }
430
431         d->msg_type = (uint32_t)id;
432         (*msg_type) = d->msg_type;
433
434         return NT_STATUS_OK;
435 }
436
437 /*
438   De-register the function for a particular message type.
439 */
440 void imessaging_deregister(struct imessaging_context *msg, uint32_t msg_type, void *private_data)
441 {
442         struct dispatch_fn *d, *next;
443
444         if (msg_type >= msg->num_types) {
445                 d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
446                                                    msg_type);
447                 if (!d) return;
448                 idr_remove(msg->dispatch_tree, msg_type);
449                 talloc_free(d);
450                 return;
451         }
452
453         for (d = msg->dispatch[msg_type]; d; d = next) {
454                 next = d->next;
455                 if (d->private_data == private_data) {
456                         DLIST_REMOVE(msg->dispatch[msg_type], d);
457                         talloc_free(d);
458                 }
459         }
460 }
461
462 /*
463   Send a message to a particular server
464 */
465 NTSTATUS imessaging_send(struct imessaging_context *msg, struct server_id server,
466                         uint32_t msg_type, const DATA_BLOB *data)
467 {
468         struct imessaging_rec *rec;
469         NTSTATUS status;
470         size_t dlength = data?data->length:0;
471
472         rec = talloc(msg, struct imessaging_rec);
473         if (rec == NULL) {
474                 return NT_STATUS_NO_MEMORY;
475         }
476
477         rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength);
478         if (rec->packet.data == NULL) {
479                 talloc_free(rec);
480                 return NT_STATUS_NO_MEMORY;
481         }
482
483         rec->retries       = 0;
484         rec->msg              = msg;
485         rec->header           = (struct imessaging_header *)rec->packet.data;
486         /* zero padding */
487         ZERO_STRUCTP(rec->header);
488         rec->header->version  = IMESSAGING_VERSION;
489         rec->header->msg_type = msg_type;
490         rec->header->from     = msg->server_id;
491         rec->header->to       = server;
492         rec->header->length   = dlength;
493         if (dlength != 0) {
494                 memcpy(rec->packet.data + sizeof(*rec->header), 
495                        data->data, dlength);
496         }
497
498         if (!cluster_node_equal(&msg->server_id, &server)) {
499                 /* the destination is on another node - dispatch via
500                    the cluster layer */
501                 status = cluster_message_send(server, &rec->packet);
502                 talloc_free(rec);
503                 return status;
504         }
505
506         rec->path = imessaging_path(msg, server);
507         talloc_steal(rec, rec->path);
508
509         if (msg->pending != NULL) {
510                 status = STATUS_MORE_ENTRIES;
511         } else {
512                 status = try_send(rec);
513         }
514
515         if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
516                 if (msg->pending == NULL) {
517                         TEVENT_FD_WRITEABLE(msg->event.fde);
518                 }
519                 DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
520                 return NT_STATUS_OK;
521         }
522
523         talloc_free(rec);
524
525         return status;
526 }
527
528 /*
529   Send a message to a particular server, with the message containing a single pointer
530 */
531 NTSTATUS imessaging_send_ptr(struct imessaging_context *msg, struct server_id server,
532                             uint32_t msg_type, void *ptr)
533 {
534         DATA_BLOB blob;
535
536         blob.data = (uint8_t *)&ptr;
537         blob.length = sizeof(void *);
538
539         return imessaging_send(msg, server, msg_type, &blob);
540 }
541
542
543 /*
544   remove our messaging socket and database entry
545 */
546 int imessaging_cleanup(struct imessaging_context *msg)
547 {
548         DEBUG(5,("imessaging: cleaning up %s\n", msg->path));
549         unlink(msg->path);
550         while (msg->names && msg->names[0]) {
551                 irpc_remove_name(msg, msg->names[0]);
552         }
553         return 0;
554 }
555
556 /*
557   create the listening socket and setup the dispatcher
558
559   use temporary=true when you want a destructor to remove the
560   associated messaging socket and database entry on talloc free. Don't
561   use this in processes that may fork and a child may talloc free this
562   memory
563 */
564 struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
565                                            const char *dir,
566                                            struct server_id server_id,
567                                            struct tevent_context *ev,
568                                            bool auto_remove)
569 {
570         struct imessaging_context *msg;
571         NTSTATUS status;
572         struct socket_address *path;
573
574         if (ev == NULL) {
575                 return NULL;
576         }
577
578         msg = talloc_zero(mem_ctx, struct imessaging_context);
579         if (msg == NULL) {
580                 return NULL;
581         }
582
583         /* setup a handler for messages from other cluster nodes, if appropriate */
584         status = cluster_message_init(msg, server_id, cluster_message_handler);
585         if (!NT_STATUS_IS_OK(status)) {
586                 talloc_free(msg);
587                 return NULL;
588         }
589
590         /* create the messaging directory if needed */
591         mkdir(dir, 0700);
592
593         msg->base_path     = talloc_reference(msg, dir);
594         msg->path          = imessaging_path(msg, server_id);
595         msg->server_id     = server_id;
596         msg->idr           = idr_init(msg);
597         msg->dispatch_tree = idr_init(msg);
598         msg->start_time    = timeval_current();
599
600         status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
601         if (!NT_STATUS_IS_OK(status)) {
602                 talloc_free(msg);
603                 return NULL;
604         }
605
606         /* by stealing here we ensure that the socket is cleaned up (and even 
607            deleted) on exit */
608         talloc_steal(msg, msg->sock);
609
610         path = socket_address_from_strings(msg, msg->sock->backend_name, 
611                                            msg->path, 0);
612         if (!path) {
613                 talloc_free(msg);
614                 return NULL;
615         }
616
617         status = socket_listen(msg->sock, path, 50, 0);
618         if (!NT_STATUS_IS_OK(status)) {
619                 DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status)));
620                 talloc_free(msg);
621                 return NULL;
622         }
623
624         /* it needs to be non blocking for sends */
625         set_blocking(socket_get_fd(msg->sock), false);
626
627         msg->event.ev   = ev;
628         msg->event.fde  = tevent_add_fd(ev, msg, socket_get_fd(msg->sock),
629                                         TEVENT_FD_READ, imessaging_handler, msg);
630         tevent_fd_set_auto_close(msg->event.fde);
631
632         if (auto_remove) {
633                 talloc_set_destructor(msg, imessaging_cleanup);
634         }
635         
636         imessaging_register(msg, NULL, MSG_PING, ping_message);
637         imessaging_register(msg, NULL, MSG_IRPC, irpc_handler);
638         IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg);
639
640         return msg;
641 }
642
643 /* 
644    A hack, for the short term until we get 'client only' messaging in place 
645 */
646 struct imessaging_context *imessaging_client_init(TALLOC_CTX *mem_ctx,
647                                                 const char *dir,
648                                                 struct tevent_context *ev)
649 {
650         struct server_id id;
651         ZERO_STRUCT(id);
652         id.pid = random() % 0x10000000;
653         return imessaging_init(mem_ctx, dir, id, ev, true);
654 }
655 /*
656   a list of registered irpc server functions
657 */
658 struct irpc_list {
659         struct irpc_list *next, *prev;
660         struct GUID uuid;
661         const struct ndr_interface_table *table;
662         int callnum;
663         irpc_function_t fn;
664         void *private_data;
665 };
666
667
668 /*
669   register a irpc server function
670 */
671 NTSTATUS irpc_register(struct imessaging_context *msg_ctx,
672                        const struct ndr_interface_table *table, 
673                        int callnum, irpc_function_t fn, void *private_data)
674 {
675         struct irpc_list *irpc;
676
677         /* override an existing handler, if any */
678         for (irpc=msg_ctx->irpc; irpc; irpc=irpc->next) {
679                 if (irpc->table == table && irpc->callnum == callnum) {
680                         break;
681                 }
682         }
683         if (irpc == NULL) {
684                 irpc = talloc(msg_ctx, struct irpc_list);
685                 NT_STATUS_HAVE_NO_MEMORY(irpc);
686                 DLIST_ADD(msg_ctx->irpc, irpc);
687         }
688
689         irpc->table   = table;
690         irpc->callnum = callnum;
691         irpc->fn      = fn;
692         irpc->private_data = private_data;
693         irpc->uuid = irpc->table->syntax_id.uuid;
694
695         return NT_STATUS_OK;
696 }
697
698
699 /*
700   handle an incoming irpc reply message
701 */
702 static void irpc_handler_reply(struct imessaging_context *msg_ctx, struct irpc_message *m)
703 {
704         struct irpc_request *irpc;
705
706         irpc = (struct irpc_request *)idr_find(msg_ctx->idr, m->header.callid);
707         if (irpc == NULL) return;
708
709         irpc->incoming.handler(irpc, m);
710 }
711
712 /*
713   send a irpc reply
714 */
715 NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status)
716 {
717         struct ndr_push *push;
718         DATA_BLOB packet;
719         enum ndr_err_code ndr_err;
720
721         m->header.status = status;
722
723         /* setup the reply */
724         push = ndr_push_init_ctx(m->ndr);
725         if (push == NULL) {
726                 status = NT_STATUS_NO_MEMORY;
727                 goto failed;
728         }
729
730         m->header.flags |= IRPC_FLAG_REPLY;
731         m->header.creds.token= NULL;
732
733         /* construct the packet */
734         ndr_err = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header);
735         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
736                 status = ndr_map_error2ntstatus(ndr_err);
737                 goto failed;
738         }
739
740         ndr_err = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data);
741         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
742                 status = ndr_map_error2ntstatus(ndr_err);
743                 goto failed;
744         }
745
746         /* send the reply message */
747         packet = ndr_push_blob(push);
748         status = imessaging_send(m->msg_ctx, m->from, MSG_IRPC, &packet);
749         if (!NT_STATUS_IS_OK(status)) goto failed;
750
751 failed:
752         talloc_free(m);
753         return status;
754 }
755
756 /*
757   handle an incoming irpc request message
758 */
759 static void irpc_handler_request(struct imessaging_context *msg_ctx,
760                                  struct irpc_message *m)
761 {
762         struct irpc_list *i;
763         void *r;
764         enum ndr_err_code ndr_err;
765
766         for (i=msg_ctx->irpc; i; i=i->next) {
767                 if (GUID_equal(&i->uuid, &m->header.uuid) &&
768                     i->table->syntax_id.if_version == m->header.if_version &&
769                     i->callnum == m->header.callnum) {
770                         break;
771                 }
772         }
773
774         if (i == NULL) {
775                 /* no registered handler for this message */
776                 talloc_free(m);
777                 return;
778         }
779
780         /* allocate space for the structure */
781         r = talloc_zero_size(m->ndr, i->table->calls[m->header.callnum].struct_size);
782         if (r == NULL) goto failed;
783
784         m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
785
786         /* parse the request data */
787         ndr_err = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r);
788         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
789
790         /* make the call */
791         m->private_data= i->private_data;
792         m->defer_reply = false;
793         m->no_reply    = false;
794         m->msg_ctx     = msg_ctx;
795         m->irpc        = i;
796         m->data        = r;
797         m->ev          = msg_ctx->event.ev;
798
799         m->header.status = i->fn(m, r);
800
801         if (m->no_reply) {
802                 /* the server function won't ever be replying to this request */
803                 talloc_free(m);
804                 return;
805         }
806
807         if (m->defer_reply) {
808                 /* the server function has asked to defer the reply to later */
809                 talloc_steal(msg_ctx, m);
810                 return;
811         }
812
813         irpc_send_reply(m, m->header.status);
814         return;
815
816 failed:
817         talloc_free(m);
818 }
819
820 /*
821   handle an incoming irpc message
822 */
823 static void irpc_handler(struct imessaging_context *msg_ctx, void *private_data,
824                          uint32_t msg_type, struct server_id src, DATA_BLOB *packet)
825 {
826         struct irpc_message *m;
827         enum ndr_err_code ndr_err;
828
829         m = talloc(msg_ctx, struct irpc_message);
830         if (m == NULL) goto failed;
831
832         m->from = src;
833
834         m->ndr = ndr_pull_init_blob(packet, m);
835         if (m->ndr == NULL) goto failed;
836
837         m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
838
839         ndr_err = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header);
840         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
841
842         if (m->header.flags & IRPC_FLAG_REPLY) {
843                 irpc_handler_reply(msg_ctx, m);
844         } else {
845                 irpc_handler_request(msg_ctx, m);
846         }
847         return;
848
849 failed:
850         talloc_free(m);
851 }
852
853
854 /*
855   destroy a irpc request
856 */
857 static int irpc_destructor(struct irpc_request *irpc)
858 {
859         if (irpc->callid != -1) {
860                 idr_remove(irpc->msg_ctx->idr, irpc->callid);
861                 irpc->callid = -1;
862         }
863
864         return 0;
865 }
866
867 /*
868   open the naming database
869 */
870 static struct tdb_wrap *irpc_namedb_open(struct imessaging_context *msg_ctx)
871 {
872         struct tdb_wrap *t;
873         char *path = talloc_asprintf(msg_ctx, "%s/names.tdb", msg_ctx->base_path);
874         if (path == NULL) {
875                 return NULL;
876         }
877         t = tdb_wrap_open(msg_ctx, path, 0, 0, O_RDWR|O_CREAT, 0660);
878         talloc_free(path);
879         return t;
880 }
881         
882
883 /*
884   add a string name that this irpc server can be called on
885 */
886 NTSTATUS irpc_add_name(struct imessaging_context *msg_ctx, const char *name)
887 {
888         struct tdb_wrap *t;
889         TDB_DATA rec;
890         int count;
891         NTSTATUS status = NT_STATUS_OK;
892
893         t = irpc_namedb_open(msg_ctx);
894         NT_STATUS_HAVE_NO_MEMORY(t);
895
896         if (tdb_lock_bystring(t->tdb, name) != 0) {
897                 talloc_free(t);
898                 return NT_STATUS_LOCK_NOT_GRANTED;
899         }
900         rec = tdb_fetch_bystring(t->tdb, name);
901         count = rec.dsize / sizeof(struct server_id);
902         rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1);
903         rec.dsize += sizeof(struct server_id);
904         if (rec.dptr == NULL) {
905                 tdb_unlock_bystring(t->tdb, name);
906                 talloc_free(t);
907                 return NT_STATUS_NO_MEMORY;
908         }
909         ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id;
910         if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) {
911                 status = NT_STATUS_INTERNAL_ERROR;
912         }
913         free(rec.dptr);
914         tdb_unlock_bystring(t->tdb, name);
915         talloc_free(t);
916
917         msg_ctx->names = str_list_add(msg_ctx->names, name);
918         talloc_steal(msg_ctx, msg_ctx->names);
919
920         return status;
921 }
922
923 /*
924   return a list of server ids for a server name
925 */
926 struct server_id *irpc_servers_byname(struct imessaging_context *msg_ctx,
927                                       TALLOC_CTX *mem_ctx,
928                                       const char *name)
929 {
930         struct tdb_wrap *t;
931         TDB_DATA rec;
932         int count, i;
933         struct server_id *ret;
934
935         t = irpc_namedb_open(msg_ctx);
936         if (t == NULL) {
937                 return NULL;
938         }
939
940         if (tdb_lock_bystring(t->tdb, name) != 0) {
941                 talloc_free(t);
942                 return NULL;
943         }
944         rec = tdb_fetch_bystring(t->tdb, name);
945         if (rec.dptr == NULL) {
946                 tdb_unlock_bystring(t->tdb, name);
947                 talloc_free(t);
948                 return NULL;
949         }
950         count = rec.dsize / sizeof(struct server_id);
951         ret = talloc_array(mem_ctx, struct server_id, count+1);
952         if (ret == NULL) {
953                 tdb_unlock_bystring(t->tdb, name);
954                 talloc_free(t);
955                 return NULL;
956         }
957         for (i=0;i<count;i++) {
958                 ret[i] = ((struct server_id *)rec.dptr)[i];
959         }
960         ret[i] = cluster_id(0, 0);
961         free(rec.dptr);
962         tdb_unlock_bystring(t->tdb, name);
963         talloc_free(t);
964
965         return ret;
966 }
967
968 /*
969   remove a name from a messaging context
970 */
971 void irpc_remove_name(struct imessaging_context *msg_ctx, const char *name)
972 {
973         struct tdb_wrap *t;
974         TDB_DATA rec;
975         int count, i;
976         struct server_id *ids;
977
978         str_list_remove(msg_ctx->names, name);
979
980         t = irpc_namedb_open(msg_ctx);
981         if (t == NULL) {
982                 return;
983         }
984
985         if (tdb_lock_bystring(t->tdb, name) != 0) {
986                 talloc_free(t);
987                 return;
988         }
989         rec = tdb_fetch_bystring(t->tdb, name);
990         if (rec.dptr == NULL) {
991                 tdb_unlock_bystring(t->tdb, name);
992                 talloc_free(t);
993                 return;
994         }
995         count = rec.dsize / sizeof(struct server_id);
996         if (count == 0) {
997                 free(rec.dptr);
998                 tdb_unlock_bystring(t->tdb, name);
999                 talloc_free(t);
1000                 return;
1001         }
1002         ids = (struct server_id *)rec.dptr;
1003         for (i=0;i<count;i++) {
1004                 if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) {
1005                         if (i < count-1) {
1006                                 memmove(ids+i, ids+i+1, 
1007                                         sizeof(struct server_id) * (count-(i+1)));
1008                         }
1009                         rec.dsize -= sizeof(struct server_id);
1010                         break;
1011                 }
1012         }
1013         tdb_store_bystring(t->tdb, name, rec, 0);
1014         free(rec.dptr);
1015         tdb_unlock_bystring(t->tdb, name);
1016         talloc_free(t);
1017 }
1018
1019 struct server_id imessaging_get_server_id(struct imessaging_context *msg_ctx)
1020 {
1021         return msg_ctx->server_id;
1022 }
1023
1024 struct irpc_bh_state {
1025         struct imessaging_context *msg_ctx;
1026         struct server_id server_id;
1027         const struct ndr_interface_table *table;
1028         uint32_t timeout;
1029         struct security_token *token;
1030 };
1031
1032 static bool irpc_bh_is_connected(struct dcerpc_binding_handle *h)
1033 {
1034         struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1035                                    struct irpc_bh_state);
1036
1037         if (!hs->msg_ctx) {
1038                 return false;
1039         }
1040
1041         return true;
1042 }
1043
1044 static uint32_t irpc_bh_set_timeout(struct dcerpc_binding_handle *h,
1045                                     uint32_t timeout)
1046 {
1047         struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1048                                    struct irpc_bh_state);
1049         uint32_t old = hs->timeout;
1050
1051         hs->timeout = timeout;
1052
1053         return old;
1054 }
1055
1056 struct irpc_bh_raw_call_state {
1057         struct irpc_request *irpc;
1058         uint32_t opnum;
1059         DATA_BLOB in_data;
1060         DATA_BLOB in_packet;
1061         DATA_BLOB out_data;
1062 };
1063
1064 static void irpc_bh_raw_call_incoming_handler(struct irpc_request *irpc,
1065                                               struct irpc_message *m);
1066
1067 static struct tevent_req *irpc_bh_raw_call_send(TALLOC_CTX *mem_ctx,
1068                                                 struct tevent_context *ev,
1069                                                 struct dcerpc_binding_handle *h,
1070                                                 const struct GUID *object,
1071                                                 uint32_t opnum,
1072                                                 uint32_t in_flags,
1073                                                 const uint8_t *in_data,
1074                                                 size_t in_length)
1075 {
1076         struct irpc_bh_state *hs =
1077                 dcerpc_binding_handle_data(h,
1078                 struct irpc_bh_state);
1079         struct tevent_req *req;
1080         struct irpc_bh_raw_call_state *state;
1081         bool ok;
1082         struct irpc_header header;
1083         struct ndr_push *ndr;
1084         NTSTATUS status;
1085         enum ndr_err_code ndr_err;
1086
1087         req = tevent_req_create(mem_ctx, &state,
1088                                 struct irpc_bh_raw_call_state);
1089         if (req == NULL) {
1090                 return NULL;
1091         }
1092         state->opnum = opnum;
1093         state->in_data.data = discard_const_p(uint8_t, in_data);
1094         state->in_data.length = in_length;
1095
1096         ok = irpc_bh_is_connected(h);
1097         if (!ok) {
1098                 tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
1099                 return tevent_req_post(req, ev);
1100         }
1101
1102         state->irpc = talloc_zero(state, struct irpc_request);
1103         if (tevent_req_nomem(state->irpc, req)) {
1104                 return tevent_req_post(req, ev);
1105         }
1106
1107         state->irpc->msg_ctx  = hs->msg_ctx;
1108         state->irpc->callid   = idr_get_new(hs->msg_ctx->idr,
1109                                             state->irpc, UINT16_MAX);
1110         if (state->irpc->callid == -1) {
1111                 tevent_req_nterror(req, NT_STATUS_INSUFFICIENT_RESOURCES);
1112                 return tevent_req_post(req, ev);
1113         }
1114         state->irpc->incoming.handler = irpc_bh_raw_call_incoming_handler;
1115         state->irpc->incoming.private_data = req;
1116
1117         talloc_set_destructor(state->irpc, irpc_destructor);
1118
1119         /* setup the header */
1120         header.uuid = hs->table->syntax_id.uuid;
1121
1122         header.if_version = hs->table->syntax_id.if_version;
1123         header.callid     = state->irpc->callid;
1124         header.callnum    = state->opnum;
1125         header.flags      = 0;
1126         header.status     = NT_STATUS_OK;
1127         header.creds.token= hs->token;
1128
1129         /* construct the irpc packet */
1130         ndr = ndr_push_init_ctx(state->irpc);
1131         if (tevent_req_nomem(ndr, req)) {
1132                 return tevent_req_post(req, ev);
1133         }
1134
1135         ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
1136         status = ndr_map_error2ntstatus(ndr_err);
1137         if (!NT_STATUS_IS_OK(status)) {
1138                 tevent_req_nterror(req, status);
1139                 return tevent_req_post(req, ev);
1140         }
1141
1142         ndr_err = ndr_push_bytes(ndr, in_data, in_length);
1143         status = ndr_map_error2ntstatus(ndr_err);
1144         if (!NT_STATUS_IS_OK(status)) {
1145                 tevent_req_nterror(req, status);
1146                 return tevent_req_post(req, ev);
1147         }
1148
1149         /* and send it */
1150         state->in_packet = ndr_push_blob(ndr);
1151         status = imessaging_send(hs->msg_ctx, hs->server_id,
1152                                 MSG_IRPC, &state->in_packet);
1153         if (!NT_STATUS_IS_OK(status)) {
1154                 tevent_req_nterror(req, status);
1155                 return tevent_req_post(req, ev);
1156         }
1157
1158         if (hs->timeout != IRPC_CALL_TIMEOUT_INF) {
1159                 /* set timeout-callback in case caller wants that */
1160                 ok = tevent_req_set_endtime(req, ev, timeval_current_ofs(hs->timeout, 0));
1161                 if (!ok) {
1162                         return tevent_req_post(req, ev);
1163                 }
1164         }
1165
1166         return req;
1167 }
1168
1169 static void irpc_bh_raw_call_incoming_handler(struct irpc_request *irpc,
1170                                               struct irpc_message *m)
1171 {
1172         struct tevent_req *req =
1173                 talloc_get_type_abort(irpc->incoming.private_data,
1174                 struct tevent_req);
1175         struct irpc_bh_raw_call_state *state =
1176                 tevent_req_data(req,
1177                 struct irpc_bh_raw_call_state);
1178
1179         talloc_steal(state, m);
1180
1181         if (!NT_STATUS_IS_OK(m->header.status)) {
1182                 tevent_req_nterror(req, m->header.status);
1183                 return;
1184         }
1185
1186         state->out_data = data_blob_talloc(state,
1187                 m->ndr->data + m->ndr->offset,
1188                 m->ndr->data_size - m->ndr->offset);
1189         if ((m->ndr->data_size - m->ndr->offset) > 0 && !state->out_data.data) {
1190                 tevent_req_oom(req);
1191                 return;
1192         }
1193
1194         tevent_req_done(req);
1195 }
1196
1197 static NTSTATUS irpc_bh_raw_call_recv(struct tevent_req *req,
1198                                         TALLOC_CTX *mem_ctx,
1199                                         uint8_t **out_data,
1200                                         size_t *out_length,
1201                                         uint32_t *out_flags)
1202 {
1203         struct irpc_bh_raw_call_state *state =
1204                 tevent_req_data(req,
1205                 struct irpc_bh_raw_call_state);
1206         NTSTATUS status;
1207
1208         if (tevent_req_is_nterror(req, &status)) {
1209                 tevent_req_received(req);
1210                 return status;
1211         }
1212
1213         *out_data = talloc_move(mem_ctx, &state->out_data.data);
1214         *out_length = state->out_data.length;
1215         *out_flags = 0;
1216         tevent_req_received(req);
1217         return NT_STATUS_OK;
1218 }
1219
1220 struct irpc_bh_disconnect_state {
1221         uint8_t _dummy;
1222 };
1223
1224 static struct tevent_req *irpc_bh_disconnect_send(TALLOC_CTX *mem_ctx,
1225                                                 struct tevent_context *ev,
1226                                                 struct dcerpc_binding_handle *h)
1227 {
1228         struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1229                                      struct irpc_bh_state);
1230         struct tevent_req *req;
1231         struct irpc_bh_disconnect_state *state;
1232         bool ok;
1233
1234         req = tevent_req_create(mem_ctx, &state,
1235                                 struct irpc_bh_disconnect_state);
1236         if (req == NULL) {
1237                 return NULL;
1238         }
1239
1240         ok = irpc_bh_is_connected(h);
1241         if (!ok) {
1242                 tevent_req_nterror(req, NT_STATUS_INVALID_CONNECTION);
1243                 return tevent_req_post(req, ev);
1244         }
1245
1246         hs->msg_ctx = NULL;
1247
1248         tevent_req_done(req);
1249         return tevent_req_post(req, ev);
1250 }
1251
1252 static NTSTATUS irpc_bh_disconnect_recv(struct tevent_req *req)
1253 {
1254         NTSTATUS status;
1255
1256         if (tevent_req_is_nterror(req, &status)) {
1257                 tevent_req_received(req);
1258                 return status;
1259         }
1260
1261         tevent_req_received(req);
1262         return NT_STATUS_OK;
1263 }
1264
1265 static bool irpc_bh_ref_alloc(struct dcerpc_binding_handle *h)
1266 {
1267         return true;
1268 }
1269
1270 static const struct dcerpc_binding_handle_ops irpc_bh_ops = {
1271         .name                   = "wbint",
1272         .is_connected           = irpc_bh_is_connected,
1273         .set_timeout            = irpc_bh_set_timeout,
1274         .raw_call_send          = irpc_bh_raw_call_send,
1275         .raw_call_recv          = irpc_bh_raw_call_recv,
1276         .disconnect_send        = irpc_bh_disconnect_send,
1277         .disconnect_recv        = irpc_bh_disconnect_recv,
1278
1279         .ref_alloc              = irpc_bh_ref_alloc,
1280 };
1281
1282 /* initialise a irpc binding handle */
1283 struct dcerpc_binding_handle *irpc_binding_handle(TALLOC_CTX *mem_ctx,
1284                                         struct imessaging_context *msg_ctx,
1285                                         struct server_id server_id,
1286                                         const struct ndr_interface_table *table)
1287 {
1288         struct dcerpc_binding_handle *h;
1289         struct irpc_bh_state *hs;
1290
1291         h = dcerpc_binding_handle_create(mem_ctx,
1292                                          &irpc_bh_ops,
1293                                          NULL,
1294                                          table,
1295                                          &hs,
1296                                          struct irpc_bh_state,
1297                                          __location__);
1298         if (h == NULL) {
1299                 return NULL;
1300         }
1301         hs->msg_ctx = msg_ctx;
1302         hs->server_id = server_id;
1303         hs->table = table;
1304         hs->timeout = IRPC_CALL_TIMEOUT;
1305
1306         dcerpc_binding_handle_set_sync_ev(h, msg_ctx->event.ev);
1307
1308         return h;
1309 }
1310
1311 struct dcerpc_binding_handle *irpc_binding_handle_by_name(TALLOC_CTX *mem_ctx,
1312                                         struct imessaging_context *msg_ctx,
1313                                         const char *dest_task,
1314                                         const struct ndr_interface_table *table)
1315 {
1316         struct dcerpc_binding_handle *h;
1317         struct server_id *sids;
1318         struct server_id sid;
1319
1320         /* find the server task */
1321         sids = irpc_servers_byname(msg_ctx, mem_ctx, dest_task);
1322         if (sids == NULL) {
1323                 errno = EADDRNOTAVAIL;
1324                 return NULL;
1325         }
1326         if (sids[0].pid == 0) {
1327                 talloc_free(sids);
1328                 errno = EADDRNOTAVAIL;
1329                 return NULL;
1330         }
1331         sid = sids[0];
1332         talloc_free(sids);
1333
1334         h = irpc_binding_handle(mem_ctx, msg_ctx,
1335                                 sid, table);
1336         if (h == NULL) {
1337                 return NULL;
1338         }
1339
1340         return h;
1341 }
1342
1343 void irpc_binding_handle_add_security_token(struct dcerpc_binding_handle *h,
1344                                             struct security_token *token)
1345 {
1346         struct irpc_bh_state *hs =
1347                 dcerpc_binding_handle_data(h,
1348                 struct irpc_bh_state);
1349
1350         hs->token = token;
1351 }