void MessageQueueService::_shutdown_incoming_queue(void)
{
- _incoming_queue_shutdown = 1;
- _incoming.shutdown_queue();
- _req_thread.cancel();
+ AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
+ 0,
+ _queueId,
+ _queueId,
+ true,
+ AsyncIoctl::IO_CLOSE,
+ 0,
+ 0);
+ msg->op = get_op();
+ msg->op->_request.insert_first(msg);
+ Boolean closed = false;
+
+ if (_incoming_queue_shutdown.value() > 0 )
+ return ;
+
+ _incoming.insert_last_wait(msg->op);
+ msg->op->_client_sem.wait();
+
+ msg->op->lock();
+ AsyncReply * reply = static_cast<AsyncReply *>(msg->op->_response.remove_first());
+ reply->op = 0;
+ msg->op->unlock();
+ if ( reply != 0 )
+ {
+ if(reply->getMask() & message_mask:: ha_async)
+ {
+ if(reply->getMask() & message_mask::ha_reply)
+ {
+ if(reply->result == async_results::OK)
+ closed = true;
+ }
+ }
+ delete reply;
+ }
+
+ msg->op->_request.remove(msg);
+ msg->op->_state |= ASYNC_OPSTATE_RELEASED;
+
+ return_op(msg->op);
+ msg->op = 0;
+ delete msg;
}
{
try
{
- operation = service->_incoming.remove_first();
+ operation = service->_incoming.remove_first_wait();
}
catch(ListClosed & )
{
break;
}
- if ( service->_incoming.is_shutdown() || service->_die.value() )
- break;
if( operation )
- service->_handle_incoming_operation(operation);
- else
- pegasus_yield();
+ {
+
+ service->_handle_incoming_operation(operation, myself, service);
+ }
+
}
myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
}
-void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
+void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation,
+ Thread *thread,
+ MessageQueue *queue)
{
if ( operation != 0 )
{
PEGASUS_ASSERT(rq != 0 );
PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );
PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);
+ static_cast<AsyncMessage *>(rq)->_myself = thread;
+ static_cast<AsyncMessage *>(rq)->_service = queue;
_handle_async_request(static_cast<AsyncRequest *>(rq));
}
Boolean MessageQueueService::accept_async(AsyncOpNode *op)
{
+ if (_incoming_queue_shutdown.value() > 0 )
+ return false;
+
op->lock();
Message *rq = op->_request.next(0);
op->unlock();
Boolean MessageQueueService::messageOK(const Message *msg)
{
+ if (_incoming_queue_shutdown.value() > 0 )
+ return false;
+
if ( msg != 0 )
{
Uint32 mask = msg->getMask();
void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
{
- _make_response(req, async_results::OK);
+
+ switch( req->ctl )
+ {
+ case AsyncIoctl::IO_CLOSE:
+ {
+ // save my bearings
+ Thread *myself = req->_myself;
+ MessageQueueService *service = static_cast<MessageQueueService *>(req->_service);
+
+ // respond to this message.
+ _make_response(req, async_results::OK);
+ // ensure we do not accept any further messages
+
+ // ensure we don't recurse on IO_CLOSE
+ if( _incoming_queue_shutdown.value() > 0 )
+ break;
+
+ // set the closing flag
+ service->_incoming_queue_shutdown = 1;
+ // empty out the queue
+ while( 1 )
+ {
+ AsyncOpNode *operation;
+ try
+ {
+ operation = service->_incoming.remove_first();
+ }
+ catch(IPCException & )
+ {
+ break;
+ }
+ if( operation )
+ {
+ service->_handle_incoming_operation(operation, myself, service);
+ }
+ else
+ break;
+ } // message processing loop
+
+ // shutdown the AsyncDQueue
+ service->_incoming.shutdown_queue();
+ // exit the thread !
+ myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
+ return;
+ }
+
+ default:
+ _make_response(req, async_results::CIM_NAK);
+ }
}
+
void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
{
_make_response(req, async_results::CIM_NAK);