2 Unix SMB/CIFS implementation.
4 Samba internal messaging functions
6 Copyright (C) Andrew Tridgell 2004
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 2 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 #include "lib/events/events.h"
25 #include "system/dir.h"
26 #include "system/filesys.h"
27 #include "system/time.h"
29 #include "dlinklist.h"
30 #include "lib/socket/socket.h"
31 #include "librpc/gen_ndr/ndr_irpc.h"
32 #include "lib/messaging/irpc.h"
34 #include "lib/tdb/include/tdb.h"
35 #include "lib/tdb/include/tdbutil.h"
37 /* change the message version with any incompatible changes in the protocol */
38 #define MESSAGING_VERSION 1
40 struct messaging_context {
42 struct socket_context *sock;
43 const char *base_path;
45 struct dispatch_fn *dispatch;
46 struct messaging_rec *pending;
47 struct irpc_list *irpc;
48 struct idr_context *idr;
52 struct event_context *ev;
57 /* we have a linked list of dispatch handlers that this messaging
58 server can deal with */
60 struct dispatch_fn *next, *prev;
63 void (*fn)(struct messaging_context *msg, void *private,
64 uint32_t msg_type, uint32_t server_id, DATA_BLOB *data);
67 /* an individual message */
68 struct messaging_rec {
69 struct messaging_rec *next, *prev;
70 struct messaging_context *msg;
73 struct messaging_header {
85 static void irpc_handler(struct messaging_context *, void *,
86 uint32_t, uint32_t, DATA_BLOB *);
90 A useful function for testing the message system.
92 static void ping_message(struct messaging_context *msg, void *private,
93 uint32_t msg_type, uint32_t src, DATA_BLOB *data)
95 DEBUG(1,("INFO: Received PING message from server %u [%.*s]\n",
96 (uint_t)src, (int)data->length,
97 data->data?(const char *)data->data:""));
98 messaging_send(msg, src, MSG_PONG, data);
102 return the path to a messaging socket
104 static char *messaging_path(struct messaging_context *msg, uint32_t server_id)
106 return talloc_asprintf(msg, "%s/msg.%u", msg->base_path, (unsigned)server_id);
110 dispatch a fully received message
112 static void messaging_dispatch(struct messaging_context *msg, struct messaging_rec *rec)
114 struct dispatch_fn *d, *next;
115 for (d=msg->dispatch;d;d=next) {
117 if (d->msg_type == rec->header->msg_type) {
119 data.data = rec->packet.data + sizeof(*rec->header);
120 data.length = rec->header->length;
121 d->fn(msg, d->private, d->msg_type, rec->header->from, &data);
124 rec->header->length = 0;
129 try to send the message
131 static NTSTATUS try_send(struct messaging_rec *rec)
133 struct messaging_context *msg = rec->msg;
138 /* we send with privileges so messages work from any context */
139 priv = root_privileges();
140 status = socket_sendto(msg->sock, &rec->packet, &nsent, 0, rec->path, 0);
147 handle a socket write event
149 static void messaging_send_handler(struct messaging_context *msg)
151 while (msg->pending) {
152 struct messaging_rec *rec = msg->pending;
154 status = try_send(rec);
155 if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
158 if (!NT_STATUS_IS_OK(status)) {
159 DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n",
160 rec->header->from, rec->header->to, rec->header->msg_type,
163 DLIST_REMOVE(msg->pending, rec);
166 if (msg->pending == NULL) {
167 EVENT_FD_NOT_WRITEABLE(msg->event.fde);
172 handle a new incoming packet
174 static void messaging_recv_handler(struct messaging_context *msg)
176 struct messaging_rec *rec;
181 /* see how many bytes are in the next packet */
182 status = socket_pending(msg->sock, &msize);
183 if (!NT_STATUS_IS_OK(status)) {
184 DEBUG(0,("socket_pending failed in messaging - %s\n",
189 packet = data_blob_talloc(msg, NULL, msize);
190 if (packet.data == NULL) {
191 /* assume this is temporary and retry */
195 status = socket_recv(msg->sock, packet.data, msize, &msize, 0);
196 if (!NT_STATUS_IS_OK(status)) {
197 data_blob_free(&packet);
201 if (msize < sizeof(*rec->header)) {
202 DEBUG(0,("messaging: bad message of size %d\n", (int)msize));
203 data_blob_free(&packet);
207 rec = talloc(msg, struct messaging_rec);
209 smb_panic("Unable to allocate messaging_rec");
212 talloc_steal(rec, packet.data);
214 rec->path = msg->path;
215 rec->header = (struct messaging_header *)packet.data;
216 rec->packet = packet;
218 if (msize != sizeof(*rec->header) + rec->header->length) {
219 DEBUG(0,("messaging: bad message header size %d should be %d\n",
220 rec->header->length, (int)(msize - sizeof(*rec->header))));
225 messaging_dispatch(msg, rec);
231 handle a socket event
233 static void messaging_handler(struct event_context *ev, struct fd_event *fde,
234 uint16_t flags, void *private)
236 struct messaging_context *msg = talloc_get_type(private,
237 struct messaging_context);
238 if (flags & EVENT_FD_WRITE) {
239 messaging_send_handler(msg);
241 if (flags & EVENT_FD_READ) {
242 messaging_recv_handler(msg);
248 Register a dispatch function for a particular message type.
250 void messaging_register(struct messaging_context *msg, void *private,
252 void (*fn)(struct messaging_context *, void *, uint32_t, uint32_t, DATA_BLOB *))
254 struct dispatch_fn *d;
256 d = talloc(msg, struct dispatch_fn);
257 d->msg_type = msg_type;
258 d->private = private;
260 DLIST_ADD(msg->dispatch, d);
264 De-register the function for a particular message type.
266 void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private)
268 struct dispatch_fn *d, *next;
270 for (d = msg->dispatch; d; d = next) {
272 if (d->msg_type == msg_type &&
273 d->private == private) {
274 DLIST_REMOVE(msg->dispatch, d);
282 Send a message to a particular server
284 NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server,
285 uint32_t msg_type, DATA_BLOB *data)
287 struct messaging_rec *rec;
289 size_t dlength = data?data->length:0;
291 rec = talloc(msg, struct messaging_rec);
293 return NT_STATUS_NO_MEMORY;
296 rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength);
297 if (rec->packet.data == NULL) {
299 return NT_STATUS_NO_MEMORY;
303 rec->header = (struct messaging_header *)rec->packet.data;
304 rec->header->version = MESSAGING_VERSION;
305 rec->header->msg_type = msg_type;
306 rec->header->from = msg->server_id;
307 rec->header->to = server;
308 rec->header->length = dlength;
310 memcpy(rec->packet.data + sizeof(*rec->header),
311 data->data, dlength);
314 rec->path = messaging_path(msg, server);
315 talloc_steal(rec, rec->path);
317 if (msg->pending != NULL) {
318 status = STATUS_MORE_ENTRIES;
320 status = try_send(rec);
323 if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
324 if (msg->pending == NULL) {
325 EVENT_FD_WRITEABLE(msg->event.fde);
327 DLIST_ADD_END(msg->pending, rec, struct messaging_rec *);
337 Send a message to a particular server, with the message containing a single pointer
339 NTSTATUS messaging_send_ptr(struct messaging_context *msg, uint32_t server,
340 uint32_t msg_type, void *ptr)
344 blob.data = (void *)&ptr;
345 blob.length = sizeof(void *);
347 return messaging_send(msg, server, msg_type, &blob);
352 destroy the messaging context
354 static int messaging_destructor(void *ptr)
356 struct messaging_context *msg = ptr;
358 while (msg->names && msg->names[0]) {
359 irpc_remove_name(msg, msg->names[0]);
365 create the listening socket and setup the dispatcher
367 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id,
368 struct event_context *ev)
370 struct messaging_context *msg;
374 msg = talloc(mem_ctx, struct messaging_context);
380 ev = event_context_init(msg);
383 /* create the messaging directory if needed */
384 path = smbd_tmp_path(msg, "messaging");
388 msg->base_path = smbd_tmp_path(msg, "messaging");
389 msg->path = messaging_path(msg, server_id);
390 msg->server_id = server_id;
391 msg->dispatch = NULL;
393 msg->idr = idr_init(msg);
397 status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
398 if (!NT_STATUS_IS_OK(status)) {
403 /* by stealing here we ensure that the socket is cleaned up (and even
405 talloc_steal(msg, msg->sock);
407 status = socket_listen(msg->sock, msg->path, 0, 50, 0);
408 if (!NT_STATUS_IS_OK(status)) {
409 DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status)));
414 /* it needs to be non blocking for sends */
415 set_blocking(socket_get_fd(msg->sock), False);
417 msg->event.ev = talloc_reference(msg, ev);
418 msg->event.fde = event_add_fd(ev, msg, socket_get_fd(msg->sock),
419 EVENT_FD_READ, messaging_handler, msg);
421 talloc_set_destructor(msg, messaging_destructor);
423 messaging_register(msg, NULL, MSG_PING, ping_message);
424 messaging_register(msg, NULL, MSG_IRPC, irpc_handler);
431 a list of registered irpc server functions
434 struct irpc_list *next, *prev;
436 const struct dcerpc_interface_table *table;
444 register a irpc server function
446 NTSTATUS irpc_register(struct messaging_context *msg_ctx,
447 const struct dcerpc_interface_table *table,
448 int callnum, irpc_function_t fn, void *private)
450 struct irpc_list *irpc;
452 /* override an existing handler, if any */
453 for (irpc=msg_ctx->irpc; irpc; irpc=irpc->next) {
454 if (irpc->table == table && irpc->callnum == callnum) {
459 irpc = talloc(msg_ctx, struct irpc_list);
460 NT_STATUS_HAVE_NO_MEMORY(irpc);
461 DLIST_ADD(msg_ctx->irpc, irpc);
465 irpc->callnum = callnum;
467 irpc->private = private;
468 GUID_from_string(irpc->table->uuid, &irpc->uuid);
475 handle an incoming irpc reply message
477 static void irpc_handler_reply(struct messaging_context *msg_ctx,
478 struct ndr_pull *ndr, struct irpc_header *header)
480 struct irpc_request *irpc;
482 irpc = idr_find(msg_ctx->idr, header->callid);
483 if (irpc == NULL) return;
485 /* parse the reply data */
486 irpc->status = irpc->table->calls[irpc->callnum].ndr_pull(ndr, NDR_OUT, irpc->r);
487 if (NT_STATUS_IS_OK(irpc->status)) {
488 irpc->status = header->status;
491 talloc_steal(irpc, ndr);
492 if (irpc->async.fn) {
493 irpc->async.fn(irpc);
499 handle an incoming irpc request message
501 static void irpc_handler_request(struct messaging_context *msg_ctx,
502 struct ndr_pull *ndr, struct irpc_header *header,
508 struct irpc_message m;
509 struct ndr_push *push;
512 for (i=msg_ctx->irpc; i; i=i->next) {
513 if (GUID_equal(&i->uuid, &header->uuid) &&
514 i->table->if_version == header->if_version &&
515 i->callnum == header->callnum) {
521 /* no registered handler for this message */
525 /* allocate space for the structure */
526 r = talloc_zero_size(ndr, i->table->calls[header->callnum].struct_size);
527 if (r == NULL) goto failed;
529 /* parse the request data */
530 status = i->table->calls[i->callnum].ndr_pull(ndr, NDR_IN, r);
531 if (!NT_STATUS_IS_OK(status)) goto failed;
535 m.private = i->private;
536 header->status = i->fn(&m, r);
538 /* setup the reply */
539 push = ndr_push_init_ctx(ndr);
540 if (push == NULL) goto failed;
542 header->flags |= IRPC_FLAG_REPLY;
544 /* construct the packet */
545 status = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, header);
546 if (!NT_STATUS_IS_OK(status)) goto failed;
548 status = i->table->calls[i->callnum].ndr_push(push, NDR_OUT, r);
549 if (!NT_STATUS_IS_OK(status)) goto failed;
551 /* send the reply message */
552 packet = ndr_push_blob(push);
553 status = messaging_send(msg_ctx, src, MSG_IRPC, &packet);
554 if (!NT_STATUS_IS_OK(status)) goto failed;
557 /* nothing to clean up */
562 handle an incoming irpc message
564 static void irpc_handler(struct messaging_context *msg_ctx, void *private,
565 uint32_t msg_type, uint32_t src, DATA_BLOB *packet)
567 struct irpc_header header;
568 struct ndr_pull *ndr;
571 ndr = ndr_pull_init_blob(packet, msg_ctx);
572 if (ndr == NULL) goto failed;
574 status = ndr_pull_irpc_header(ndr, NDR_BUFFERS|NDR_SCALARS, &header);
575 if (!NT_STATUS_IS_OK(status)) goto failed;
577 if (header.flags & IRPC_FLAG_REPLY) {
578 irpc_handler_reply(msg_ctx, ndr, &header);
580 irpc_handler_request(msg_ctx, ndr, &header, src);
591 destroy a irpc request
593 static int irpc_destructor(void *ptr)
595 struct irpc_request *irpc = talloc_get_type(ptr, struct irpc_request);
596 idr_remove(irpc->msg_ctx->idr, irpc->callid);
601 timeout a irpc request
603 static void irpc_timeout(struct event_context *ev, struct timed_event *te,
604 struct timeval t, void *private)
606 struct irpc_request *irpc = talloc_get_type(private, struct irpc_request);
607 irpc->status = NT_STATUS_IO_TIMEOUT;
609 if (irpc->async.fn) {
610 irpc->async.fn(irpc);
616 make a irpc call - async send
618 struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
620 const struct dcerpc_interface_table *table,
621 int callnum, void *r)
623 struct irpc_header header;
624 struct ndr_push *ndr;
627 struct irpc_request *irpc;
629 irpc = talloc(msg_ctx, struct irpc_request);
630 if (irpc == NULL) goto failed;
632 irpc->msg_ctx = msg_ctx;
634 irpc->callnum = callnum;
635 irpc->callid = idr_get_new(msg_ctx->idr, irpc, UINT16_MAX);
636 if (irpc->callid == -1) goto failed;
639 irpc->async.fn = NULL;
641 talloc_set_destructor(irpc, irpc_destructor);
643 /* setup the header */
644 status = GUID_from_string(table->uuid, &header.uuid);
645 if (!NT_STATUS_IS_OK(status)) goto failed;
647 header.if_version = table->if_version;
648 header.callid = irpc->callid;
649 header.callnum = callnum;
651 header.status = NT_STATUS_OK;
653 /* construct the irpc packet */
654 ndr = ndr_push_init_ctx(irpc);
655 if (ndr == NULL) goto failed;
657 status = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
658 if (!NT_STATUS_IS_OK(status)) goto failed;
660 status = table->calls[callnum].ndr_push(ndr, NDR_IN, r);
661 if (!NT_STATUS_IS_OK(status)) goto failed;
664 packet = ndr_push_blob(ndr);
665 status = messaging_send(msg_ctx, server_id, MSG_IRPC, &packet);
666 if (!NT_STATUS_IS_OK(status)) goto failed;
668 event_add_timed(msg_ctx->event.ev, irpc,
669 timeval_current_ofs(IRPC_CALL_TIMEOUT, 0),
681 wait for a irpc reply
683 NTSTATUS irpc_call_recv(struct irpc_request *irpc)
685 NT_STATUS_HAVE_NO_MEMORY(irpc);
686 while (!irpc->done) {
687 if (event_loop_once(irpc->msg_ctx->event.ev) != 0) {
688 return NT_STATUS_CONNECTION_DISCONNECTED;
695 perform a synchronous irpc request
697 NTSTATUS irpc_call(struct messaging_context *msg_ctx,
699 const struct dcerpc_interface_table *table,
700 int callnum, void *r)
702 struct irpc_request *irpc = irpc_call_send(msg_ctx, server_id,
704 return irpc_call_recv(irpc);
708 open the naming database
710 static struct tdb_wrap *irpc_namedb_open(struct messaging_context *msg_ctx)
713 char *path = talloc_asprintf(msg_ctx, "%s/names.tdb", msg_ctx->base_path);
717 t = tdb_wrap_open(msg_ctx, path, 0, 0, O_RDWR|O_CREAT, 0660);
724 add a string name that this irpc server can be called on
726 NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name)
731 NTSTATUS status = NT_STATUS_OK;
733 t = irpc_namedb_open(msg_ctx);
734 NT_STATUS_HAVE_NO_MEMORY(t);
736 if (tdb_lock_bystring(t->tdb, name) != 0) {
738 return NT_STATUS_LOCK_NOT_GRANTED;
740 rec = tdb_fetch_bystring(t->tdb, name);
741 count = rec.dsize / sizeof(uint32_t);
742 rec.dptr = (char *)realloc_p(rec.dptr, uint32_t, count+1);
743 rec.dsize += sizeof(uint32_t);
744 if (rec.dptr == NULL) {
745 tdb_unlock_bystring(t->tdb, name);
747 return NT_STATUS_NO_MEMORY;
749 ((uint32_t *)rec.dptr)[count] = msg_ctx->server_id;
750 if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) {
751 status = NT_STATUS_INTERNAL_ERROR;
754 tdb_unlock_bystring(t->tdb, name);
757 msg_ctx->names = str_list_add(msg_ctx->names, name);
758 talloc_steal(msg_ctx, msg_ctx->names);
764 return a list of server ids for a server name
766 uint32_t *irpc_servers_byname(struct messaging_context *msg_ctx, const char *name)
773 t = irpc_namedb_open(msg_ctx);
778 if (tdb_lock_bystring(t->tdb, name) != 0) {
782 rec = tdb_fetch_bystring(t->tdb, name);
783 if (rec.dptr == NULL) {
784 tdb_unlock_bystring(t->tdb, name);
788 count = rec.dsize / sizeof(uint32_t);
789 ret = talloc_array(msg_ctx, uint32_t, count+1);
791 tdb_unlock_bystring(t->tdb, name);
795 for (i=0;i<count;i++) {
796 ret[i] = ((uint32_t *)rec.dptr)[i];
800 tdb_unlock_bystring(t->tdb, name);
807 remove a name from a messaging context
809 void irpc_remove_name(struct messaging_context *msg_ctx, const char *name)
816 str_list_remove(msg_ctx->names, name);
818 t = irpc_namedb_open(msg_ctx);
823 if (tdb_lock_bystring(t->tdb, name) != 0) {
827 rec = tdb_fetch_bystring(t->tdb, name);
828 count = rec.dsize / sizeof(uint32_t);
830 tdb_unlock_bystring(t->tdb, name);
834 ids = (uint32_t *)rec.dptr;
835 for (i=0;i<count;i++) {
836 if (ids[i] == msg_ctx->server_id) {
838 memmove(ids+i, ids+i+1, count-(i+1));
840 rec.dsize -= sizeof(uint32_t);
844 tdb_store_bystring(t->tdb, name, rec, 0);
846 tdb_unlock_bystring(t->tdb, name);