PEP 55 Update license on source files to current license text and date
[tpot/pegasus/.git] / src / Pegasus / Common / MessageQueueService.h
1 //%2003////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development
4 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 //
8 // Permission is hereby granted, free of charge, to any person obtaining a copy
9 // of this software and associated documentation files (the "Software"), to
10 // deal in the Software without restriction, including without limitation the
11 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
12 // sell copies of the Software, and to permit persons to whom the Software is
13 // furnished to do so, subject to the following conditions:
14 // 
15 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
16 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
17 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
18 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
19 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 //
24 //==============================================================================
25 //
26 // Author: Mike Day (mdday@us.ibm.com)
27 //
28 // Modified By:
29 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #ifndef Pegasus_MessageQueue_Service_h
33 #define Pegasus_MessageQueue_Service_h
34
35 #include <Pegasus/Common/Config.h>
36 #include <Pegasus/Common/Message.h>
37 #include <Pegasus/Common/InternalException.h>
38 #include <Pegasus/Common/IPC.h>
39 #include <Pegasus/Common/Thread.h>
40 #include <Pegasus/Common/AsyncOpNode.h>
41 #include <Pegasus/Common/Cimom.h>
42 #include <Pegasus/Common/CimomMessage.h>
43 #include <Pegasus/Common/Linkage.h>
44
45 PEGASUS_NAMESPACE_BEGIN
46
47 extern const Uint32 CIMOM_Q_ID;
48
49 class message_module;
50
51 class PEGASUS_COMMON_LINKAGE MessageQueueService : public MessageQueue
52 {
53    public:
54
55       typedef MessageQueue Base;
56       
57       MessageQueueService(const char *name,
58                           Uint32 queueID = MessageQueue::getNextQueueId(),
59                           Uint32 capabilities = 0, 
60                           Uint32 mask = message_mask::type_cimom | 
61                           message_mask::type_service | 
62                           message_mask::ha_request | 
63                           message_mask::ha_reply | 
64                           message_mask::ha_async ) ;
65       
66       virtual ~MessageQueueService(void);
67             
68       virtual Boolean isAsync(void) {  return true;  }
69             
70       virtual void enqueue(Message *) throw(IPCException);
71       
72       AsyncReply *SendWait(AsyncRequest *request);
73       Boolean SendAsync(AsyncOpNode *op, 
74                         Uint32 destination,
75                         void (*callback)(AsyncOpNode *, MessageQueue *, void *),
76                         MessageQueue *callback_q,
77                         void *callback_ptr);
78
79       Boolean SendAsync(Message *msg,
80                         Uint32 destination,
81                         void (*callback)(Message *response, void *handle, void *parameter),
82                         void *handle, 
83                         void *parameter);
84       
85       Uint32 get_pending_callback_count(void);
86       
87       Boolean  SendForget(Message *msg);
88       Boolean ForwardOp(AsyncOpNode *, Uint32 destination);
89       
90
91       Boolean register_service(String name, Uint32 capabilities, Uint32 mask);
92       Boolean update_service(Uint32 capabilities, Uint32 mask);
93       Boolean deregister_service(void);
94       virtual void _shutdown_incoming_queue(void);
95       
96       void find_services(String name,
97                          Uint32 capabilities, 
98                          Uint32 mask, 
99                          Array<Uint32> *results);
100       void enumerate_service(Uint32 queue, message_module *result);
101       Uint32 get_next_xid(void);
102       static AsyncOpNode *get_op(void);
103       void return_op(AsyncOpNode *op);
104
105       Boolean operator ==(const MessageQueueService & svce)
106       {
107          return operator==((const void *)&svce);
108       }
109       Boolean operator ==(const void *svce)
110       {
111          if((void *)this == svce)
112             return true;
113          return false;
114       }
115
116       static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL polling_routine(void *);
117       static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL kill_idle_threads(void *);
118       static int pooled_threads(void) 
119       {
120          return _thread_pool->running_count() + _thread_pool->dead_count() + _thread_pool->pool_count();
121       }
122       static ThreadPool *get_thread_pool(void);
123       static void force_shutdown(Boolean destroy_flag = false);
124       
125       Uint32 _mask;
126       AtomicInt _die;
127    protected:
128       virtual Boolean accept_async(AsyncOpNode *op);
129       virtual Boolean messageOK(const Message *msg) ;
130       virtual void handleEnqueue(void) = 0;
131       virtual void handleEnqueue(Message *) = 0;
132       Boolean _enqueueResponse(Message *, Message *);
133 //      virtual void _handle_incoming_operation(AsyncOpNode *operation, Thread *thread, MessageQueue *queue);
134       virtual void _handle_incoming_operation(AsyncOpNode *);
135       
136       virtual void _handle_async_request(AsyncRequest *req);
137       virtual void _handle_async_callback(AsyncOpNode *operation);     
138       virtual void _make_response(Message *req, Uint32 code);
139       
140
141       virtual void handle_heartbeat_request(AsyncRequest *req);
142       virtual void handle_heartbeat_reply(AsyncReply *rep);
143       
144       virtual void handle_AsyncIoctl(AsyncIoctl *req);
145       virtual void handle_CimServiceStart(CimServiceStart *req);
146       virtual void handle_CimServiceStop(CimServiceStop *req);
147       virtual void handle_CimServicePause(CimServicePause *req);
148       virtual void handle_CimServiceResume(CimServiceResume *req);
149       
150       virtual void handle_AsyncOperationStart(AsyncOperationStart *req);
151       virtual void handle_AsyncOperationResult(AsyncOperationResult *rep);
152       virtual void handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req);
153       virtual void handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep);
154
155       void _completeAsyncResponse(AsyncRequest *request, 
156                                  AsyncReply *reply, 
157                                  Uint32 state, 
158                                  Uint32 flag);
159       void _complete_op_node(AsyncOpNode *, Uint32, Uint32, Uint32);
160       
161
162       static cimom *_meta_dispatcher;
163       static AtomicInt _service_count;
164       static Mutex _meta_dispatcher_mutex;
165       
166    private: 
167       
168       AsyncDQueue<AsyncOpNode> _incoming;
169       DQueue<AsyncOpNode> _callback;
170       static Thread* _polling_thread;
171       static Semaphore _polling_sem;
172       static AtomicInt _stop_polling;
173       static AtomicInt _check_idle_flag;
174       
175       static DQueue<MessageQueueService> _polling_list;
176       
177       static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _req_proc(void *);
178       static PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL _callback_proc(void *);
179       
180       static void _sendwait_callback(AsyncOpNode *, MessageQueue *, void *);
181       
182       AtomicInt _incoming_queue_shutdown;
183       Semaphore _callback_ready;
184             
185       Thread _req_thread;
186       Thread _callback_thread;
187    protected:
188       static ThreadPool *_thread_pool;
189    private:
190       struct timeval _default_op_timeout;
191       static AtomicInt _xid;
192       friend class cimom;
193       friend class CIMServer;
194       friend class monitor_2;
195       
196 };
197
198 PEGASUS_NAMESPACE_END
199
200 #endif /* Pegasus_MessageQueue_Service_h */
201
202