r23112: Trim down the message.c API slightly: The messages_pending_for_pid is now
[samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24 /**
25   @defgroup messages Internal messaging framework
26   @{
27   @file messages.c
28   
29   @brief  Module for internal messaging between Samba daemons. 
30
31    The idea is that if a part of Samba wants to do communication with
32    another Samba process then it will do a message_register() of a
33    dispatch function, and use message_send_pid() to send messages to
34    that process.
35
36    The dispatch function is given the pid of the sender, and it can
37    use that to reply by message_send_pid().  See ping_message() for a
38    simple example.
39
40    @caution Dispatch functions must be able to cope with incoming
41    messages on an *odd* byte boundary.
42
43    This system doesn't have any inherent size limitations but is not
44    very efficient for large messages or when messages are sent in very
45    quick succession.
46
47 */
48
49 #include "includes.h"
50 #include "librpc/gen_ndr/messaging.h"
51 #include "librpc/gen_ndr/ndr_messaging.h"
52
53 /* the locking database handle */
54 static int received_signal;
55
56 /* change the message version with any incompatible changes in the protocol */
57 #define MESSAGE_VERSION 2
58
59 struct messaging_callback {
60         struct messaging_callback *prev, *next;
61         uint32 msg_type;
62         void (*fn)(struct messaging_context *msg, void *private_data, 
63                    uint32_t msg_type, 
64                    struct server_id server_id, DATA_BLOB *data);
65         void *private_data;
66 };
67
68 struct messaging_context {
69         TDB_CONTEXT *tdb;
70         struct server_id id;
71         struct event_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         
75 };
76
77 /****************************************************************************
78  Notifications come in as signals.
79 ****************************************************************************/
80
81 static void sig_usr1(void)
82 {
83         received_signal = 1;
84         sys_select_signal(SIGUSR1);
85 }
86
87 /****************************************************************************
88  A useful function for testing the message system.
89 ****************************************************************************/
90
91 static void ping_message(struct messaging_context *msg_ctx,
92                          void *private_data,
93                          uint32_t msg_type,
94                          struct server_id src,
95                          DATA_BLOB *data)
96 {
97         const char *msg = data->data ? (const char *)data->data : "none";
98
99         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
100                  procid_str_static(&src), msg));
101         messaging_send(msg_ctx, src, MSG_PONG, data);
102 }
103
104 /****************************************************************************
105  Initialise the messaging functions. 
106 ****************************************************************************/
107
108 static BOOL message_tdb_init(struct messaging_context *msg_ctx)
109 {
110         sec_init();
111
112         msg_ctx->tdb = tdb_open_log(lock_path("messages.tdb"), 
113                                     0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
114                                     O_RDWR|O_CREAT,0600);
115
116         if (!msg_ctx->tdb) {
117                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
118                 return False;
119         }
120
121         /* Activate the per-hashchain freelist */
122         tdb_set_max_dead(msg_ctx->tdb, 5);
123
124         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
125
126         return True;
127 }
128
129 /*******************************************************************
130  Form a static tdb key from a pid.
131 ******************************************************************/
132
133 static TDB_DATA message_key_pid(struct server_id pid)
134 {
135         static char key[20];
136         TDB_DATA kbuf;
137
138         slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid));
139         
140         kbuf.dptr = (uint8 *)key;
141         kbuf.dsize = strlen(key)+1;
142         return kbuf;
143 }
144
145 /*
146   Fetch the messaging array for a process
147  */
148
149 static NTSTATUS messaging_tdb_fetch(TDB_CONTEXT *msg_tdb,
150                                     TDB_DATA key,
151                                     TALLOC_CTX *mem_ctx,
152                                     struct messaging_array **presult)
153 {
154         struct messaging_array *result;
155         TDB_DATA data;
156         DATA_BLOB blob;
157         NTSTATUS status;
158
159         if (!(result = TALLOC_ZERO_P(mem_ctx, struct messaging_array))) {
160                 return NT_STATUS_NO_MEMORY;
161         }
162
163         data = tdb_fetch(msg_tdb, key);
164
165         if (data.dptr == NULL) {
166                 *presult = result;
167                 return NT_STATUS_OK;
168         }
169
170         blob = data_blob_const(data.dptr, data.dsize);
171
172         status = ndr_pull_struct_blob(
173                 &blob, result, result,
174                 (ndr_pull_flags_fn_t)ndr_pull_messaging_array);
175
176         SAFE_FREE(data.dptr);
177
178         if (!NT_STATUS_IS_OK(status)) {
179                 TALLOC_FREE(result);
180                 return status;
181         }
182
183         if (DEBUGLEVEL >= 10) {
184                 DEBUG(10, ("messaging_tdb_fetch:\n"));
185                 NDR_PRINT_DEBUG(messaging_array, result);
186         }
187
188         *presult = result;
189         return NT_STATUS_OK;
190 }
191
192 /*
193   Store a messaging array for a pid
194 */
195
196 static NTSTATUS messaging_tdb_store(TDB_CONTEXT *msg_tdb,
197                                     TDB_DATA key,
198                                     struct messaging_array *array)
199 {
200         TDB_DATA data;
201         DATA_BLOB blob;
202         NTSTATUS status;
203         TALLOC_CTX *mem_ctx;
204         int ret;
205
206         if (array->num_messages == 0) {
207                 tdb_delete(msg_tdb, key);
208                 return NT_STATUS_OK;
209         }
210
211         if (!(mem_ctx = talloc_new(array))) {
212                 return NT_STATUS_NO_MEMORY;
213         }
214
215         status = ndr_push_struct_blob(
216                 &blob, mem_ctx, array,
217                 (ndr_push_flags_fn_t)ndr_push_messaging_array);
218
219         if (!NT_STATUS_IS_OK(status)) {
220                 talloc_free(mem_ctx);
221                 return status;
222         }
223
224         if (DEBUGLEVEL >= 10) {
225                 DEBUG(10, ("messaging_tdb_store:\n"));
226                 NDR_PRINT_DEBUG(messaging_array, array);
227         }
228
229         data.dptr = blob.data;
230         data.dsize = blob.length;
231
232         ret = tdb_store(msg_tdb, key, data, TDB_REPLACE);
233         TALLOC_FREE(mem_ctx);
234
235         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
236 }
237
238 /****************************************************************************
239  Notify a process that it has a message. If the process doesn't exist 
240  then delete its record in the database.
241 ****************************************************************************/
242
243 static NTSTATUS message_notify(struct server_id procid)
244 {
245         pid_t pid = procid.pid;
246         int ret;
247         uid_t euid = geteuid();
248
249         /*
250          * Doing kill with a non-positive pid causes messages to be
251          * sent to places we don't want.
252          */
253
254         SMB_ASSERT(pid > 0);
255
256         if (euid != 0) {
257                 /* If we're not root become so to send the message. */
258                 save_re_uid();
259                 set_effective_uid(0);
260         }
261
262         ret = kill(pid, SIGUSR1);
263
264         if (euid != 0) {
265                 /* Go back to who we were. */
266                 int saved_errno = errno;
267                 restore_re_uid_fromroot();
268                 errno = saved_errno;
269         }
270
271         if (ret == 0) {
272                 return NT_STATUS_OK;
273         }
274
275         /*
276          * Something has gone wrong
277          */
278
279         DEBUG(2,("message to process %d failed - %s\n", (int)pid,
280                  strerror(errno)));
281
282         /*
283          * No call to map_nt_error_from_unix -- don't want to link in
284          * errormap.o into lots of utils.
285          */
286
287         if (errno == ESRCH)  return NT_STATUS_INVALID_HANDLE;
288         if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
289         if (errno == EPERM)  return NT_STATUS_ACCESS_DENIED;
290         return NT_STATUS_UNSUCCESSFUL;
291 }
292
293 /****************************************************************************
294  Send a message to a particular pid.
295 ****************************************************************************/
296
297 static NTSTATUS messaging_tdb_send(struct messaging_context *msg_ctx,
298                                    struct server_id pid, int msg_type,
299                                    const DATA_BLOB *data)
300 {
301         struct messaging_array *msg_array;
302         struct messaging_rec *rec;
303         TALLOC_CTX *mem_ctx;
304         NTSTATUS status;
305         TDB_DATA key = message_key_pid(pid);
306
307         /* NULL pointer means implicit length zero. */
308         if (!data->data) {
309                 SMB_ASSERT(data->length == 0);
310         }
311
312         /*
313          * Doing kill with a non-positive pid causes messages to be
314          * sent to places we don't want.
315          */
316
317         SMB_ASSERT(procid_to_pid(&pid) > 0);
318
319         if (!(mem_ctx = talloc_init("message_send_pid"))) {
320                 return NT_STATUS_NO_MEMORY;
321         }
322
323         if (tdb_chainlock(msg_ctx->tdb, key) == -1) {
324                 TALLOC_FREE(mem_ctx);
325                 return NT_STATUS_LOCK_NOT_GRANTED;
326         }
327
328         status = messaging_tdb_fetch(msg_ctx->tdb, key, mem_ctx, &msg_array);
329
330         if (!NT_STATUS_IS_OK(status)) {
331                 goto done;
332         }
333
334         if ((msg_type & MSG_FLAG_LOWPRIORITY)
335             && (msg_array->num_messages > 1000)) {
336                 DEBUG(5, ("Dropping message for PID %s\n",
337                           procid_str_static(&pid)));
338                 status = NT_STATUS_INSUFFICIENT_RESOURCES;
339                 goto done;
340         }
341
342         if (!(rec = TALLOC_REALLOC_ARRAY(mem_ctx, msg_array->messages,
343                                          struct messaging_rec,
344                                          msg_array->num_messages+1))) {
345                 status = NT_STATUS_NO_MEMORY;
346                 goto done;
347         }
348
349         rec[msg_array->num_messages].msg_version = MESSAGE_VERSION;
350         rec[msg_array->num_messages].msg_type = msg_type & MSG_TYPE_MASK;
351         rec[msg_array->num_messages].dest = pid;
352         rec[msg_array->num_messages].src = procid_self();
353         rec[msg_array->num_messages].buf = *data;
354
355         msg_array->messages = rec;
356         msg_array->num_messages += 1;
357
358         status = messaging_tdb_store(msg_ctx->tdb, key, msg_array);
359
360         if (!NT_STATUS_IS_OK(status)) {
361                 goto done;
362         }
363         
364         status = message_notify(pid);
365
366         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
367                 DEBUG(2, ("pid %s doesn't exist - deleting messages record\n",
368                           procid_str_static(&pid)));
369                 tdb_delete(msg_ctx->tdb, message_key_pid(pid));
370         }
371
372  done:
373         tdb_chainunlock(msg_ctx->tdb, key);
374         TALLOC_FREE(mem_ctx);
375         return status;
376 }
377
378 /****************************************************************************
379  Retrieve all messages for the current process.
380 ****************************************************************************/
381
382 static NTSTATUS retrieve_all_messages(TDB_CONTEXT *msg_tdb,
383                                       TALLOC_CTX *mem_ctx,
384                                       struct messaging_array **presult)
385 {
386         struct messaging_array *result;
387         TDB_DATA key = message_key_pid(procid_self());
388         NTSTATUS status;
389
390         if (tdb_chainlock(msg_tdb, key) == -1) {
391                 return NT_STATUS_LOCK_NOT_GRANTED;
392         }
393
394         status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &result);
395
396         /*
397          * We delete the record here, tdb_set_max_dead keeps it around
398          */
399         tdb_delete(msg_tdb, key);
400         tdb_chainunlock(msg_tdb, key);
401
402         if (NT_STATUS_IS_OK(status)) {
403                 *presult = result;
404         }
405
406         return status;
407 }
408
409 /*
410   Dispatch one messsaging_rec
411 */
412 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
413                                    struct messaging_rec *rec)
414 {
415         struct messaging_callback *cb, *next;
416
417         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
418                 next = cb->next;
419                 if (cb->msg_type == rec->msg_type) {
420                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
421                                rec->src, &rec->buf);
422                         return;
423                 }
424         }
425         return;
426 }
427
428 /****************************************************************************
429  Receive and dispatch any messages pending for this process.
430  JRA changed Dec 13 2006. Only one message handler now permitted per type.
431  *NOTE*: Dispatch functions must be able to cope with incoming
432  messages on an *odd* byte boundary.
433 ****************************************************************************/
434
435 void message_dispatch(struct messaging_context *msg_ctx)
436 {
437         struct messaging_array *msg_array = NULL;
438         uint32 i;
439
440         if (!received_signal)
441                 return;
442
443         DEBUG(10, ("message_dispatch: received_signal = %d\n",
444                    received_signal));
445
446         received_signal = 0;
447
448         if (!NT_STATUS_IS_OK(retrieve_all_messages(msg_ctx->tdb, NULL,
449                                                    &msg_array))) {
450                 return;
451         }
452
453         for (i=0; i<msg_array->num_messages; i++) {
454                 messaging_dispatch_rec(msg_ctx, &msg_array->messages[i]);
455         }
456
457         TALLOC_FREE(msg_array);
458 }
459
460 /****************************************************************************
461  Register/replace a dispatch function for a particular message type.
462  JRA changed Dec 13 2006. Only one message handler now permitted per type.
463  *NOTE*: Dispatch functions must be able to cope with incoming
464  messages on an *odd* byte boundary.
465 ****************************************************************************/
466
467 struct msg_all {
468         struct messaging_context *msg_ctx;
469         int msg_type;
470         uint32 msg_flag;
471         const void *buf;
472         size_t len;
473         int n_sent;
474 };
475
476 /****************************************************************************
477  Send one of the messages for the broadcast.
478 ****************************************************************************/
479
480 static int traverse_fn(TDB_CONTEXT *the_tdb,
481                        const struct connections_key *ckey,
482                        const struct connections_data *crec,
483                        void *private_data)
484 {
485         struct msg_all *msg_all = (struct msg_all *)private_data;
486         NTSTATUS status;
487
488         if (crec->cnum != -1)
489                 return 0;
490
491         /* Don't send if the receiver hasn't registered an interest. */
492
493         if(!(crec->bcast_msg_flags & msg_all->msg_flag))
494                 return 0;
495
496         /* If the msg send fails because the pid was not found (i.e. smbd died), 
497          * the msg has already been deleted from the messages.tdb.*/
498
499         status = messaging_send_buf(msg_all->msg_ctx,
500                                     crec->pid, msg_all->msg_type,
501                                     (uint8 *)msg_all->buf, msg_all->len);
502
503         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
504
505                 TDB_DATA key;
506                 
507                 /* If the pid was not found delete the entry from
508                  * connections.tdb */
509
510                 DEBUG(2,("pid %s doesn't exist - deleting connections "
511                          "%d [%s]\n", procid_str_static(&crec->pid),
512                          crec->cnum, crec->servicename));
513
514                 key.dptr = (uint8 *)ckey;
515                 key.dsize = sizeof(*ckey);
516
517                 tdb_delete(the_tdb, key);
518         }
519         msg_all->n_sent++;
520         return 0;
521 }
522
523 /**
524  * Send a message to all smbd processes.
525  *
526  * It isn't very efficient, but should be OK for the sorts of
527  * applications that use it. When we need efficient broadcast we can add
528  * it.
529  *
530  * @param n_sent Set to the number of messages sent.  This should be
531  * equal to the number of processes, but be careful for races.
532  *
533  * @retval True for success.
534  **/
535 BOOL message_send_all(struct messaging_context *msg_ctx,
536                       int msg_type,
537                       const void *buf, size_t len,
538                       int *n_sent)
539 {
540         struct msg_all msg_all;
541
542         msg_all.msg_type = msg_type;
543         if (msg_type < 1000)
544                 msg_all.msg_flag = FLAG_MSG_GENERAL;
545         else if (msg_type > 1000 && msg_type < 2000)
546                 msg_all.msg_flag = FLAG_MSG_NMBD;
547         else if (msg_type > 2000 && msg_type < 2100)
548                 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
549         else if (msg_type > 2100 && msg_type < 3000)
550                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
551         else if (msg_type > 3000 && msg_type < 4000)
552                 msg_all.msg_flag = FLAG_MSG_SMBD;
553         else
554                 return False;
555
556         msg_all.buf = buf;
557         msg_all.len = len;
558         msg_all.n_sent = 0;
559         msg_all.msg_ctx = msg_ctx;
560
561         connections_forall(traverse_fn, &msg_all);
562         if (n_sent)
563                 *n_sent = msg_all.n_sent;
564         return True;
565 }
566
567 /*
568  * Block and unblock receiving of messages. Allows removal of race conditions
569  * when doing a fork and changing message disposition.
570  */
571
572 void message_block(void)
573 {
574         BlockSignals(True, SIGUSR1);
575 }
576
577 void message_unblock(void)
578 {
579         BlockSignals(False, SIGUSR1);
580 }
581
582 struct event_context *messaging_event_context(struct messaging_context *msg_ctx)
583 {
584         return msg_ctx->event_ctx;
585 }
586
587 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
588                                          struct server_id server_id, 
589                                          struct event_context *ev)
590 {
591         struct messaging_context *ctx;
592
593         if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) {
594                 return NULL;
595         }
596
597         ctx->id = server_id;
598         ctx->event_ctx = ev;
599
600         if (!message_tdb_init(ctx)) {
601                 DEBUG(0, ("message_init failed: %s\n", strerror(errno)));
602                 TALLOC_FREE(ctx);
603         }
604
605         messaging_register(ctx, NULL, MSG_PING, ping_message);
606
607         /* Register some debugging related messages */
608
609         register_msg_pool_usage(ctx);
610         register_dmalloc_msgs(ctx);
611         debug_register_msgs(ctx);
612
613         return ctx;
614 }
615
616 /*
617  * Register a dispatch function for a particular message type. Allow multiple
618  * registrants
619 */
620 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
621                             void *private_data,
622                             uint32_t msg_type,
623                             void (*fn)(struct messaging_context *msg,
624                                        void *private_data, 
625                                        uint32_t msg_type, 
626                                        struct server_id server_id,
627                                        DATA_BLOB *data))
628 {
629         struct messaging_callback *cb;
630
631         /*
632          * Only one callback per type
633          */
634
635         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
636                 if (cb->msg_type == msg_type) {
637                         cb->fn = fn;
638                         cb->private_data = private_data;
639                         return NT_STATUS_OK;
640                 }
641         }
642
643         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
644                 return NT_STATUS_NO_MEMORY;
645         }
646
647         cb->msg_type = msg_type;
648         cb->fn = fn;
649         cb->private_data = private_data;
650
651         DLIST_ADD(msg_ctx->callbacks, cb);
652         return NT_STATUS_OK;
653 }
654
655 /*
656   De-register the function for a particular message type.
657 */
658 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
659                           void *private_data)
660 {
661         struct messaging_callback *cb, *next;
662
663         for (cb = ctx->callbacks; cb; cb = next) {
664                 next = cb->next;
665                 if ((cb->msg_type == msg_type)
666                     && (cb->private_data == private_data)) {
667                         DLIST_REMOVE(ctx->callbacks, cb);
668                         TALLOC_FREE(cb);
669                 }
670         }
671 }
672
673 /*
674   Send a message to a particular server
675 */
676 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
677                         struct server_id server, uint32_t msg_type,
678                         const DATA_BLOB *data)
679 {
680         return messaging_tdb_send(msg_ctx, server, msg_type, data);
681 }
682
683 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
684                             struct server_id server, uint32_t msg_type,
685                             const uint8 *buf, size_t len)
686 {
687         DATA_BLOB blob = data_blob_const(buf, len);
688         return messaging_send(msg_ctx, server, msg_type, &blob);
689 }
690
691 /** @} **/