7f90bd4e409385a036d317811464ef7be696e404
[bbaumbach/samba-autobuild/.git] / source4 / lib / messaging / messaging.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    Samba internal messaging functions
5
6    Copyright (C) Andrew Tridgell 2004
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 #include "includes.h"
24
25 /* change the message version with any incompatible changes in the protocol */
26 #define MESSAGING_VERSION 1
27
28 struct messaging_context {
29         servid_t server_id;
30         struct socket_context *sock;
31         char *path;
32         struct dispatch_fn *dispatch;
33
34         struct {
35                 struct event_context *ev;
36                 struct fd_event *fde;
37         } event;
38 };
39
40 /* we have a linked list of dispatch handlers that this messaging
41    server can deal with */
42 struct dispatch_fn {
43         struct dispatch_fn *next, *prev;
44         uint32_t msg_type;
45         void *private;
46         void (*fn)(struct messaging_context *msg, void *private, 
47                    uint32_t msg_type, servid_t server_id, DATA_BLOB *data);
48 };
49
50 /* an individual message */
51 struct messaging_rec {
52         struct messaging_context *msg;
53         struct socket_context *sock;
54         struct fd_event *fde;
55         const char *path;
56
57         struct {
58                 uint32_t version;
59                 uint32_t msg_type;
60                 servid_t from;
61                 servid_t to;
62                 uint32_t length;
63         } header;
64
65         DATA_BLOB data;
66
67         uint32_t ndone;
68 };
69
70 /*
71  A useful function for testing the message system.
72 */
73 static void ping_message(struct messaging_context *msg, void *private, 
74                          uint32_t msg_type, servid_t src, DATA_BLOB *data)
75 {
76         DEBUG(1,("INFO: Received PING message from server %u [%.*s]\n",
77                  (uint_t)src, data->length, data->data?(const char *)data->data:""));
78         messaging_send(msg, src, MSG_PONG, data);
79 }
80
81 /* 
82    return the path to a messaging socket
83 */
84 static char *messaging_path(TALLOC_CTX *mem_ctx, servid_t server_id)
85 {
86         char *name = talloc_asprintf(mem_ctx, "messaging/msg.%u", (unsigned)server_id);
87         char *ret;
88         ret = lock_path(mem_ctx, name);
89         talloc_free(name);
90         return ret;
91 }
92
93 /*
94   dispatch a fully received message
95 */
96 static void messaging_dispatch(struct messaging_context *msg, struct messaging_rec *rec)
97 {
98         struct dispatch_fn *d, *next;
99         for (d=msg->dispatch;d;d=next) {
100                 next = d->next;
101                 if (d->msg_type == rec->header.msg_type) {
102                         d->fn(msg, d->private, d->msg_type, rec->header.from, &rec->data);
103                 }
104         }
105
106         /* we don't free the record itself here as there may
107            be more messages from this client */
108         data_blob_free(&rec->data);
109         rec->header.length = 0;
110         rec->ndone = 0;
111 }
112
113
114 /*
115   handle IO for a single message
116 */
117 static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde, 
118                                  time_t t, uint16_t flags)
119 {
120         struct messaging_rec *rec = fde->private;
121         struct messaging_context *msg = rec->msg;
122         NTSTATUS status;
123
124         if (rec->ndone < sizeof(rec->header)) {
125                 /* receive the header */
126                 size_t nread;
127
128                 status = socket_recv(rec->sock, 
129                                      rec->ndone + (char *)&rec->header,
130                                      sizeof(rec->header) - rec->ndone, &nread, 0);
131                 if (NT_STATUS_IS_ERR(status)) {
132                         talloc_free(rec);
133                         return;
134                 }
135
136                 if (nread == 0) {
137                         return;
138                 }
139
140                 rec->ndone += nread;
141
142                 if (rec->ndone == sizeof(rec->header)) {
143                         if (rec->header.version != MESSAGING_VERSION) {
144                                 DEBUG(0,("meessage with wrong version %u\n",
145                                          rec->header.version));
146                                 talloc_free(rec);
147                         }
148                         rec->data = data_blob_talloc(rec, NULL, rec->header.length);
149                         if (rec->data.length != rec->header.length) {
150                                 DEBUG(0,("Unable to allocate message of size %u\n",
151                                          rec->header.length));
152                                 talloc_free(rec);
153                         }
154                 }
155         }
156
157         if (rec->ndone >= sizeof(rec->header) && 
158             rec->ndone < sizeof(rec->header) + rec->header.length) {
159                 /* receive the body, if any */
160                 size_t nread;
161
162                 status = socket_recv(rec->sock, 
163                                      rec->data.data + (rec->ndone - sizeof(rec->header)),
164                                      sizeof(rec->header) + rec->header.length - rec->ndone, 
165                                      &nread, 0);
166                 if (NT_STATUS_IS_ERR(status)) {
167                         talloc_free(rec);
168                         return;
169                 }
170
171                 if (nread == 0) {
172                         return;
173                 }
174
175                 rec->ndone += nread;
176         }
177
178         if (rec->ndone == sizeof(rec->header) + rec->header.length) {
179                 /* we've got the whole message */
180                 messaging_dispatch(msg, rec);
181         }
182 }
183
184 /*
185   destroy a messaging record
186 */
187 static int rec_destructor(void *ptr)
188 {
189         struct messaging_rec *rec = ptr;
190         struct messaging_context *msg = rec->msg;
191         event_remove_fd(msg->event.ev, rec->fde);
192         return 0;
193 }
194
195 /*
196   handle a new incoming connection
197 */
198 static void messaging_listen_handler(struct event_context *ev, struct fd_event *fde, 
199                                      time_t t, uint16_t flags)
200 {
201         struct messaging_context *msg = fde->private;
202         struct messaging_rec *rec;
203         NTSTATUS status;
204         struct fd_event fde2;
205
206         rec = talloc_p(msg, struct messaging_rec);
207         if (rec == NULL) {
208                 smb_panic("Unable to allocate messaging_rec");
209         }
210
211         status = socket_accept(msg->sock, &rec->sock);
212         if (!NT_STATUS_IS_OK(status)) {
213                 smb_panic("Unable to accept messaging_rec");
214         }
215         talloc_steal(rec, rec->sock);
216
217         rec->msg = msg;
218         rec->ndone = 0;
219         rec->header.length = 0;
220         rec->path = msg->path;
221
222         fde2.private    = rec;
223         fde2.fd         = socket_get_fd(rec->sock);
224         fde2.flags      = EVENT_FD_READ;
225         fde2.handler    = messaging_recv_handler;
226
227         rec->fde        = event_add_fd(msg->event.ev, &fde2);
228
229         talloc_set_destructor(rec, rec_destructor);
230 }
231
232 /*
233   Register a dispatch function for a particular message type.
234 */
235 void messaging_register(struct messaging_context *msg, void *private,
236                         uint32_t msg_type, 
237                         void (*fn)(struct messaging_context *, void *, uint32_t, servid_t, DATA_BLOB *))
238 {
239         struct dispatch_fn *d;
240
241         d = talloc_p(msg, struct dispatch_fn);
242         d->msg_type = msg_type;
243         d->private = private;
244         d->fn = fn;
245         DLIST_ADD(msg->dispatch, d);
246 }
247
248 /*
249   De-register the function for a particular message type.
250 */
251 void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private)
252 {
253         struct dispatch_fn *d, *next;
254
255         for (d = msg->dispatch; d; d = next) {
256                 next = d->next;
257                 if (d->msg_type == msg_type && 
258                     d->private == private) {
259                         DLIST_REMOVE(msg->dispatch, d);
260                         talloc_free(d);
261                 }
262         }       
263 }
264
265
266
267 /*
268   handle IO for sending a message
269 */
270 static void messaging_send_handler(struct event_context *ev, struct fd_event *fde, 
271                                    time_t t, uint16_t flags)
272 {
273         struct messaging_rec *rec = fde->private;
274         NTSTATUS status;
275
276         if (rec->ndone < sizeof(rec->header)) {
277                 /* send the header */
278                 size_t nsent;
279                 DATA_BLOB blob;
280
281                 blob.data = rec->ndone + (char *)&rec->header;
282                 blob.length = sizeof(rec->header) - rec->ndone;
283
284                 status = socket_send(rec->sock, &blob, &nsent, 0);
285                 if (NT_STATUS_IS_ERR(status)) {
286                         talloc_free(rec);
287                         return;
288                 }
289
290                 if (nsent == 0) {
291                         return;
292                 }
293
294                 rec->ndone += nsent;
295         }
296
297         if (rec->ndone >= sizeof(rec->header) && 
298             rec->ndone < sizeof(rec->header) + rec->header.length) {
299                 /* send the body, if any */
300                 DATA_BLOB blob;
301                 size_t nsent;
302
303                 blob.data = rec->data.data + (rec->ndone - sizeof(rec->header));
304                 blob.length = rec->header.length - (rec->ndone - sizeof(rec->header));
305
306                 status = socket_send(rec->sock, &blob, &nsent, 0);
307                 if (NT_STATUS_IS_ERR(status)) {
308                         talloc_free(rec);
309                         return;
310                 }
311
312                 rec->ndone += nsent;
313         }
314
315         if (rec->ndone == sizeof(rec->header) + rec->header.length) {
316                 /* we've done the whole message */
317                 talloc_free(rec);
318         }
319 }
320
321
322 /*
323   when the servers listen queue is full we use this to backoff the message
324 */
325 static void messaging_backoff_handler(struct event_context *ev, struct timed_event *te, time_t t)
326 {
327         struct messaging_rec *rec = te->private;
328         struct messaging_context *msg = rec->msg;
329         NTSTATUS status;
330         struct fd_event fde;
331
332         status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
333         if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
334                 /* backoff again */
335                 te->next_event = t+1;
336                 return;
337         }
338
339         if (!NT_STATUS_IS_OK(status)) {
340                 DEBUG(1,("messaging: Lost message from %u to %u of type %u after backoff - %s\n", 
341                          rec->header.from, rec->header.to, rec->header.msg_type, nt_errstr(status)));
342                 talloc_free(rec);
343                 return;
344         }
345
346         fde.private     = rec;
347         fde.fd          = socket_get_fd(rec->sock);
348         fde.flags       = EVENT_FD_WRITE;
349         fde.handler     = messaging_send_handler;
350
351         rec->fde        = event_add_fd(msg->event.ev, &fde);
352
353         talloc_set_destructor(rec, rec_destructor);
354
355         messaging_send_handler(msg->event.ev, rec->fde, 0, EVENT_FD_WRITE);
356 }
357
358
359 /*
360   Send a message to a particular server
361 */
362 NTSTATUS messaging_send(struct messaging_context *msg, servid_t server, uint32_t msg_type, DATA_BLOB *data)
363 {
364         struct messaging_rec *rec;
365         NTSTATUS status;
366         struct fd_event fde;
367
368         rec = talloc_p(msg, struct messaging_rec);
369         if (rec == NULL) {
370                 return NT_STATUS_NO_MEMORY;
371         }
372
373         rec->msg = msg;
374         rec->header.version = MESSAGING_VERSION;
375         rec->header.msg_type = msg_type;
376         rec->header.from = msg->server_id;
377         rec->header.to = server;
378         rec->header.length = data?data->length:0;
379         if (rec->header.length != 0) {
380                 rec->data = data_blob_talloc(rec, data->data, data->length);
381         } else {
382                 rec->data = data_blob(NULL, 0);
383         }
384         rec->ndone = 0;
385
386         status = socket_create("unix", SOCKET_TYPE_STREAM, &rec->sock, 0);
387         if (!NT_STATUS_IS_OK(status)) {
388                 talloc_free(rec);
389                 return status;
390         }
391         talloc_steal(rec, rec->sock);
392
393         rec->path = messaging_path(rec, server);
394
395         status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
396         if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
397                 /* backoff on this message - the servers listen queue is full */
398                 struct timed_event te;
399                 te.next_event = time(NULL)+1;
400                 te.handler = messaging_backoff_handler;
401                 te.private = rec;
402                 event_add_timed(msg->event.ev, &te);
403                 return NT_STATUS_OK;
404         }
405
406         if (!NT_STATUS_IS_OK(status)) {
407                 talloc_free(rec);
408                 return status;
409         }
410
411         fde.private     = rec;
412         fde.fd          = socket_get_fd(rec->sock);
413         fde.flags       = EVENT_FD_WRITE;
414         fde.handler     = messaging_send_handler;
415
416         rec->fde        = event_add_fd(msg->event.ev, &fde);
417
418         talloc_set_destructor(rec, rec_destructor);
419
420         messaging_send_handler(msg->event.ev, rec->fde, 0, EVENT_FD_WRITE);
421
422         return NT_STATUS_OK;
423 }
424
425
426 /*
427   destroy the messaging context
428 */
429 static int messaging_destructor(void *ptr)
430 {
431         struct messaging_context *msg = ptr;
432         event_remove_fd(msg->event.ev, msg->event.fde);
433         unlink(msg->path);
434         return 0;
435 }
436
437 /*
438   create the listening socket and setup the dispatcher
439 */
440 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, servid_t server_id, struct event_context *ev)
441 {
442         struct messaging_context *msg;
443         NTSTATUS status;
444         struct fd_event fde;
445
446         msg = talloc_p(mem_ctx, struct messaging_context);
447         if (msg == NULL) {
448                 return NULL;
449         }
450
451         /* create the messaging directory if needed */
452         msg->path = lock_path(msg, "messaging");
453         mkdir(msg->path, 0700);
454         talloc_free(msg->path);
455
456         msg->server_id = server_id;
457         msg->dispatch = NULL;
458         msg->path = messaging_path(msg, server_id);
459
460         status = socket_create("unix", SOCKET_TYPE_STREAM, &msg->sock, 0);
461         if (!NT_STATUS_IS_OK(status)) {
462                 talloc_free(msg);
463                 return NULL;
464         }
465
466         /* by stealing here we ensure that the socket is cleaned up (and even 
467            deleted) on exit */
468         talloc_steal(msg, msg->sock);
469
470         status = socket_listen(msg->sock, msg->path, 0, 50, 0);
471         if (!NT_STATUS_IS_OK(status)) {
472                 DEBUG(0,("Unable to setup messaging listener for '%s'\n", msg->path));
473                 talloc_free(msg);
474                 return NULL;
475         }
476
477         fde.private     = msg;
478         fde.fd          = socket_get_fd(msg->sock);
479         fde.flags       = EVENT_FD_READ;
480         fde.handler     = messaging_listen_handler;
481
482         msg->event.ev   = ev;
483         msg->event.fde  = event_add_fd(ev, &fde);
484
485         talloc_set_destructor(msg, messaging_destructor);
486         
487         messaging_register(msg, NULL, MSG_PING, ping_message);
488
489         return msg;
490 }
491
492