r23055: Rewrite messages.c to use auto-generated marshalling in the tdb. I'm
[samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24 /**
25   @defgroup messages Internal messaging framework
26   @{
27   @file messages.c
28   
29   @brief  Module for internal messaging between Samba daemons. 
30
31    The idea is that if a part of Samba wants to do communication with
32    another Samba process then it will do a message_register() of a
33    dispatch function, and use message_send_pid() to send messages to
34    that process.
35
36    The dispatch function is given the pid of the sender, and it can
37    use that to reply by message_send_pid().  See ping_message() for a
38    simple example.
39
40    @caution Dispatch functions must be able to cope with incoming
41    messages on an *odd* byte boundary.
42
43    This system doesn't have any inherent size limitations but is not
44    very efficient for large messages or when messages are sent in very
45    quick succession.
46
47 */
48
49 #include "includes.h"
50 #include "librpc/gen_ndr/messaging.h"
51 #include "librpc/gen_ndr/ndr_messaging.h"
52
53 /* the locking database handle */
54 static int received_signal;
55
56 /* change the message version with any incompatible changes in the protocol */
57 #define MESSAGE_VERSION 2
58
59 struct messaging_callback {
60         struct messaging_callback *prev, *next;
61         uint32 msg_type;
62         void (*fn)(struct messaging_context *msg, void *private_data, 
63                    uint32_t msg_type, 
64                    struct server_id server_id, DATA_BLOB *data);
65         void *private_data;
66 };
67
68 struct messaging_context {
69         TDB_CONTEXT *tdb;
70         struct server_id id;
71         struct event_context *event_ctx;
72         struct messaging_callback *callbacks;
73 };
74
75 /****************************************************************************
76  Notifications come in as signals.
77 ****************************************************************************/
78
79 static void sig_usr1(void)
80 {
81         received_signal = 1;
82         sys_select_signal(SIGUSR1);
83 }
84
85 static NTSTATUS messaging_tdb_send(TDB_CONTEXT *msg_tdb,
86                                    struct server_id pid, int msg_type,
87                                    const void *buf, size_t len);
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         const char *msg = data->data ? (const char *)data->data : "none";
100
101         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
102                  procid_str_static(&src), msg));
103         messaging_send(msg_ctx, src, MSG_PONG, data);
104 }
105
106 /****************************************************************************
107  Initialise the messaging functions. 
108 ****************************************************************************/
109
110 static BOOL message_init(struct messaging_context *msg_ctx)
111 {
112         sec_init();
113
114         msg_ctx->tdb = tdb_open_log(lock_path("messages.tdb"), 
115                                     0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, 
116                                     O_RDWR|O_CREAT,0600);
117
118         if (!msg_ctx->tdb) {
119                 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
120                 return False;
121         }
122
123         /* Activate the per-hashchain freelist */
124         tdb_set_max_dead(msg_ctx->tdb, 5);
125
126         CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
127
128         messaging_register(msg_ctx, NULL, MSG_PING, ping_message);
129
130         /* Register some debugging related messages */
131
132         register_msg_pool_usage(msg_ctx);
133         register_dmalloc_msgs(msg_ctx);
134         debug_register_msgs(msg_ctx);
135
136         return True;
137 }
138
139 /*******************************************************************
140  Form a static tdb key from a pid.
141 ******************************************************************/
142
143 static TDB_DATA message_key_pid(struct server_id pid)
144 {
145         static char key[20];
146         TDB_DATA kbuf;
147
148         slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid));
149         
150         kbuf.dptr = (uint8 *)key;
151         kbuf.dsize = strlen(key)+1;
152         return kbuf;
153 }
154
155 /*
156   Fetch the messaging array for a process
157  */
158
159 static NTSTATUS messaging_tdb_fetch(TDB_CONTEXT *msg_tdb,
160                                     TDB_DATA key,
161                                     TALLOC_CTX *mem_ctx,
162                                     struct messaging_array **presult)
163 {
164         struct messaging_array *result;
165         TDB_DATA data;
166         DATA_BLOB blob;
167         NTSTATUS status;
168
169         if (!(result = TALLOC_ZERO_P(mem_ctx, struct messaging_array))) {
170                 return NT_STATUS_NO_MEMORY;
171         }
172
173         data = tdb_fetch(msg_tdb, key);
174
175         if (data.dptr == NULL) {
176                 *presult = result;
177                 return NT_STATUS_OK;
178         }
179
180         blob = data_blob_const(data.dptr, data.dsize);
181
182         status = ndr_pull_struct_blob(
183                 &blob, result, result,
184                 (ndr_pull_flags_fn_t)ndr_pull_messaging_array);
185
186         SAFE_FREE(data.dptr);
187
188         if (!NT_STATUS_IS_OK(status)) {
189                 TALLOC_FREE(result);
190                 return status;
191         }
192
193         if (DEBUGLEVEL >= 10) {
194                 DEBUG(10, ("messaging_tdb_fetch:\n"));
195                 NDR_PRINT_DEBUG(messaging_array, result);
196         }
197
198         *presult = result;
199         return NT_STATUS_OK;
200 }
201
202 /*
203   Store a messaging array for a pid
204 */
205
206 static NTSTATUS messaging_tdb_store(TDB_CONTEXT *msg_tdb,
207                                     TDB_DATA key,
208                                     struct messaging_array *array)
209 {
210         TDB_DATA data;
211         DATA_BLOB blob;
212         NTSTATUS status;
213         TALLOC_CTX *mem_ctx;
214         int ret;
215
216         if (array->num_messages == 0) {
217                 tdb_delete(msg_tdb, key);
218                 return NT_STATUS_OK;
219         }
220
221         if (!(mem_ctx = talloc_new(array))) {
222                 return NT_STATUS_NO_MEMORY;
223         }
224
225         status = ndr_push_struct_blob(
226                 &blob, mem_ctx, array,
227                 (ndr_push_flags_fn_t)ndr_push_messaging_array);
228
229         if (!NT_STATUS_IS_OK(status)) {
230                 talloc_free(mem_ctx);
231                 return status;
232         }
233
234         if (DEBUGLEVEL >= 10) {
235                 DEBUG(10, ("messaging_tdb_store:\n"));
236                 NDR_PRINT_DEBUG(messaging_array, array);
237         }
238
239         data.dptr = blob.data;
240         data.dsize = blob.length;
241
242         ret = tdb_store(msg_tdb, key, data, TDB_REPLACE);
243         TALLOC_FREE(mem_ctx);
244
245         return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
246 }
247
248 /****************************************************************************
249  Notify a process that it has a message. If the process doesn't exist 
250  then delete its record in the database.
251 ****************************************************************************/
252
253 static NTSTATUS message_notify(struct server_id procid)
254 {
255         pid_t pid = procid.pid;
256         int ret;
257         uid_t euid = geteuid();
258
259         /*
260          * Doing kill with a non-positive pid causes messages to be
261          * sent to places we don't want.
262          */
263
264         SMB_ASSERT(pid > 0);
265
266         if (euid != 0) {
267                 /* If we're not root become so to send the message. */
268                 save_re_uid();
269                 set_effective_uid(0);
270         }
271
272         ret = kill(pid, SIGUSR1);
273
274         if (euid != 0) {
275                 /* Go back to who we were. */
276                 int saved_errno = errno;
277                 restore_re_uid_fromroot();
278                 errno = saved_errno;
279         }
280
281         if (ret == 0) {
282                 return NT_STATUS_OK;
283         }
284
285         /*
286          * Something has gone wrong
287          */
288
289         DEBUG(2,("message to process %d failed - %s\n", (int)pid,
290                  strerror(errno)));
291
292         /*
293          * No call to map_nt_error_from_unix -- don't want to link in
294          * errormap.o into lots of utils.
295          */
296
297         if (errno == ESRCH)  return NT_STATUS_INVALID_HANDLE;
298         if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
299         if (errno == EPERM)  return NT_STATUS_ACCESS_DENIED;
300         return NT_STATUS_UNSUCCESSFUL;
301 }
302
303 /****************************************************************************
304  Send a message to a particular pid.
305 ****************************************************************************/
306
307 static NTSTATUS messaging_tdb_send(TDB_CONTEXT *msg_tdb,
308                                    struct server_id pid, int msg_type,
309                                    const void *buf, size_t len)
310 {
311         struct messaging_array *msg_array;
312         struct messaging_rec *rec;
313         TALLOC_CTX *mem_ctx;
314         NTSTATUS status;
315         TDB_DATA key = message_key_pid(pid);
316
317         /* NULL pointer means implicit length zero. */
318         if (!buf) {
319                 SMB_ASSERT(len == 0);
320         }
321
322         /*
323          * Doing kill with a non-positive pid causes messages to be
324          * sent to places we don't want.
325          */
326
327         SMB_ASSERT(procid_to_pid(&pid) > 0);
328
329         if (!(mem_ctx = talloc_init("message_send_pid"))) {
330                 return NT_STATUS_NO_MEMORY;
331         }
332
333         if (tdb_chainlock(msg_tdb, key) == -1) {
334                 return NT_STATUS_LOCK_NOT_GRANTED;
335         }
336
337         status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &msg_array);
338
339         if (!NT_STATUS_IS_OK(status)) {
340                 tdb_chainunlock(msg_tdb, key);
341                 TALLOC_FREE(mem_ctx);
342                 return status;
343         }
344
345         if (!(rec = TALLOC_REALLOC_ARRAY(mem_ctx, msg_array->messages,
346                                          struct messaging_rec,
347                                          msg_array->num_messages+1))) {
348                 tdb_chainunlock(msg_tdb, key);
349                 TALLOC_FREE(mem_ctx);
350                 return NT_STATUS_NO_MEMORY;
351         }
352
353         rec[msg_array->num_messages].msg_version = MESSAGE_VERSION;
354         rec[msg_array->num_messages].msg_type = msg_type;
355         rec[msg_array->num_messages].dest = pid;
356         rec[msg_array->num_messages].src = procid_self();
357         rec[msg_array->num_messages].buf = data_blob_const(buf, len);
358
359         msg_array->messages = rec;
360         msg_array->num_messages += 1;
361
362         status = messaging_tdb_store(msg_tdb, key, msg_array);
363
364         tdb_chainunlock(msg_tdb, key);
365         TALLOC_FREE(mem_ctx);
366
367         if (!NT_STATUS_IS_OK(status)) {
368                 return status;
369         }
370         
371         status = message_notify(pid);
372
373         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
374                 DEBUG(2, ("pid %s doesn't exist - deleting messages record\n",
375                           procid_str_static(&pid)));
376                 tdb_delete(msg_tdb, message_key_pid(pid));
377         }
378
379         return status;
380 }
381
382 /****************************************************************************
383  Count the messages pending for a particular pid. Expensive....
384 ****************************************************************************/
385
386 unsigned int messages_pending_for_pid(struct messaging_context *msg_ctx,
387                                       struct server_id pid)
388 {
389         struct messaging_array *msg_array;
390         unsigned int result;
391
392         if (!NT_STATUS_IS_OK(messaging_tdb_fetch(msg_ctx->tdb,
393                                                  message_key_pid(pid), NULL,
394                                                  &msg_array))) {
395                 DEBUG(10, ("messaging_tdb_fetch failed\n"));
396                 return 0;
397         }
398
399         result = msg_array->num_messages;
400         TALLOC_FREE(msg_array);
401         return result;
402 }       
403
404 /****************************************************************************
405  Retrieve all messages for the current process.
406 ****************************************************************************/
407
408 static NTSTATUS retrieve_all_messages(TDB_CONTEXT *msg_tdb,
409                                       TALLOC_CTX *mem_ctx,
410                                       struct messaging_array **presult)
411 {
412         struct messaging_array *result;
413         TDB_DATA key = message_key_pid(procid_self());
414         NTSTATUS status;
415
416         if (tdb_chainlock(msg_tdb, key) == -1) {
417                 return NT_STATUS_LOCK_NOT_GRANTED;
418         }
419
420         status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &result);
421
422         /*
423          * We delete the record here, tdb_set_max_dead keeps it around
424          */
425         tdb_delete(msg_tdb, key);
426         tdb_chainunlock(msg_tdb, key);
427
428         if (NT_STATUS_IS_OK(status)) {
429                 *presult = result;
430         }
431
432         return status;
433 }
434
435 /*
436   Dispatch one messsaging_rec
437 */
438 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
439                                    struct messaging_rec *rec)
440 {
441         struct messaging_callback *cb, *next;
442
443         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
444                 next = cb->next;
445                 if (cb->msg_type == rec->msg_type) {
446                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
447                                rec->src, &rec->buf);
448                         return;
449                 }
450         }
451         return;
452 }
453
454 /****************************************************************************
455  Receive and dispatch any messages pending for this process.
456  JRA changed Dec 13 2006. Only one message handler now permitted per type.
457  *NOTE*: Dispatch functions must be able to cope with incoming
458  messages on an *odd* byte boundary.
459 ****************************************************************************/
460
461 void message_dispatch(struct messaging_context *msg_ctx)
462 {
463         struct messaging_array *msg_array = NULL;
464         uint32 i;
465
466         if (!received_signal)
467                 return;
468
469         DEBUG(10, ("message_dispatch: received_signal = %d\n",
470                    received_signal));
471
472         received_signal = 0;
473
474         if (!NT_STATUS_IS_OK(retrieve_all_messages(msg_ctx->tdb, NULL,
475                                                    &msg_array))) {
476                 return;
477         }
478
479         for (i=0; i<msg_array->num_messages; i++) {
480                 messaging_dispatch_rec(msg_ctx, &msg_array->messages[i]);
481         }
482
483         TALLOC_FREE(msg_array);
484 }
485
486 /****************************************************************************
487  Register/replace a dispatch function for a particular message type.
488  JRA changed Dec 13 2006. Only one message handler now permitted per type.
489  *NOTE*: Dispatch functions must be able to cope with incoming
490  messages on an *odd* byte boundary.
491 ****************************************************************************/
492
493 struct msg_all {
494         struct messaging_context *msg_ctx;
495         int msg_type;
496         uint32 msg_flag;
497         const void *buf;
498         size_t len;
499         int n_sent;
500 };
501
502 /****************************************************************************
503  Send one of the messages for the broadcast.
504 ****************************************************************************/
505
506 static int traverse_fn(TDB_CONTEXT *the_tdb,
507                        const struct connections_key *ckey,
508                        const struct connections_data *crec,
509                        void *private_data)
510 {
511         struct msg_all *msg_all = (struct msg_all *)private_data;
512         NTSTATUS status;
513
514         if (crec->cnum != -1)
515                 return 0;
516
517         /* Don't send if the receiver hasn't registered an interest. */
518
519         if(!(crec->bcast_msg_flags & msg_all->msg_flag))
520                 return 0;
521
522         /* If the msg send fails because the pid was not found (i.e. smbd died), 
523          * the msg has already been deleted from the messages.tdb.*/
524
525         status = messaging_send_buf(msg_all->msg_ctx,
526                                     crec->pid, msg_all->msg_type,
527                                     (uint8 *)msg_all->buf, msg_all->len);
528
529         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
530
531                 TDB_DATA key;
532                 
533                 /* If the pid was not found delete the entry from
534                  * connections.tdb */
535
536                 DEBUG(2,("pid %s doesn't exist - deleting connections "
537                          "%d [%s]\n", procid_str_static(&crec->pid),
538                          crec->cnum, crec->servicename));
539
540                 key.dptr = (uint8 *)ckey;
541                 key.dsize = sizeof(*ckey);
542
543                 tdb_delete(the_tdb, key);
544         }
545         msg_all->n_sent++;
546         return 0;
547 }
548
549 /**
550  * Send a message to all smbd processes.
551  *
552  * It isn't very efficient, but should be OK for the sorts of
553  * applications that use it. When we need efficient broadcast we can add
554  * it.
555  *
556  * @param n_sent Set to the number of messages sent.  This should be
557  * equal to the number of processes, but be careful for races.
558  *
559  * @retval True for success.
560  **/
561 BOOL message_send_all(struct messaging_context *msg_ctx,
562                       int msg_type,
563                       const void *buf, size_t len,
564                       int *n_sent)
565 {
566         struct msg_all msg_all;
567
568         msg_all.msg_type = msg_type;
569         if (msg_type < 1000)
570                 msg_all.msg_flag = FLAG_MSG_GENERAL;
571         else if (msg_type > 1000 && msg_type < 2000)
572                 msg_all.msg_flag = FLAG_MSG_NMBD;
573         else if (msg_type > 2000 && msg_type < 2100)
574                 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
575         else if (msg_type > 2100 && msg_type < 3000)
576                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
577         else if (msg_type > 3000 && msg_type < 4000)
578                 msg_all.msg_flag = FLAG_MSG_SMBD;
579         else
580                 return False;
581
582         msg_all.buf = buf;
583         msg_all.len = len;
584         msg_all.n_sent = 0;
585         msg_all.msg_ctx = msg_ctx;
586
587         connections_forall(traverse_fn, &msg_all);
588         if (n_sent)
589                 *n_sent = msg_all.n_sent;
590         return True;
591 }
592
593 /*
594  * Block and unblock receiving of messages. Allows removal of race conditions
595  * when doing a fork and changing message disposition.
596  */
597
598 void message_block(void)
599 {
600         BlockSignals(True, SIGUSR1);
601 }
602
603 void message_unblock(void)
604 {
605         BlockSignals(False, SIGUSR1);
606 }
607
608 struct event_context *messaging_event_context(struct messaging_context *msg_ctx)
609 {
610         return msg_ctx->event_ctx;
611 }
612
613 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
614                                          struct server_id server_id, 
615                                          struct event_context *ev)
616 {
617         struct messaging_context *ctx;
618
619         if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) {
620                 return NULL;
621         }
622
623         ctx->id = server_id;
624         ctx->event_ctx = ev;
625
626         if (!message_init(ctx)) {
627                 DEBUG(0, ("message_init failed: %s\n", strerror(errno)));
628                 TALLOC_FREE(ctx);
629         }
630
631         return ctx;
632 }
633
634 /*
635  * Register a dispatch function for a particular message type. Allow multiple
636  * registrants
637 */
638 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
639                             void *private_data,
640                             uint32_t msg_type,
641                             void (*fn)(struct messaging_context *msg,
642                                        void *private_data, 
643                                        uint32_t msg_type, 
644                                        struct server_id server_id,
645                                        DATA_BLOB *data))
646 {
647         struct messaging_callback *cb;
648
649         /*
650          * Only one callback per type
651          */
652
653         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
654                 if (cb->msg_type == msg_type) {
655                         cb->fn = fn;
656                         cb->private_data = private_data;
657                         return NT_STATUS_OK;
658                 }
659         }
660
661         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
662                 return NT_STATUS_NO_MEMORY;
663         }
664
665         cb->msg_type = msg_type;
666         cb->fn = fn;
667         cb->private_data = private_data;
668
669         DLIST_ADD(msg_ctx->callbacks, cb);
670         return NT_STATUS_OK;
671 }
672
673 /*
674   De-register the function for a particular message type.
675 */
676 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
677                           void *private_data)
678 {
679         struct messaging_callback *cb, *next;
680
681         for (cb = ctx->callbacks; cb; cb = next) {
682                 next = cb->next;
683                 if ((cb->msg_type == msg_type)
684                     && (cb->private_data == private_data)) {
685                         DLIST_REMOVE(ctx->callbacks, cb);
686                         TALLOC_FREE(cb);
687                 }
688         }
689 }
690
691 /*
692   Send a message to a particular server
693 */
694 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
695                         struct server_id server, 
696                         uint32_t msg_type, const DATA_BLOB *data)
697 {
698         return messaging_tdb_send(msg_ctx->tdb, server, msg_type,
699                                   data->data, data->length);
700 }
701
702 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
703                             struct server_id server, uint32_t msg_type,
704                             const uint8 *buf, size_t len)
705 {
706         DATA_BLOB blob = data_blob_const(buf, len);
707         return messaging_send(msg_ctx, server, msg_type, &blob);
708 }
709
710 /** @} **/