1 //%2003////////////////////////////////////////////////////////////////////////
3 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Development
4 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
8 // Permission is hereby granted, free of charge, to any person obtaining a copy
9 // of this software and associated documentation files (the "Software"), to
10 // deal in the Software without restriction, including without limitation the
11 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
12 // sell copies of the Software, and to permit persons to whom the Software is
13 // furnished to do so, subject to the following conditions:
15 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
16 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
17 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
18 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
19 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 //==============================================================================
26 // Author: Mike Brasher (mbrasher@bmc.com)
30 //%/////////////////////////////////////////////////////////////////////////////
32 #include <Pegasus/Common/HashTable.h>
33 #include <Pegasus/Common/IPC.h>
34 #include <Pegasus/Common/Tracer.h>
35 #include "MessageQueue.h"
36 #include "MessageQueueService.h"
39 PEGASUS_NAMESPACE_BEGIN
// File-level registry of message queues: a hash table keyed by Uint32 whose
// values are MessageQueue pointers. The constructor inserts `this` under its
// queue id and the destructor removes that entry.
// NOTE(review): the typedef's name is on a line not shown here; presumably
// it is QueueTable, the type used for _queueTable below -- confirm.
41 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
// NOTE(review): 256 is presumably the table's initial size / bucket count --
// confirm against the HashTable constructor.
44 static QueueTable _queueTable(256);
// Mutex locked around every access to _queueTable made in this file.
45 static Mutex q_table_mut ;
// Removes the entry stored under `qid` from the file-level queue table,
// locking q_table_mut on behalf of the calling thread first.
// NOTE(review): the matching q_table_mut.unlock() is not visible in this
// excerpt -- confirm the lock is released on every path.
47 void MessageQueue::remove_myself(Uint32 qid)
49 q_table_mut.lock(pegasus_thread_self());
51 _queueTable.remove(qid);
// Hands out a queue id taken from a function-local counter that starts at 2.
// The counter is protected by its own function-local mutex (_id_mut), and a
// candidate id is re-drawn for as long as lookup() finds a queue already
// registered under it.
// NOTE(review): the action taken when the counter wraps to zero, the loop
// head, the unlock and the return statement are not visible in this excerpt.
56 Uint32 MessageQueue::getNextQueueId() throw(IPCException)
58 static Uint32 _nextQueueId = 2;
64 static Mutex _id_mut ;
65 _id_mut.lock(pegasus_thread_self());
69 // Assign the next queue ID that is not already in use
72 // Handle wrap around and never assign zero or one as a queue id:
73 if (_nextQueueId == 0)
78 queueId = _nextQueueId++;
// lookup() takes q_table_mut itself, so _id_mut and q_table_mut are both
// held while the candidate id is checked.
79 } while (lookup(queueId) != 0);
// Constructs a queue: records the supplied queue id and async flag, starts
// with zero capabilities, a zero message count and null front/back pointers,
// allocates storage for the queue name, traces the name and id, and registers
// the new object in the file-level queue table under q_table_mut.
// NOTE(review): the parameter list, the copy of `name` into _name and the
// body of the insert retry loop are not visible in this excerpt.
92 MessageQueue::MessageQueue(
96 : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)
102 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
// Room for the name's characters plus the terminating NUL.
107 _name = new char[strlen(name) + 1];
110 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
111 "MessageQueue::MessageQueue name = %s, queueId = %i", name, queueId);
114 // Insert into queue table:
117 q_table_mut.lock(pegasus_thread_self());
// Repeats for as long as insert() reports failure.
119 while (!_queueTable.insert(_queueId, this))
122 q_table_mut.unlock();
// Destroys the queue: traces its id and name, then removes its entry from the
// file-level queue table while holding q_table_mut.
// NOTE(review): release of _name and of any messages still queued is not
// visible in this excerpt -- confirm.
128 MessageQueue::~MessageQueue()
130 // ATTN-A: thread safety!
132 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
134 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
135 "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
137 q_table_mut.lock(pegasus_thread_self());
139 _queueTable.remove(_queueId);
140 q_table_mut.unlock();
// Appends `message` to this queue. Under _mut the message is linked after the
// current back element and its _owner is set to this queue; the queue name,
// message type, queue id and count are traced along the way.
// NOTE(review): the condition that leads to the "enqueue failure" trace, the
// empty-queue branch, the _back/_count updates and the unlock are not visible
// in this excerpt.
149 void MessageQueue::enqueue(Message* message) throw(IPCException)
151 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
155 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
156 "MessageQueue::enqueue failure");
161 PEG_TRACE_STRING( TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
162 String("Queue name: ") + getQueueName() ) ;
163 Tracer::trace ( TRC_MESSAGEQUEUESERVICE,
166 MessageTypeToString(message->getType()),
169 _mut.lock(pegasus_thread_self());
// Link the new message behind the current tail.
172 _back->_next = message;
173 message->_prev = _back;
// Record ownership; remove() rejects messages whose _owner is another queue.
184 message->_owner = this;
187 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
188 "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);
// Detaches the message at the front of the queue while holding _mut: the
// front pointer advances to the next message and the queue id and count are
// traced.
// NOTE(review): the empty-queue check, the action taken when the removed
// message was also the back element, the unlock and the return statement are
// not visible in this excerpt.
196 Message* MessageQueue::dequeue() throw(IPCException)
198 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
200 _mut.lock(pegasus_thread_self());
203 Message* message = _front;
204 _front = _front->_next;
// True when the detached message was the only element in the queue.
208 if (_back == message)
212 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
213 "MessageQueue::dequeue _queueId = %d, _count = %d",
// Unlinks `message` from this queue. Throws NoSuchMessageOnQueue when the
// message's _owner is not this queue. Under _mut the neighbours' links are
// patched around the message, with _back or _front updated instead when the
// message sits at that end.
// NOTE(review): the branch conditions choosing between the neighbour and the
// _back/_front updates, the _count update and the unlock are not visible in
// this excerpt.
232 void MessageQueue::remove(Message* message) throw(IPCException)
234 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");
// The ownership check appears before _mut is taken.
242 if (message->_owner != this)
245 throw NoSuchMessageOnQueue();
248 _mut.lock(pegasus_thread_self());
// Successor side: repoint the successor's _prev, or move _back.
251 message->_next->_prev = message->_prev;
253 _back = message->_prev;
// Predecessor side: repoint the predecessor's _next, or move _front.
256 message->_prev->_next = message->_next;
258 _front = message->_next;
261 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
262 "MessageQueue::remove _count = %d", _count);
// Walks the queue from front() via getNext() while holding _mut, testing each
// message's getType() against `type`.
// NOTE(review): the unlock and return statements are not visible in this
// excerpt; presumably the first match is returned, or 0 when none matches.
273 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
275 _mut.lock(pegasus_thread_self());
277 for (Message* m = front(); m; m = m->getNext())
279 if (m->getType() == type)
// Walks the queue from front() via getNext() while holding _mut, testing each
// message's getKey() against `key`.
// NOTE(review): the unlock and return statements are not visible in this
// excerpt; presumably the first match is returned, or 0 when none matches.
289 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
291 _mut.lock(pegasus_thread_self());
293 for (Message* m = front(); m; m = m->getNext())
295 if (m->getKey() == key)
// Iterates every message from front() to the end of the queue with _mut held.
// NOTE(review): the per-message output written to `os` is not visible in this
// excerpt.
306 void MessageQueue::print(ostream& os) const throw(IPCException)
// This method is const, so constness is cast away to lock the member mutex.
308 const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
310 for (const Message* m = front(); m; m = m->getNext())
312 const_cast<MessageQueue *>(this)->_mut.unlock();
// Walks the queue from front() via getNext() while holding _mut, looking for
// a message whose getType() equals `type` and whose getKey() equals `key`.
// NOTE(review): the unlock and return statements are not visible in this
// excerpt; presumably the first match is returned, or 0 when none matches.
315 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
317 _mut.lock(pegasus_thread_self());
319 for (Message* m = front(); m; m = m->getNext())
321 if (m->getType() == type && m->getKey() == key)
// Locks this queue's mutex (_mut) on behalf of the calling thread.
332 void MessageQueue::lock() throw(IPCException)
334 _mut.lock(pegasus_thread_self());
// NOTE(review): body not visible in this excerpt; presumably releases _mut as
// the counterpart of lock() -- confirm.
337 void MessageQueue::unlock()
// Returns the queue's name as a C string; lookup(const char*) compares this
// value with strcmp().
// NOTE(review): body not visible in this excerpt; presumably returns _name --
// confirm.
342 const char* MessageQueue::getQueueName() const
// Looks up the queue registered under `queueId` in the file-level queue table
// while holding q_table_mut. The mutex is released on both the hit and the
// miss path; a miss is traced as a lookup failure.
// NOTE(review): the return statements are not visible in this excerpt.
// getNextQueueId() compares the result against 0, so presumably the found
// queue is returned on a hit and 0 on a miss -- confirm.
347 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
350 MessageQueue* queue = 0;
351 q_table_mut.lock(pegasus_thread_self());
353 if (_queueTable.lookup(queueId, queue))
355 q_table_mut.unlock();
361 q_table_mut.unlock();
363 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
364 "MessageQueue::lookup failure queueId = %i", queueId);
// Finds a queue by name: scans every entry of the file-level queue table
// while holding q_table_mut and returns the first queue whose getQueueName()
// matches `name` exactly under strcmp(). The mutex is released before
// returning a match; when nothing matches it is released and the failure is
// traced.
// NOTE(review): the value returned on a miss is not visible in this excerpt;
// presumably 0 -- confirm.
370 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
375 q_table_mut.lock(pegasus_thread_self());
377 for(QueueTable::Iterator i = _queueTable.start(); i; i++)
379 // ATTN: Need to decide how many characters to compare in queue names
380 if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )
382 q_table_mut.unlock();
383 return( (MessageQueue *)i.value());
387 q_table_mut.unlock();
389 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
390 "MessageQueue::lookup failure - name = %s", name);
// NOTE(review): body not visible in this excerpt; takes no arguments and
// returns nothing. Presumably a hook invoked after a message is enqueued,
// possibly a no-op in this base class -- confirm.
396 void MessageQueue::handleEnqueue()
401 PEGASUS_NAMESPACE_END