PEP 55 Update license on source files to current license text and date
[tpot/pegasus/.git] / src / Pegasus / Common / MessageQueue.cpp
1 //%2003////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development
4 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 //
8 // Permission is hereby granted, free of charge, to any person obtaining a copy
9 // of this software and associated documentation files (the "Software"), to
10 // deal in the Software without restriction, including without limitation the
11 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
12 // sell copies of the Software, and to permit persons to whom the Software is
13 // furnished to do so, subject to the following conditions:
14 // 
15 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
16 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
17 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
18 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
19 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 //
24 //==============================================================================
25 //
26 // Author: Mike Brasher (mbrasher@bmc.com)
27 //
28 // Modified By:
29 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include <Pegasus/Common/HashTable.h>
33 #include <Pegasus/Common/IPC.h>
34 #include <Pegasus/Common/Tracer.h>
35 #include "MessageQueue.h"
36 #include "MessageQueueService.h"
37 PEGASUS_USING_STD;
38
39 PEGASUS_NAMESPACE_BEGIN
40
41 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
42     QueueTable;
43
44 static QueueTable _queueTable(256);
45 static Mutex q_table_mut ;
46
47 void MessageQueue::remove_myself(Uint32 qid)
48 {
49    q_table_mut.lock(pegasus_thread_self());
50    
51    _queueTable.remove(qid);
52    q_table_mut.unlock();
53 }
54
55
56 Uint32 MessageQueue::getNextQueueId() throw(IPCException)
57 {
58    static Uint32 _nextQueueId = 2;
59
60    //
61    // Lock mutex:
62    //
63
64    static Mutex _id_mut ;
65    _id_mut.lock(pegasus_thread_self());
66
67    Uint32 queueId;
68
69    // Assign the next queue ID that is not already in use
70    do
71    {
72       // Handle wrap around and never assign zero or one as a queue id:
73       if (_nextQueueId == 0)
74       {
75          _nextQueueId = 2;
76       }
77
78       queueId = _nextQueueId++;
79    } while (lookup(queueId) != 0);
80
81    //
82    // Unlock mutex:
83    //
84
85    _id_mut.unlock();
86
87    return queueId;
88 }
89
90
91
92 MessageQueue::MessageQueue(
93     const char* name, 
94     Boolean async,
95     Uint32 queueId)
96    : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)
97 {
98     //
99     // Copy the name:
100     //
101
102    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
103
104     if (!name)
105         name = ""; 
106
107     _name = new char[strlen(name) + 1];
108     strcpy(_name, name);
109
110     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
111         "MessageQueue::MessageQueue  name = %s, queueId = %i", name, queueId);
112
113     //
114     // Insert into queue table:
115     //
116
117     q_table_mut.lock(pegasus_thread_self());
118
119     while (!_queueTable.insert(_queueId, this))
120        ;
121
122     q_table_mut.unlock();
123
124     
125    PEG_METHOD_EXIT();
126 }
127
128 MessageQueue::~MessageQueue()
129 {
130     // ATTN-A: thread safety!
131
132     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
133
134     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
135         "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
136
137     q_table_mut.lock(pegasus_thread_self());
138
139     _queueTable.remove(_queueId);
140     q_table_mut.unlock();
141         
142     // Free the name:
143     
144     delete [] _name;
145
146     PEG_METHOD_EXIT();
147 }
148
149 void MessageQueue::enqueue(Message* message) throw(IPCException)
150 {
151     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
152
153     if (!message) 
154     {
155        Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
156         "MessageQueue::enqueue failure");
157        PEG_METHOD_EXIT();
158        throw NullPointer();
159     }
160
161     PEG_TRACE_STRING( TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3, 
162                       String("Queue name: ") + getQueueName() ) ;
163     Tracer::trace   ( TRC_MESSAGEQUEUESERVICE, 
164                       Tracer::LEVEL3,
165                       "Message: [%s, %d]", 
166                       MessageTypeToString(message->getType()), 
167                       message->getKey() );
168     
169     _mut.lock(pegasus_thread_self());
170     if (_back)
171     {
172        _back->_next = message;
173        message->_prev = _back;
174        message->_next = 0;
175        _back = message;
176     }
177     else
178     {
179        _front = message;
180        _back = message;
181        message->_prev = 0;
182        message->_next = 0;
183     }
184     message->_owner = this;
185        
186     _count++;
187     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
188                   "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);
189        
190     _mut.unlock();
191     
192     handleEnqueue();
193     PEG_METHOD_EXIT();
194 }
195
196 Message* MessageQueue::dequeue() throw(IPCException)
197 {
198     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
199
200    _mut.lock(pegasus_thread_self());
201     if (_front)
202     {
203         Message* message = _front;
204         _front = _front->_next;
205         if (_front)
206             _front->_prev = 0;
207
208         if (_back == message)
209             _back = 0;
210
211         _count--;
212         Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
213             "MessageQueue::dequeue _queueId = %d, _count = %d", 
214             _queueId, _count);
215
216         _mut.unlock();
217         message->_next = 0;
218         message->_prev = 0;
219         message->_owner = 0;
220
221         PEG_METHOD_EXIT();
222         return message;
223     }
224     _mut.unlock();
225
226     PEG_METHOD_EXIT();
227     return 0;
228 }
229 ;
230
231
232 void MessageQueue::remove(Message* message) throw(IPCException)
233 {
234     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");
235
236     if (!message)
237     {
238         PEG_METHOD_EXIT();
239         throw NullPointer();
240     }
241
242     if (message->_owner != this)
243     {
244         PEG_METHOD_EXIT();
245         throw NoSuchMessageOnQueue();
246     }
247
248     _mut.lock(pegasus_thread_self());
249
250     if (message->_next)
251         message->_next->_prev = message->_prev;
252     else
253         _back = message->_prev;
254
255     if (message->_prev)
256         message->_prev->_next = message->_next;
257     else
258         _front = message->_next;
259
260     _count--;
261     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
262        "MessageQueue::remove _count = %d", _count);
263
264     _mut.unlock();
265
266     message->_prev = 0;
267     message->_next = 0;
268     message->_owner = 0;
269
270     PEG_METHOD_EXIT();
271 }
272
273 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
274 {
275    _mut.lock(pegasus_thread_self());
276
277     for (Message* m = front(); m; m = m->getNext())
278     {
279        if (m->getType() == type)
280        {
281           _mut.unlock();
282           return m;
283        }
284     }
285     _mut.unlock();
286     return 0;
287 }
288
289 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
290 {
291    _mut.lock(pegasus_thread_self());
292
293     for (Message* m = front(); m; m = m->getNext())
294     {
295        if (m->getKey() == key)
296        {
297           _mut.unlock();
298           return m;
299        }
300
301     }
302     _mut.unlock();
303     return 0;
304 }
305
306 void MessageQueue::print(ostream& os) const throw(IPCException)
307 {
308    const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
309
310    for (const Message* m = front(); m; m = m->getNext())
311         m->print(os);
312    const_cast<MessageQueue *>(this)->_mut.unlock();
313 }
314
315 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
316 {
317    _mut.lock(pegasus_thread_self());
318
319     for (Message* m = front(); m; m = m->getNext())
320     {
321        if (m->getType() == type && m->getKey() == key)
322        {
323           _mut.unlock();
324           return m;
325        }
326     }
327     _mut.unlock();
328
329     return 0;
330 }
331
332 void MessageQueue::lock() throw(IPCException)
333 {
334    _mut.lock(pegasus_thread_self());
335 }
336
337 void MessageQueue::unlock()
338 {
339    _mut.unlock();
340 }
341
342 const char* MessageQueue::getQueueName() const
343 {
344    return _name;
345 }
346
347 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
348 {
349
350     MessageQueue* queue = 0;
351     q_table_mut.lock(pegasus_thread_self());
352
353     if (_queueTable.lookup(queueId, queue))
354     {
355        q_table_mut.unlock();
356        return queue;
357     }
358
359     // Not found!
360
361     q_table_mut.unlock();
362
363     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
364         "MessageQueue::lookup failure queueId = %i", queueId);
365
366     return 0;
367 }
368
369
370 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
371 {
372
373    if(name == NULL)
374       throw NullPointer();
375    q_table_mut.lock(pegasus_thread_self());
376
377    for(QueueTable::Iterator i = _queueTable.start(); i; i++)
378    {
379         // ATTN: Need to decide how many characters to compare in queue names
380       if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )
381       {
382          q_table_mut.unlock();
383          return( (MessageQueue *)i.value());
384       }
385
386    }
387    q_table_mut.unlock();
388
389    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
390         "MessageQueue::lookup failure - name = %s", name);
391
392    return 0;
393 }
394
395
396 void MessageQueue::handleEnqueue()
397 {
398
399 }
400
401 PEGASUS_NAMESPACE_END