PEP 55 Update license on source files to current license text and date
[tpot/pegasus/.git] / src / Pegasus / Common / ModuleController.cpp
1 //%2003////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development
4 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 //
8 // Permission is hereby granted, free of charge, to any person obtaining a copy
9 // of this software and associated documentation files (the "Software"), to
10 // deal in the Software without restriction, including without limitation the
11 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
12 // sell copies of the Software, and to permit persons to whom the Software is
13 // furnished to do so, subject to the following conditions:
14 // 
15 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
16 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
17 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
18 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
19 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 //
24 //==============================================================================
25 //
26 // Author: Mike Day (mdday@us.ibm.com) << Tue Mar 19 13:19:24 2002 mdd >>
27 //
28 // Modified By:
29 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include "ModuleController.h"
33 #include <Pegasus/Common/MessageLoader.h> //l10n
34
35 PEGASUS_NAMESPACE_BEGIN
36
37 PEGASUS_USING_STD;
38  
39 ModuleController::callback_handle * ModuleController::callback_handle::_head;
40 const int ModuleController::callback_handle::BLOCK_SIZE = 20;
41 Mutex ModuleController::callback_handle::_alloc_mut;
42
43
44
45
46
47 void * ModuleController::callback_handle::operator new(size_t size)
48 {
49    if( size != sizeof(callback_handle))
50       return ::operator new(size);
51    _alloc_mut.lock(pegasus_thread_self());
52    callback_handle *node = _head;
53    if(node)
54       _head = reinterpret_cast<callback_handle *>(node->_parm);
55    else
56    {
57       callback_handle *block = 
58          reinterpret_cast<callback_handle *>(::operator new(BLOCK_SIZE * sizeof(callback_handle)));
59       int i;
60       for(i = 1; i < BLOCK_SIZE - 1; ++i)
61          block[i]._parm = & block[i + 1];
62       block[BLOCK_SIZE - 1]._parm = NULL;
63       node = block;
64       _head = &block[1];
65    }
66    _alloc_mut.unlock();
67    return node;
68 }
69
70
71 void ModuleController::callback_handle::operator delete(void *dead, size_t size)
72 {
73    if(dead == 0)
74       return;
75    if(size != sizeof(callback_handle))
76    {
77       ::operator delete(dead);
78       return;
79    }
80    callback_handle *node = reinterpret_cast<callback_handle *>(dead);
81    _alloc_mut.lock(pegasus_thread_self());
82    node->_parm = _head;
83    _head = node;
84    _alloc_mut.unlock();
85 }
86
87
88 pegasus_module::module_rep::module_rep(ModuleController *controller, 
89                                        const String & name,
90                                        void *module_address, 
91                                        Message * (*receive_message)(Message *, void *),
92                                        void (*async_callback)(Uint32, Message *, void *),
93                                        void (*shutdown_notify)(Uint32 code, void *))
94    : Base ( pegasus_internal_identity(peg_credential_types::MODULE) ),
95      _thread_safety(),
96      _controller(controller), 
97      _name(name), 
98      _reference_count(1), 
99      _shutting_down(0),
100      _module_address(module_address)
101
102 {
103    if(receive_message != NULL)
104       _receive_message = receive_message;
105    else
106       _receive_message = default_receive_message;
107    if(async_callback != NULL)
108       _async_callback = async_callback;
109    else
110       _async_callback = default_async_callback;
111    if(shutdown_notify != NULL)
112       _shutdown_notify = shutdown_notify;
113    else
114       _shutdown_notify = default_shutdown_notify;
115 }
116
117
118 pegasus_module::module_rep::~module_rep(void)
119 {
120    _send_shutdown_notify();
121 }
122
123 Message * pegasus_module::module_rep::module_receive_message(Message *msg)
124 {
125    Message * ret;
126    _thread_safety.lock(pegasus_thread_self());
127    try {  ret = _receive_message(msg, _module_address); }
128    catch(...)
129    {
130       _thread_safety.unlock();
131       throw;
132    }
133    _thread_safety.unlock();
134    return ret;
135 }
136
137 void pegasus_module::module_rep::_send_async_callback(Uint32 msg_handle, Message *msg, void *parm)
138 {
139    _thread_safety.lock(pegasus_thread_self());
140    try  { _async_callback(msg_handle, msg, parm); }
141    catch(...) { _thread_safety.unlock(); throw; }
142    _thread_safety.unlock();
143 }
144  
145 void pegasus_module::module_rep::_send_shutdown_notify(void)
146 {
147    _thread_safety.lock(pegasus_thread_self());
148    if(_reference_count.value() == 0 )
149    {
150       if( _shutting_down == 0 )
151       {
152          _shutting_down++;
153          _shutdown_notify(_reference_count.value(), _module_address);
154          _async_callback = closed_async_callback;
155          _receive_message = closed_receive_message;
156       }
157    }
158    _thread_safety.unlock();
159 }
160
161
162 Boolean pegasus_module::module_rep::authorized()
163 {
164    return true;
165 }
166
167 Boolean pegasus_module::module_rep::authorized(Uint32 operation)
168 {
169    return true;
170 }
171
172 Boolean pegasus_module::module_rep::authorized(Uint32 index, Uint32 operation)
173 {
174    return true;
175 }
176
177
178 pegasus_module::pegasus_module(ModuleController *controller, 
179                                const String &id, 
180                                void *module_address,
181                                Message * (*receive_message)(Message *, void *),
182                                void (*async_callback)(Uint32, Message *, void *),
183                                void (*shutdown_notify)(Uint32 code, void *))
184 {
185    _rep = new module_rep(controller, 
186                          id, 
187                          module_address, 
188                          receive_message,
189                          async_callback,
190                          shutdown_notify);
191    _allowed_operations = ModuleController::GET_CLIENT_HANDLE |
192                          ModuleController::REGISTER_MODULE |
193                          ModuleController::DEREGISTER_MODULE | 
194                          ModuleController::FIND_SERVICE |
195                          ModuleController::FIND_MODULE_IN_SERVICE | 
196                          ModuleController::GET_MODULE_REFERENCE | 
197                          ModuleController::MODULE_SEND_WAIT | 
198                          ModuleController::MODULE_SEND_WAIT_MODULE | 
199                          ModuleController::MODULE_SEND_ASYNC | 
200                          ModuleController::MODULE_SEND_ASYNC_MODULE | 
201                          ModuleController::BLOCKING_THREAD_EXEC | 
202                          ModuleController::ASYNC_THREAD_EXEC;
203 }
204
205 pegasus_module::pegasus_module(const pegasus_module & mod)
206 {
207    mod._rep->reference();
208    _rep = mod._rep;
209 }
210
211 pegasus_module::~pegasus_module(void)
212 {
213    _rep->dereference();
214    _send_shutdown_notify();
215    if( 0 == _rep->reference_count())
216       delete _rep;
217 }
218
219 Boolean pegasus_module::authorized(Uint32 operation)
220 {
221    return true;
222 }
223
224 Boolean pegasus_module::authorized()
225 {
226    return true;
227 }
228
229
230 pegasus_module & pegasus_module::operator= (const pegasus_module & mod)
231 {   
232    if( this != &mod)
233    {
234       if ( _rep->reference_count() == 0 )
235          delete _rep;
236       _rep = mod._rep;
237    }
238    return *this;
239 }
240
241 Boolean pegasus_module::operator== (const pegasus_module & mod) const 
242 {
243    if( mod._rep == _rep )
244       return true;
245    return false;
246    
247 }
248       
249 Boolean pegasus_module::operator == (const String &  mod) const
250 {
251    if(_rep->get_name() == mod)
252       return true; 
253    return false;
254 }
255
256
257 Boolean pegasus_module::operator == (const void *mod) const
258 {
259    if ( (reinterpret_cast<const pegasus_module *>(mod))->_rep == _rep)
260       return true;
261    return false;
262 }
263
264 const String & pegasus_module::get_name(void) const
265 {
266    return _rep->get_name();
267 }
268
269
270 Boolean pegasus_module::query_interface(const String & class_id,  
271                                         void **object_ptr) const 
272 {
273    PEGASUS_ASSERT(object_ptr != NULL);
274    if( class_id == _rep->get_name())
275    {
276       *object_ptr = _rep->get_module_address();
277       return true;
278    }
279    *object_ptr = NULL;
280    return false;
281 }
282
283 Message * pegasus_module::_receive_message(Message *msg)
284 {
285    return _rep->module_receive_message(msg);
286 }
287
288 void pegasus_module::_send_async_callback(Uint32 msg_handle, Message *msg, void *parm) 
289 {
290    _rep->_send_async_callback(msg_handle, msg, parm);
291 }
292
293 void pegasus_module::_send_shutdown_notify(void)
294 {
295    _rep->_send_shutdown_notify();
296 }
297
298 Boolean pegasus_module::_shutdown(void) 
299 {
300    _send_shutdown_notify(); 
301    return true; 
302
303
304
305
306 const Uint32 ModuleController::GET_CLIENT_HANDLE =          0x00000001;
307 const Uint32 ModuleController::REGISTER_MODULE =            0x00000002;
308 const Uint32 ModuleController::DEREGISTER_MODULE =          0x00000004;
309 const Uint32 ModuleController::FIND_SERVICE =               0x00000008;
310 const Uint32 ModuleController::FIND_MODULE_IN_SERVICE =     0x00000010;
311 const Uint32 ModuleController::GET_MODULE_REFERENCE =       0x00000020;
312 const Uint32 ModuleController::MODULE_SEND_WAIT =           0x00000040;
313 const Uint32 ModuleController::MODULE_SEND_WAIT_MODULE =    0x00000040;
314 const Uint32 ModuleController::MODULE_SEND_ASYNC =          0x00000080;
315 const Uint32 ModuleController::MODULE_SEND_ASYNC_MODULE =   0x00000080;
316 const Uint32 ModuleController::BLOCKING_THREAD_EXEC =       0x00000100;
317 const Uint32 ModuleController::ASYNC_THREAD_EXEC =          0x00000200;
318 const Uint32 ModuleController::CLIENT_SEND_WAIT =           0x00000400;
319 const Uint32 ModuleController::CLIENT_SEND_WAIT_MODULE =    0x00000400;
320 const Uint32 ModuleController::CLIENT_SEND_ASYNC =          0x00000800;
321 const Uint32 ModuleController::CLIENT_SEND_ASYNC_MODULE =   0x00000800;
322 const Uint32 ModuleController::CLIENT_BLOCKING_THREAD_EXEC =0x00001000;
323 const Uint32 ModuleController::CLIENT_ASYNC_THREAD_EXEC =   0x00001000;
324 const Uint32 ModuleController::CLIENT_SEND_FORGET =         0x00002000;
325 const Uint32 ModuleController::CLIENT_SEND_FORGET_MODULE =  0x00002000;
326 const Uint32 ModuleController::MODULE_SEND_FORGET =         0x00004000;
327 const Uint32 ModuleController::MODULE_SEND_FORGET_MODULE =  0x00004000;
328
329
330 Boolean ModuleController::client_handle::authorized()
331 {
332    return true;
333 }
334
335 Boolean ModuleController::client_handle::authorized(Uint32 operation)
336 {
337    return true;
338 }
339  
340
341 Boolean ModuleController::client_handle::authorized(Uint32 index, Uint32 operation)
342 {
343    return true;
344 }
345  
346
347 // NOTE: "destroy" is defined in <memory> on HP-UX and must not be redefined
348 static struct timeval createTime = {0, 50000};
349 static struct timeval destroyTime = {15, 0};
350 static struct timeval deadlockTime = {5, 0};
351
352 ModuleController::ModuleController(const char *name )
353    :Base(name, MessageQueue::getNextQueueId(), 
354          module_capabilities::module_controller |
355          module_capabilities::async),
356     _modules(true),
357     _internal_module(this, String("INTERNAL"), this, NULL, NULL, NULL)
358
359
360 }
361
362 // ModuleController::ModuleController(const char *name ,
363 //                                 Sint16 min_threads, 
364 //                                 Sint16 max_threads,
365 //                                 struct timeval & create_thread,
366 //                                 struct timeval & destroy_thread,
367 //                                 struct timeval & deadlock)
368 //    :Base(name, MessageQueue::getNextQueueId(),
369 //       module_capabilities::module_controller |
370 //       module_capabilities::async),
371 //    _modules(true),
372 //     _thread_pool(min_threads + 1,  
373 //               name, min_threads, 
374 //               max_threads, 
375 //               create_thread, 
376 //               destroy_thread, 
377 //               deadlock)   
378 // { 
379
380 // }
381
382 ModuleController::~ModuleController()
383 {
384
385    pegasus_module *module;
386
387    try 
388    {
389       module = _modules.remove_first();
390       while(module)
391       {
392          delete module;
393          module = _modules.remove_first();
394       }
395
396    }
397    catch(...)
398    {
399    }
400 }
401
402 // called by a module to register itself, returns a handle to the controller 
403 ModuleController & ModuleController::register_module(const String & controller_name,
404                                                      const String & module_name, 
405                                                      void *module_address, 
406                                                      Message * (*receive_message)(Message *, void *),
407                                                      void (*async_callback)(Uint32, Message *, void *),
408                                                      void (*shutdown_notify)(Uint32, void *),
409                                                      pegasus_module **instance) 
410    throw(AlreadyExistsException, IncompatibleTypesException)
411 {
412
413    pegasus_module *module ;
414    ModuleController *controller;
415    
416
417    Array<Uint32> services;
418    
419    MessageQueue *message_queue = MessageQueue::lookup(controller_name.getCString());
420    
421    if ((message_queue == NULL) || ( false == message_queue->isAsync() ))
422    {
423       throw IncompatibleTypesException();
424    }
425
426    MessageQueueService *service = static_cast<MessageQueueService *>(message_queue);
427    if( (service == NULL) ||  ! ( service->get_capabilities() & module_capabilities::module_controller ))
428    {
429       throw IncompatibleTypesException();
430    }
431
432    controller = static_cast<ModuleController *>(service);
433    
434    
435    {
436       
437    // see if the module already exists in this controller.
438    _module_lock lock(&(controller->_modules));
439    
440    module = controller->_modules.next(0);
441    while(module != NULL )
442    {
443       if(module->get_name() == module_name )
444       {
445          //l10n
446          //throw AlreadyExistsException("module \"" + module_name + "\"");
447          MessageLoaderParms parms("Common.ModuleController.MODULE",
448                                                           "module \"$0\"",
449                                                           module_name);
450          throw AlreadyExistsException(parms);
451       }
452       module = controller->_modules.next(module);
453    }
454    
455    }
456    
457    // now reserve this module name with the meta dispatcher
458
459    Uint32 result = 0 ;
460    RegisteredModule *request = 
461       new RegisteredModule(controller->get_next_xid(),
462                            0, 
463                            true, 
464                            controller->getQueueId(),
465                            module_name);
466
467    request->dest = CIMOM_Q_ID;
468    
469    AsyncReply * response = controller->SendWait(request);
470    if( response != NULL)
471       result  = response->result;
472    
473    delete request; 
474    delete response;
475    if ( result == async_results::MODULE_ALREADY_REGISTERED){
476         //l10n
477       //throw AlreadyExistsException("module \"" + module_name + "\"");
478       MessageLoaderParms parms("Common.ModuleController.MODULE",
479                                                           "module \"$0\"",
480                                                           module_name);
481          throw AlreadyExistsException(parms);
482       
483    }
484    
485    // the module does not exist, go ahead and create it. 
486    module = new pegasus_module(controller, 
487                                module_name, 
488                                module_address, 
489                                receive_message, 
490                                async_callback, 
491                                shutdown_notify);
492    
493    controller->_modules.insert_last(module);
494    
495    if(instance != NULL)
496       *instance = module;
497    
498    return *controller;
499 }
500
501
502 Boolean ModuleController::deregister_module(const String & module_name)
503    throw (Permission)
504
505 {
506    DeRegisteredModule *request = 
507       new DeRegisteredModule(get_next_xid(),
508                              0,
509                              true,
510                              getQueueId(),
511                              module_name);
512    request->dest = _meta_dispatcher->getQueueId();
513    
514    AsyncReply * response = SendWait(request);
515
516    delete request;
517    delete response;
518    
519    pegasus_module *module;
520
521    _module_lock lock(&_modules);
522    module = _modules.next(0);
523    while(module != NULL )
524    {
525       if( module->get_name() == module_name)
526       {
527          _modules.remove_no_lock(module);
528          return true;
529       }
530       module = _modules.next(module);
531    }
532    return false;
533 }
534
535 Boolean ModuleController::verify_handle(pegasus_module *handle)
536 {
537    pegasus_module *module;
538    
539    // ATTN change to use authorization and the pegasus_id class
540    // << Fri Apr  5 12:43:19 2002 mdd >>
541    if( handle->_rep->_module_address == (void *)this)
542       return true;
543    
544    _module_lock lock(&_modules);
545    
546    module = _modules.next(0);
547    while(module != NULL)
548    {
549       if ( module == handle)
550       {
551          return true;
552       }
553       module = _modules.next(module);
554    }
555    return false;
556 }
557
558 // given a name, find a service's queue id
559 Uint32 ModuleController::find_service(const pegasus_module & handle, 
560                                       const String & name) throw(Permission)
561 {
562
563    if ( false == verify_handle(const_cast<pegasus_module *>(&handle)) )
564       throw Permission(pegasus_thread_self());
565    Array<Uint32> services;
566    Base::find_services(name, 0, 0, &services);
567    return( services[0]);
568 }
569
570
571 // returns the queue ID of the service hosting the named module, 
572 // zero otherwise
573
574 Uint32 ModuleController::find_module_in_service(const pegasus_module & handle, 
575                                                 const String & name)
576    throw(Permission, IPCException)
577 {
578    if ( false == verify_handle(const_cast<pegasus_module *>(&handle)))
579       throw(Permission(pegasus_thread_self()));
580    
581    Uint32 result = 0 ;
582    
583
584    FindModuleInService *request = 
585       new FindModuleInService(get_next_xid(),
586                               0, 
587                               true, 
588                               _meta_dispatcher->getQueueId(),
589                               name);
590    request->dest = _meta_dispatcher->getQueueId();
591    FindModuleInServiceResponse * response =
592       static_cast<FindModuleInServiceResponse *>(SendWait(request));
593    if( response != NULL)
594       result = response->_module_service_queue;
595
596    delete request;
597    delete response;
598    
599    return result;
600 }
601
602
603 pegasus_module * ModuleController::get_module_reference(const pegasus_module & my_handle, 
604                                                         const String & module_name)
605    throw(Permission)
606 {
607    if ( false == verify_handle(const_cast<pegasus_module *>(&my_handle)))
608       throw(Permission(pegasus_thread_self()));
609
610    pegasus_module *module, *ref = NULL;
611    _module_lock lock(&_modules);
612    module = _modules.next(0);
613    while(module != NULL)
614    {
615       if(module->get_name() == module_name)
616       {
617          ref = new pegasus_module(*module);
618          break;
619       }
620       module = _modules.next(module);
621    }
622    return ref;
623 }
624
625
626 AsyncReply *ModuleController::_send_wait(Uint32 destination_q, 
627                                          AsyncRequest *request)
628 {
629    request->dest = destination_q;
630    AsyncReply *reply = Base::SendWait(request);
631    return reply;
632 }
633
634
635 // sendwait to another service
636 AsyncReply * ModuleController::ModuleSendWait(const pegasus_module & handle, 
637                                               Uint32 destination_q,
638                                               AsyncRequest *request)
639    throw(Permission, IPCException)
640 {
641    if ( false == verify_handle(const_cast<pegasus_module *>(&handle)))
642       throw(Permission(pegasus_thread_self()));
643
644    return _send_wait(destination_q, request);
645 }
646
647 AsyncReply *ModuleController::_send_wait(Uint32 destination_q, 
648                                          const String & destination_module, 
649                                          AsyncRequest *message)
650 {
651    AsyncModuleOperationStart *request = 
652       new AsyncModuleOperationStart(get_next_xid(),
653                                     0, 
654                                     destination_q,
655                                     getQueueId(),
656                                     true, 
657                                     destination_module,
658                                     message);
659    
660    request->dest = destination_q;
661    AsyncModuleOperationResult *response = 
662       static_cast<AsyncModuleOperationResult *>(SendWait(request));
663    
664    AsyncReply *ret = 0;
665    
666    if (response != NULL && response->getType() == async_messages::ASYNC_MODULE_OP_RESULT )
667    {
668       ret = static_cast<AsyncReply *>(response->get_result());
669       //clear the request out of the envelope so it can be deleted by the module
670    }
671    request->get_action();
672    delete request; 
673    delete response;
674    return ret;
675 }
676
677
678 // sendwait to another module controlled by another service. 
679 // throws Deadlock() if destination_q is this->queue_id
680 AsyncReply * ModuleController::ModuleSendWait(const pegasus_module & handle, 
681                                               Uint32 destination_q, 
682                                               const String & destination_module,
683                                               AsyncRequest *message)
684    throw(Permission, Deadlock, IPCException)
685 {
686    if ( false == verify_handle(const_cast<pegasus_module *>(&handle)))
687       throw(Permission(pegasus_thread_self()));
688    
689    return _send_wait(destination_q, destination_module, message);
690 }
691
692 void ModuleController::_async_handleEnqueue(AsyncOpNode *op, 
693                                             MessageQueue *q, 
694                                             void *parm)
695 {
696    
697    ModuleController *myself = static_cast<ModuleController *>(q);
698    Message *request = op->get_request();
699    Message *response = op->get_response();
700
701    if( request && (! (request->getMask() & message_mask::ha_async)))
702       throw TypeMismatchException();
703
704    if( response && (! (response->getMask() & message_mask::ha_async) ))
705       throw TypeMismatchException();
706    
707    op->release();
708    myself->return_op(op);
709    
710    Uint32 routing;
711    
712    // get rid of the module wrapper 
713    if( request && request->getType() == async_messages::ASYNC_MODULE_OP_START )
714    {
715       (static_cast<AsyncMessage *>(request))->op = NULL;
716       AsyncModuleOperationStart *rq = static_cast<AsyncModuleOperationStart *>(request);
717       request = rq->get_action();
718       request->setRouting(routing = rq->getRouting());
719       delete rq;
720    }
721    
722    // get rid of the module wrapper 
723    if(response && response->getType() == async_messages::ASYNC_MODULE_OP_RESULT )
724    {
725       (static_cast<AsyncMessage *>(response))->op = NULL;
726       AsyncModuleOperationResult *rp = static_cast<AsyncModuleOperationResult *>(response);
727       response = rp->get_result();
728       response->setRouting(routing = rp->getRouting());
729       delete rp;
730    }
731    
732    callback_handle *cb = reinterpret_cast<callback_handle *>(parm);
733    
734    cb->_module->_send_async_callback(routing, response, cb->_parm); 
735    delete cb;
736    
737    return;
738 }
739
740
741 // send an async message to a service asynchronously
742 Boolean ModuleController::ModuleSendAsync(const pegasus_module & handle,
743                                           Uint32 msg_handle, 
744                                           Uint32 destination_q, 
745                                           AsyncRequest *message, 
746                                           void *callback_parm) 
747    throw(Permission, IPCException)
748 {
749    //printf("verifying handle %p, controller at %p \n", &handle, this);
750    
751    if ( false == verify_handle(const_cast<pegasus_module *>(&handle)))
752       throw(Permission(pegasus_thread_self()));
753
754    if (message->op == NULL)
755    {
756       message->op = get_op();
757       message->op->put_request(message);
758    }
759
760
761    callback_handle *cb = new callback_handle(const_cast<pegasus_module *>(&handle), 
762                                              callback_parm);
763    
764    message->setRouting(msg_handle);
765    message->resp = getQueueId();
766    message->block = false;
767    message->dest = destination_q;
768    return SendAsync(message->op, 
769                     destination_q,
770                     _async_handleEnqueue,
771                     this,
772                     cb);
773 }
774
775 // send a message to a module within another service asynchronously 
776 Boolean ModuleController::ModuleSendAsync(const pegasus_module & handle,
777                                           Uint32 msg_handle, 
778                                           Uint32 destination_q, 
779                                           const String & destination_module,
780                                           AsyncRequest *message, 
781                                           void *callback_parm) 
782    throw(Permission, IPCException)
783 {
784    
785    if ( false == verify_handle(const_cast<pegasus_module *>(&handle)))
786       throw(Permission(pegasus_thread_self()));
787
788    AsyncOpNode *op = get_op();
789    AsyncModuleOperationStart *request = 
790       new AsyncModuleOperationStart(msg_handle,
791                                     op,
792                                     destination_q,
793                                     getQueueId(),
794                                     true, 
795                                     destination_module,
796                                     message);
797    request->dest = destination_q;
798    callback_handle *cb = new callback_handle(const_cast<pegasus_module *>(&handle), callback_parm); 
799    return SendAsync(op, 
800                     destination_q,
801                     _async_handleEnqueue,
802                     this,
803                     cb);
804 }
805
806
807 Boolean ModuleController::_send_forget(Uint32 destination_q, AsyncRequest *message)
808    throw(IPCException)
809 {
810    message->dest = destination_q;
811    return SendForget(message);
812 }
813
814 Boolean ModuleController::_send_forget(Uint32 destination_q, 
815                                        const String & destination_module, 
816                                        AsyncRequest *message)
817    throw(IPCException)
818 {
819    AsyncOpNode *op = get_op();
820    message->dest = destination_q;
821    AsyncModuleOperationStart *request = 
822       new AsyncModuleOperationStart(0,
823                                     op,
824                                     destination_q,
825                                     getQueueId(),
826                                     true, 
827                                     destination_module,
828                                     message);
829    return SendForget(request);
830 }
831
832
833
834 Boolean ModuleController::ModuleSendForget(const pegasus_module & handle, 
835                                            Uint32 destination_q, 
836                                            AsyncRequest *message)
837    throw(Permission, IPCException)
838 {
839    if(false == verify_handle(const_cast<pegasus_module *>( &handle)))
840       throw(Permission(pegasus_thread_self()));
841    
842    return _send_forget(destination_q, message);
843 }
844       
845 Boolean ModuleController::ModuleSendForget(const pegasus_module & handle, 
846                                            Uint32 destination_q, 
847                                            const String & destination_module, 
848                                            AsyncRequest *message)
849    throw(Permission, IPCException)
850 {
851    if(false == verify_handle(const_cast<pegasus_module *>( &handle)))
852       throw(Permission(pegasus_thread_self()));
853    return _send_forget(destination_q, 
854                        destination_module, 
855                        message);
856 }
857
858
859
860 void ModuleController::_blocking_thread_exec(
861    PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *thread_func)(void *), 
862    void *parm) 
863 {
864    Semaphore *blocking_sem = new Semaphore(0);
865    _thread_pool->allocate_and_awaken(parm, thread_func, blocking_sem);
866    blocking_sem->wait();
867    delete blocking_sem;
868 }
869
870
871 void ModuleController::blocking_thread_exec(
872    const pegasus_module & handle, 
873    PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *thread_func)(void *), 
874    void *parm) throw(Permission, Deadlock, IPCException)
875 {
876    if ( false == verify_handle(const_cast<pegasus_module *>(&handle)))
877       throw(Permission(pegasus_thread_self()));
878    _blocking_thread_exec(thread_func, parm);
879    
880 }
881
882
883 void ModuleController::_async_thread_exec(
884    PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *thread_func)(void *), 
885    void *parm) 
886 {
887    _thread_pool->allocate_and_awaken(parm, thread_func);
888 }
889
890
891
892 void ModuleController::async_thread_exec(const pegasus_module & handle, 
893                                          PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *thread_func)(void *), 
894                                          void *parm) throw(Permission, Deadlock, IPCException)
895 {
896    if ( false == verify_handle(const_cast<pegasus_module *>(&handle)))
897       throw(Permission(pegasus_thread_self()));
898    _async_thread_exec(thread_func, parm);
899 }
900
901 void ModuleController::_handle_async_request(AsyncRequest *rq)
902 {
903
904    if( rq->getType() == async_messages::ASYNC_MODULE_OP_START)
905    {
906       // find the target module
907       pegasus_module *target;
908       Message *module_result = NULL;
909       {
910          
911          _module_lock lock(&_modules);
912          target = _modules.next(0);
913          while(target != NULL)
914          {
915             if(target->get_name() == static_cast<AsyncModuleOperationStart *>(rq)->_target_module)
916             {
917                
918                module_result = target->_receive_message(static_cast<AsyncModuleOperationStart *>(rq)->_act);
919                break;
920             }
921             
922             target = _modules.next(target);
923          }
924          
925       }
926       
927       if(module_result == NULL)
928       {
929          module_result = new AsyncReply(async_messages::REPLY, 
930                                         static_cast<AsyncModuleOperationStart *>(rq)->_act->getKey(),
931                                         static_cast<AsyncModuleOperationStart *>(rq)->_act->getRouting(),
932                                         message_mask::ha_async | message_mask::ha_reply,
933                                         rq->op,
934                                         async_results::CIM_NAK,
935                                         rq->resp,
936                                         false);
937       }
938       
939       AsyncModuleOperationResult *result = 
940          new AsyncModuleOperationResult(rq->getKey(),
941                                         rq->getRouting(),
942                                         rq->op,
943                                         async_results::OK,
944                                         static_cast<AsyncModuleOperationStart *>(rq)->resp,
945                                         false, 
946                                         static_cast<AsyncModuleOperationStart *>(rq)->_target_module,
947                                         module_result);
948       _complete_op_node(rq->op, 0, 0, 0);
949    }
950    else
951       Base::_handle_async_request(rq);
952 }
953
954 void ModuleController::_handle_async_callback(AsyncOpNode *op)
955 {
956    Base::_handle_async_callback(op);
957    
958 }
959
960 ModuleController & ModuleController::get_client_handle(const pegasus_identity & id,
961                                                        client_handle **handle)
962    throw(IncompatibleTypesException)
963 {
964    return get_client_handle(PEGASUS_QUEUENAME_CONTROLSERVICE,
965                             id, 
966                             handle);
967 }
968
969
970 // called by a non-module client to get the interface and authorization to use the controller
971
972 ModuleController & ModuleController::get_client_handle(const char *controller,
973                                                        const pegasus_identity & id, 
974                                                        client_handle **handle) 
975    throw(IncompatibleTypesException)
976 {
977    
978    if(handle == NULL)
979       throw NullPointer();
980       
981    Array<Uint32> services;
982    MessageQueue *message_queue = MessageQueue::lookup(controller);
983    
984    if ((message_queue == NULL) || ( false == message_queue->isAsync() ))
985    {
986       throw IncompatibleTypesException();
987    }
988
989    MessageQueueService *service = static_cast<MessageQueueService *>(message_queue);
990    if( (service == NULL) ||  ! ( service->get_capabilities() & module_capabilities::module_controller ))
991    {
992       throw IncompatibleTypesException();
993    }
994
995    ModuleController *_controller = static_cast<ModuleController *>(service);
996    if(true == const_cast<pegasus_identity &>(id).authenticate())
997       *handle = new client_handle(id);
998    else
999       *handle = NULL;
1000    
1001    return *_controller;
1002 }
1003
1004
1005 void ModuleController::return_client_handle(client_handle *handle)
1006 {
1007    if( true == handle->reference_count.DecAndTestIfZero())
1008       delete handle;
1009 }
1010
1011
1012 // send a message to another service
1013 AsyncReply *ModuleController::ClientSendWait(const client_handle & handle,
1014                                              Uint32 destination_q, 
1015                                              AsyncRequest *request) 
1016    throw(Permission, IPCException)
1017 {
1018    if( false == const_cast<client_handle &>(handle).authorized(CLIENT_SEND_WAIT)) 
1019       throw Permission(pegasus_thread_self());
1020    return _send_wait(destination_q, request);
1021 }
1022
1023
1024 // send a message to another module via another service
1025 AsyncReply *ModuleController::ClientSendWait(const client_handle & handle,
1026                                              Uint32 destination_q,
1027                                              String & destination_module,
1028                                              AsyncRequest *request) 
1029    throw(Permission, Deadlock, IPCException)
1030 {
1031    if( false == const_cast<client_handle &>(handle).authorized(CLIENT_SEND_WAIT_MODULE)) 
1032       throw Permission(pegasus_thread_self());
1033    return _send_wait(destination_q, destination_module, request);
1034 }
1035
1036
1037 // send an async message to another service
1038 Boolean ModuleController::ClientSendAsync(const client_handle & handle,
1039                                           Uint32 msg_handle, 
1040                                           Uint32 destination_q, 
1041                                           AsyncRequest *message,
1042                                           void (*async_callback)(Uint32, Message *, void *) ,
1043                                           void *callback_parm)
1044    throw(Permission, IPCException)
1045 {
1046    if( false == const_cast<client_handle &>(handle).authorized(CLIENT_SEND_ASYNC)) 
1047       throw Permission(pegasus_thread_self());
1048    
1049    pegasus_module *temp = new pegasus_module(this, 
1050                                              String(PEGASUS_MODULENAME_TEMP),
1051                                              this, 
1052                                              0, 
1053                                              async_callback, 
1054                                              0);
1055    return ModuleSendAsync( *temp, 
1056                            msg_handle, 
1057                            destination_q, 
1058                            message, 
1059                            callback_parm);
1060 }
1061
1062
1063 // send an async message to another module via another service
1064 Boolean ModuleController::ClientSendAsync(const client_handle & handle,
1065                                           Uint32 msg_handle, 
1066                                           Uint32 destination_q, 
1067                                           const String & destination_module,
1068                                           AsyncRequest *message, 
1069                                           void (*async_callback)(Uint32, Message *, void *),
1070                                           void *callback_parm)
1071    throw(Permission, IPCException)
1072 {
1073    if( false == const_cast<client_handle &>(handle).authorized(CLIENT_SEND_ASYNC_MODULE)) 
1074       throw Permission(pegasus_thread_self());
1075    
1076    pegasus_module *temp = new pegasus_module(this, 
1077                                              String(PEGASUS_MODULENAME_TEMP),
1078                                              this, 
1079                                              0, 
1080                                              async_callback, 
1081                                              0);
1082    return ModuleSendAsync(*temp, 
1083                           msg_handle, 
1084                           destination_q, 
1085                           destination_module,
1086                           message,
1087                           callback_parm);
1088 }
1089
1090
1091
1092 Boolean ModuleController::ClientSendForget(const client_handle & handle, 
1093                                            Uint32 destination_q, 
1094                                            AsyncRequest *message)
1095    throw(Permission, IPCException)
1096 {
1097    if( false == const_cast<client_handle &>(handle).authorized(CLIENT_SEND_FORGET)) 
1098       throw Permission(pegasus_thread_self());
1099
1100    return _send_forget(destination_q, message);
1101 }
1102
1103
1104 Boolean ModuleController::ClientSendForget(const client_handle & handle, 
1105                                            Uint32 destination_q, 
1106                                            const String & destination_module, 
1107                                            AsyncRequest *message)
1108    throw(Permission, IPCException)
1109 {
1110    if( false == const_cast<client_handle &>(handle).authorized(CLIENT_SEND_FORGET_MODULE)) 
1111       throw Permission(pegasus_thread_self());
1112
1113    return _send_forget(destination_q, destination_module, message);
1114 }
1115
1116 void ModuleController::client_blocking_thread_exec( 
1117    const client_handle & handle,
1118    PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *thread_func)(void *), 
1119    void *parm) throw(Permission, Deadlock, IPCException)
1120 {
1121    if( false == const_cast<client_handle &>(handle).authorized(CLIENT_BLOCKING_THREAD_EXEC)) 
1122       throw Permission(pegasus_thread_self());
1123    _blocking_thread_exec(thread_func, parm);
1124    
1125 }
1126
1127 void ModuleController::client_async_thread_exec(
1128    const client_handle & handle, 
1129    PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *thread_func)(void *), 
1130    void *parm)    throw(Permission, Deadlock, IPCException)
1131 {
1132
1133    if( false == const_cast<client_handle &>(handle).authorized(CLIENT_ASYNC_THREAD_EXEC)) 
1134       throw Permission(pegasus_thread_self());
1135    _async_thread_exec(thread_func, parm);
1136 }
1137
1138
1139 PEGASUS_NAMESPACE_END