BUG#: 5840
[tpot/pegasus/.git] / src / Pegasus / Common / ModuleController.cpp
1 //%2006////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
13 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 // 
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 #include "ModuleController.h"
35 #include <Pegasus/Common/MessageLoader.h>
36 #include <Pegasus/Common/InternalException.h>
37
38 PEGASUS_NAMESPACE_BEGIN
39
40 PEGASUS_USING_STD;
41
42 RegisteredModuleHandle::RegisteredModuleHandle(
43     const String& name,
44     void* module_address,
45     Message* (*receive_message)(Message *, void *),
46     void (*async_callback)(Uint32, Message *, void *))
47     : _name(name),
48       _module_address(module_address),
49       _module_receive_message(receive_message),
50       _async_callback(async_callback)
51 {
52     PEGASUS_ASSERT(_module_receive_message != 0);
53 }
54
55 RegisteredModuleHandle::~RegisteredModuleHandle()
56 {
57 }
58
59 const String & RegisteredModuleHandle::get_name() const
60 {
61     return _name;
62 }
63
64 Message* RegisteredModuleHandle::_receive_message(Message* msg)
65 {
66     return _module_receive_message(msg, _module_address);
67 }
68
69 void RegisteredModuleHandle::_send_async_callback(
70     Uint32 msg_handle,
71     Message* msg,
72     void* parm)
73 {
74     // ATTN: Assert this condition?
75     if (_async_callback == 0)
76     {
77         throw NotImplemented("Module Async Receive");
78     }
79
80     _async_callback(msg_handle, msg, parm);
81 }
82
83
84 ModuleController::ModuleController(const char* name)
85     : Base(name, MessageQueue::getNextQueueId(),
86            module_capabilities::module_controller |
87            module_capabilities::async),
88       _modules()
89 {
90 }
91
92 ModuleController::~ModuleController()
93 {
94     RegisteredModuleHandle* module;
95
96     try
97     {
98         module = _modules.remove_front();
99         while (module)
100         {
101             delete module;
102             module = _modules.remove_front();
103         }
104     }
105     catch (...)
106     {
107     }
108 }
109
110 // called by a module to register itself, returns a handle to the controller
111 ModuleController& ModuleController::register_module(
112     const String& controller_name,
113     const String& module_name,
114     void* module_address,
115     Message* (*receive_message)(Message *, void *),
116     void (*async_callback)(Uint32, Message *, void *),
117     RegisteredModuleHandle** instance)
118 {
119     RegisteredModuleHandle *module;
120     ModuleController *controller;
121
122     Array<Uint32> services;
123
124     MessageQueue *message_queue =
125         MessageQueue::lookup(controller_name.getCString());
126
127     if ((message_queue == NULL) || (!message_queue->isAsync()))
128     {
129         throw IncompatibleTypesException();
130     }
131
132     MessageQueueService *service =
133         static_cast<MessageQueueService *>(message_queue);
134     if ((service == NULL) ||
135         !(service->get_capabilities() & module_capabilities::module_controller))
136     {
137         throw IncompatibleTypesException();
138     }
139
140     controller = static_cast<ModuleController *>(service);
141
142     {
143
144         // see if the module already exists in this controller.
145         _module_lock lock(&(controller->_modules));
146
147         module = controller->_modules.front();
148         while (module != NULL)
149         {
150             if (module->get_name() == module_name)
151             {
152                 MessageLoaderParms parms(
153                     "Common.ModuleController.MODULE",
154                     "module \"$0\"",
155                     module_name);
156                 throw AlreadyExistsException(parms);
157             }
158             module = controller->_modules.next_of(module);
159         }
160     }
161
162     // now reserve this module name with the meta dispatcher
163
164     Uint32 result = 0;
165     AutoPtr<RegisteredModule> request(new RegisteredModule(
166         0,
167         true,
168         controller->getQueueId(),
169         module_name));
170
171     request->dest = CIMOM_Q_ID;
172
173     AutoPtr<AsyncReply> response(controller->SendWait(request.get()));
174     if (response.get() != NULL)
175         result = response->result;
176
177     request.reset();
178     response.reset();
179     if (result == async_results::MODULE_ALREADY_REGISTERED)
180     {
181         MessageLoaderParms parms(
182             "Common.ModuleController.MODULE",
183             "module \"$0\"", module_name);
184         throw AlreadyExistsException(parms);
185     }
186
187     // the module does not exist, go ahead and create it.
188     module = new RegisteredModuleHandle(
189         module_name,
190         module_address,
191         receive_message,
192         async_callback);
193
194     controller->_modules.insert_back(module);
195
196     if (instance != NULL)
197         *instance = module;
198
199     return *controller;
200 }
201
202
203 Boolean ModuleController::deregister_module(const String& module_name)
204 {
205     AutoPtr<DeRegisteredModule> request(new DeRegisteredModule(
206         0,
207         true,
208         getQueueId(),
209         module_name));
210     request->dest = _meta_dispatcher->getQueueId();
211
212     AutoPtr<AsyncReply> response(SendWait(request.get()));
213
214     request.reset();
215     response.reset();
216
217     RegisteredModuleHandle* module;
218
219     _module_lock lock(&_modules);
220     module = _modules.front();
221     while (module != NULL)
222     {
223         if (module->get_name() == module_name)
224         {
225             _modules.remove(module);
226             return true;
227         }
228         module = _modules.next_of(module);
229     }
230     return false;
231 }
232
233 Boolean ModuleController::verify_handle(RegisteredModuleHandle* handle)
234 {
235     RegisteredModuleHandle *module;
236
237     if (handle->_module_address == (void *)this)
238         return true;
239
240     _module_lock lock(&_modules);
241
242     module = _modules.front();
243     while (module != NULL)
244     {
245         if ( module == handle)
246         {
247             return true;
248         }
249         module = _modules.next_of(module);
250     }
251     return false;
252 }
253
254 // given a name, find a service's queue id
255 Uint32 ModuleController::find_service(
256     const RegisteredModuleHandle& handle,
257     const String& name)
258 {
259     if (false == verify_handle(const_cast<RegisteredModuleHandle *>(&handle)))
260         throw Permission(Threads::self());
261     Array<Uint32> services;
262     Base::find_services(name, 0, 0, &services);
263     return services[0];
264 }
265
266
267 // returns the queue ID of the service hosting the named module,
268 // zero otherwise
269
270 Uint32 ModuleController::find_module_in_service(
271     const RegisteredModuleHandle& handle,
272     const String& name)
273 {
274     if (false == verify_handle(const_cast<RegisteredModuleHandle *>(&handle)))
275         throw(Permission(Threads::self()));
276
277     Uint32 result = 0;
278
279     AutoPtr<FindModuleInService> request(new FindModuleInService(
280         0,
281         true,
282         _meta_dispatcher->getQueueId(),
283         name));
284     request->dest = _meta_dispatcher->getQueueId();
285     AutoPtr<FindModuleInServiceResponse> response(
286         static_cast<FindModuleInServiceResponse *>(SendWait(request.get())));
287     if (response.get() != NULL)
288         result = response->_module_service_queue;
289
290     return result;
291 }
292
293
294 AsyncReply* ModuleController::_send_wait(
295     Uint32 destination_q,
296     AsyncRequest* request)
297 {
298     request->dest = destination_q;
299     AsyncReply* reply = Base::SendWait(request);
300     return reply;
301 }
302
303
304 // sendwait to another service
305 AsyncReply* ModuleController::ModuleSendWait(
306     const RegisteredModuleHandle& handle,
307     Uint32 destination_q,
308     AsyncRequest* request)
309 {
310     if (false == verify_handle(const_cast<RegisteredModuleHandle *>(&handle)))
311         throw(Permission(Threads::self()));
312
313     return _send_wait(destination_q, request);
314 }
315
316 AsyncReply* ModuleController::_send_wait(
317     Uint32 destination_q,
318     const String& destination_module,
319     AsyncRequest* message)
320 {
321     AutoPtr<AsyncModuleOperationStart> request(new AsyncModuleOperationStart(
322         0,
323         destination_q,
324         getQueueId(),
325         true,
326         destination_module,
327         message));
328
329     request->dest = destination_q;
330     AutoPtr<AsyncModuleOperationResult> response(
331         static_cast<AsyncModuleOperationResult *>(SendWait(request.get())));
332
333     AsyncReply *ret = 0;
334
335     if (response.get() != NULL &&
336         response->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
337     {
338         // clear the request out of the envelope so it can be deleted by the
339         // module
340         ret = static_cast<AsyncReply *>(response->get_result());
341     }
342     request->get_action();
343     return ret;
344 }
345
346
347 // sendwait to another module controlled by another service.
348 // throws Deadlock() if destination_q is this->queue_id
349 AsyncReply* ModuleController::ModuleSendWait(
350     const RegisteredModuleHandle& handle,
351     Uint32 destination_q,
352     const String& destination_module,
353     AsyncRequest* message)
354 {
355     if (false == verify_handle(const_cast<RegisteredModuleHandle *>(&handle)))
356         throw(Permission(Threads::self()));
357
358     return _send_wait(destination_q, destination_module, message);
359 }
360
361 void ModuleController::_async_handleEnqueue(
362     AsyncOpNode* op,
363     MessageQueue* q,
364     void* parm)
365 {
366     ModuleController* myself = static_cast<ModuleController *>(q);
367     Message* request = op->removeRequest();
368     Message* response = op->removeResponse();
369
370     if (request && (!(request->getMask() & MessageMask::ha_async)))
371         throw TypeMismatchException();
372
373     if (response && (!(response->getMask() & MessageMask::ha_async)))
374         throw TypeMismatchException();
375
376     op->release();
377     myself->return_op(op);
378
379     // get rid of the module wrapper
380     if (request && request->getType() == async_messages::ASYNC_MODULE_OP_START)
381     {
382         (static_cast<AsyncMessage *>(request))->op = NULL;
383         AsyncModuleOperationStart *rq =
384             static_cast<AsyncModuleOperationStart *>(request);
385         request = rq->get_action();
386         delete rq;
387     }
388
389     // get rid of the module wrapper
390     if (response &&
391         response->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
392     {
393         (static_cast<AsyncMessage *>(response))->op = NULL;
394         AsyncModuleOperationResult *rp =
395             static_cast<AsyncModuleOperationResult *>(response);
396         response = rp->get_result();
397         delete rp;
398     }
399
400     callback_handle *cb = reinterpret_cast<callback_handle *>(parm);
401
402     cb->_module->_send_async_callback(0, response, cb->_parm);
403     delete cb;
404 }
405
406
407 // send an async message to a service asynchronously
408 Boolean ModuleController::ModuleSendAsync(
409     const RegisteredModuleHandle& handle,
410     Uint32 msg_handle,
411     Uint32 destination_q,
412     AsyncRequest* message,
413     void* callback_parm)
414 {
415     if (false == verify_handle(const_cast<RegisteredModuleHandle *>(&handle)))
416         throw(Permission(Threads::self()));
417
418     if (message->op == NULL)
419     {
420         message->op = get_op();
421         message->op->setRequest(message);
422     }
423
424
425     callback_handle *cb = new callback_handle(
426         const_cast<RegisteredModuleHandle *>(&handle),
427         callback_parm);
428
429     message->resp = getQueueId();
430     message->block = false;
431     message->dest = destination_q;
432     return SendAsync(
433         message->op,
434         destination_q,
435         _async_handleEnqueue,
436         this,
437         cb);
438 }
439
440 // send a message to a module within another service asynchronously
441 Boolean ModuleController::ModuleSendAsync(
442     const RegisteredModuleHandle& handle,
443     Uint32 msg_handle,
444     Uint32 destination_q,
445     const String& destination_module,
446     AsyncRequest* message,
447     void* callback_parm)
448 {
449     if (false == verify_handle(const_cast<RegisteredModuleHandle *>(&handle)))
450         throw(Permission(Threads::self()));
451
452     AsyncOpNode *op = get_op();
453     AsyncModuleOperationStart *request = new AsyncModuleOperationStart(
454         op,
455         destination_q,
456         getQueueId(),
457         true,
458         destination_module,
459         message);
460     request->dest = destination_q;
461     callback_handle* cb = new callback_handle(
462         const_cast<RegisteredModuleHandle *>(&handle),
463         callback_parm);
464     return SendAsync(
465         op,
466         destination_q,
467         _async_handleEnqueue,
468         this,
469         cb);
470 }
471
472
473 Boolean ModuleController::_send_forget(
474     Uint32 destination_q,
475     AsyncRequest *message)
476 {
477     message->dest = destination_q;
478     return SendForget(message);
479 }
480
481 Boolean ModuleController::_send_forget(
482     Uint32 destination_q,
483     const String& destination_module,
484     AsyncRequest* message)
485 {
486     AsyncOpNode* op = get_op();
487     message->dest = destination_q;
488     AsyncModuleOperationStart* request = new AsyncModuleOperationStart(
489         op,
490         destination_q,
491         getQueueId(),
492         true,
493         destination_module,
494         message);
495     return SendForget(request);
496 }
497
498
499 Boolean ModuleController::ModuleSendForget(
500     const RegisteredModuleHandle& handle,
501     Uint32 destination_q,
502     AsyncRequest* message)
503 {
504     if (false == verify_handle(const_cast<RegisteredModuleHandle *>(&handle)))
505         throw(Permission(Threads::self()));
506
507     return _send_forget(destination_q, message);
508 }
509
510 Boolean ModuleController::ModuleSendForget(
511     const RegisteredModuleHandle& handle,
512     Uint32 destination_q,
513     const String& destination_module,
514     AsyncRequest* message)
515 {
516     if (false == verify_handle(const_cast<RegisteredModuleHandle *>(&handle)))
517         throw(Permission(Threads::self()));
518     return _send_forget(destination_q, destination_module, message);
519 }
520
521
522 void ModuleController::_handle_async_request(AsyncRequest* rq)
523 {
524     if (rq->getType() == async_messages::ASYNC_MODULE_OP_START)
525     {
526         // find the target module
527         RegisteredModuleHandle* target;
528         Message* module_result = NULL;
529
530         {
531             _module_lock lock(&_modules);
532             target = _modules.front();
533             while (target != NULL)
534             {
535                 if (target->get_name() ==
536                         static_cast<AsyncModuleOperationStart *>(rq)->
537                             _target_module)
538                 {
539                     break;
540                 }
541
542                 target = _modules.next_of(target);
543             }
544         }
545
546         if (target)
547         {
548             // ATTN: This statement was taken out of the _module_lock block
549             // above because that caused all requests to control providers to
550             // be serialized.  There is now a risk that the control provider
551             // module may be deleted after the lookup and before this call.
552             // See Bugzilla 3120.
553             module_result = target->_receive_message(
554                 static_cast<AsyncModuleOperationStart *>(rq)->_act);
555         }
556
557         if (module_result == NULL)
558         {
559             module_result = new AsyncReply(
560                 async_messages::REPLY,
561                 MessageMask::ha_async | MessageMask::ha_reply,
562                 rq->op,
563                 async_results::CIM_NAK,
564                 rq->resp,
565                 false);
566         }
567
568         AsyncModuleOperationResult *result = new AsyncModuleOperationResult(
569             rq->op,
570             async_results::OK,
571             static_cast<AsyncModuleOperationStart *>(rq)->resp,
572             false,
573             static_cast<AsyncModuleOperationStart *>(rq)->_target_module,
574             module_result);
575         _complete_op_node(rq->op, 0, 0, 0);
576     }
577     else
578         Base::_handle_async_request(rq);
579 }
580
581 void ModuleController::_handle_async_callback(AsyncOpNode* op)
582 {
583     Base::_handle_async_callback(op);
584 }
585
586 ModuleController* ModuleController::getModuleController()
587 {
588     MessageQueue* messageQueue =
589         MessageQueue::lookup(PEGASUS_QUEUENAME_CONTROLSERVICE);
590     PEGASUS_ASSERT(messageQueue != 0);
591     PEGASUS_ASSERT(messageQueue->isAsync());
592
593     MessageQueueService* service =
594         dynamic_cast<MessageQueueService*>(messageQueue);
595     PEGASUS_ASSERT(service != 0);
596     PEGASUS_ASSERT(
597         service->get_capabilities() & module_capabilities::module_controller);
598
599     return static_cast<ModuleController*>(service);
600 }
601
602
603 // send a message to another service
604 AsyncReply* ModuleController::ClientSendWait(
605     Uint32 destination_q,
606     AsyncRequest* request)
607 {
608     return _send_wait(destination_q, request);
609 }
610
611
612 // send a message to another module via another service
613 AsyncReply *ModuleController::ClientSendWait(
614     Uint32 destination_q,
615     String& destination_module,
616     AsyncRequest* request)
617 {
618     return _send_wait(destination_q, destination_module, request);
619 }
620
621
622 // send an async message to another service
623 Boolean ModuleController::ClientSendAsync(
624     Uint32 msg_handle,
625     Uint32 destination_q,
626     AsyncRequest* message,
627     void (*async_callback)(Uint32, Message *, void *),
628     void* callback_parm)
629 {
630     RegisteredModuleHandle* temp = new RegisteredModuleHandle(
631         String(PEGASUS_MODULENAME_TEMP),
632         this,
633         0,
634         async_callback);
635    return ModuleSendAsync(
636         *temp,
637         msg_handle,
638         destination_q,
639         message,
640         callback_parm);
641 }
642
643
644 // send an async message to another module via another service
645 Boolean ModuleController::ClientSendAsync(
646     Uint32 msg_handle,
647     Uint32 destination_q,
648     const String& destination_module,
649     AsyncRequest* message,
650     void (*async_callback)(Uint32, Message *, void *),
651     void* callback_parm)
652 {
653     RegisteredModuleHandle* temp = new RegisteredModuleHandle(
654         String(PEGASUS_MODULENAME_TEMP),
655         this,
656         0,
657         async_callback);
658    return ModuleSendAsync(
659         *temp,
660         msg_handle,
661         destination_q,
662         destination_module,
663         message,
664         callback_parm);
665 }
666
667
668 Boolean ModuleController::ClientSendForget(
669     Uint32 destination_q,
670     AsyncRequest* message)
671 {
672     return _send_forget(destination_q, message);
673 }
674
675
676 Boolean ModuleController::ClientSendForget(
677     Uint32 destination_q,
678     const String& destination_module,
679     AsyncRequest* message)
680 {
681     return _send_forget(destination_q, destination_module, message);
682 }
683
684 PEGASUS_NAMESPACE_END