*** empty log message ***
authormday <mday>
Mon, 4 Feb 2002 02:51:17 +0000 (02:51 +0000)
committermday <mday>
Mon, 4 Feb 2002 02:51:17 +0000 (02:51 +0000)
src/Pegasus/Common/CimomMessage.cpp
src/Pegasus/Common/CimomMessage.h
src/Pegasus/Common/DQueue.h
src/Pegasus/Common/MessageQueueService.cpp
src/Pegasus/Common/MessageQueueService.h
src/Pegasus/Common/tests/MessageQueueService/MessageQueueService.cpp

index ea3c3c7f821a39637fe06165ccc7f77ea9f92fae..27df146f49687ca3984220decb1f291833bd0854 100644 (file)
@@ -88,7 +88,9 @@ AsyncMessage::AsyncMessage(Uint32 type,
                           Uint32 mask,
                           AsyncOpNode *operation)
    : Message(type, key, routing, mask | message_mask::ha_async),
-     op(operation) 
+     op(operation),
+     _myself(0), 
+     _service(0)
 {  
    
 }
@@ -198,7 +200,7 @@ AsyncIoctl::AsyncIoctl(Uint32 routing,
                   destination, response, blocking),
      ctl(code), 
      intp(int_param),
-     voidp(p_param) 
+     voidp(p_param)
 {  
         
 }
index 23a561c7b052e197f11445ad55730096f5099bf9..e37bd40b9218692439eb925803e7e2f489ccb286 100644 (file)
@@ -119,6 +119,8 @@ class PEGASUS_COMMON_LINKAGE AsyncMessage : public Message
       Boolean operator ==(const AsyncMessage& msg);
       
       AsyncOpNode *op;
+      Thread *_myself;
+      MessageQueue *_service;
 };
 
 
@@ -262,9 +264,20 @@ class PEGASUS_COMMON_LINKAGE AsyncIoctl : public AsyncRequest
 
       }
       
+      enum 
+      {
+        IO_CLOSE,
+        IO_OPEN,
+        IO_SOURCE_QUENCH,
+        IO_SERVICE_DEFINED
+      };
+      
+      
+
       Uint32 ctl;
       Uint32 intp;
       void *voidp;
+
 };
 
 class PEGASUS_COMMON_LINKAGE CimServiceStart : public AsyncRequest
index 53032b381e8d3d2327b409c26cb4b70e05731728..800e01321e207635e57dd57d981f9c161349b956 100644 (file)
@@ -246,11 +246,6 @@ template<class L> class PEGASUS_COMMON_LINKAGE AsyncDQueue: virtual public inter
         _slot->lock_object(pegasus_thread_self());
         while(true == is_full())
         {
-           if(_disallow->value() > 0)
-           {
-              unlock();
-              throw ListClosed();
-           }       
            _slot->unlocked_wait(pegasus_thread_self());
            if(_disallow->value() > 0)
            {
@@ -277,11 +272,7 @@ template<class L> class PEGASUS_COMMON_LINKAGE AsyncDQueue: virtual public inter
         _node->lock_object(pegasus_thread_self());
         while(true == is_empty())
         {
-           if(_disallow->value() > 0)
-           {
-              unlock();
-              throw ListClosed();
-           }
+
            _node->unlocked_wait(pegasus_thread_self());
            if(_disallow->value() > 0)
            {
index 8141cc90ce7073bc2fde88a26cfa70923bba2a81..18a78f13c01fd11e2f9b4e1c671b14895f3a219a 100644 (file)
@@ -67,10 +67,48 @@ AtomicInt MessageQueueService::_xid(1);
 
 void MessageQueueService::_shutdown_incoming_queue(void)
 {
-   _incoming_queue_shutdown = 1;
    
-   _incoming.shutdown_queue();
-   _req_thread.cancel();
+   AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
+                                   0, 
+                                   _queueId, 
+                                   _queueId, 
+                                   true, 
+                                   AsyncIoctl::IO_CLOSE, 
+                                   0, 
+                                   0);
+   msg->op = get_op();
+   msg->op->_request.insert_first(msg);
+   Boolean closed = false;
+
+   if (_incoming_queue_shutdown.value() > 0 )
+      return ;
+   
+   _incoming.insert_last_wait(msg->op);
+   msg->op->_client_sem.wait();
+   
+   msg->op->lock();
+   AsyncReply * reply = static_cast<AsyncReply *>(msg->op->_response.remove_first());
+   reply->op = 0;
+   msg->op->unlock();
+   if ( reply != 0 )
+   {
+      if(reply->getMask() & message_mask:: ha_async)
+      {
+        if(reply->getMask() & message_mask::ha_reply)
+        {
+           if(reply->result == async_results::OK)
+              closed = true;
+        }
+      }
+      delete reply; 
+   }
+      
+   msg->op->_request.remove(msg);
+   msg->op->_state |= ASYNC_OPSTATE_RELEASED;
+   
+   return_op(msg->op);
+   msg->op = 0;
+   delete msg;
 }
 
 
@@ -87,18 +125,18 @@ PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void *
    {
       try 
       {
-        operation = service->_incoming.remove_first();
+        operation = service->_incoming.remove_first_wait();
       }
       catch(ListClosed & )
       {
         break;
       }
-      if ( service->_incoming.is_shutdown() || service->_die.value() )
-        break;
       if( operation )
-        service->_handle_incoming_operation(operation);
-      else
-        pegasus_yield();
+      {
+        
+        service->_handle_incoming_operation(operation, myself, service);
+      }
+      
    }
    
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
@@ -106,7 +144,9 @@ PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void *
 }
 
 
-void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
+void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation, 
+                                                    Thread *thread, 
+                                                    MessageQueue *queue)
 {
    if ( operation != 0 )
    {
@@ -117,6 +157,8 @@ void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
       PEGASUS_ASSERT(rq != 0 );
       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );
       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);
+      static_cast<AsyncMessage *>(rq)->_myself = thread;
+      static_cast<AsyncMessage *>(rq)->_service = queue;
       _handle_async_request(static_cast<AsyncRequest *>(rq));
    }
    
@@ -193,6 +235,9 @@ void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
 
 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 {
+   if (_incoming_queue_shutdown.value() > 0 )
+      return false;
+   
    op->lock();
    Message *rq = op->_request.next(0);
    op->unlock();
@@ -207,6 +252,9 @@ Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 
 Boolean MessageQueueService::messageOK(const Message *msg)
 {
+   if (_incoming_queue_shutdown.value() > 0 )
+      return false;
+   
    if ( msg != 0 )
    {
       Uint32 mask = msg->getMask();
@@ -251,8 +299,57 @@ void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
       
 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 {
-   _make_response(req, async_results::OK);
+   
+   switch( req->ctl )
+   {
+      case AsyncIoctl::IO_CLOSE:
+      {
+        // save my bearings 
+        Thread *myself = req->_myself;
+        MessageQueueService *service = static_cast<MessageQueueService *>(req->_service);
+        
+        // respond to this message.
+        _make_response(req, async_results::OK);
+        // ensure we do not accept any further messages
+
+        // ensure we don't recurse on IO_CLOSE
+        if( _incoming_queue_shutdown.value() > 0 )
+           break;
+        
+        // set the closing flag 
+        service->_incoming_queue_shutdown = 1;
+        // empty out the queue
+        while( 1 )
+        {
+           AsyncOpNode *operation;
+           try 
+           {
+              operation = service->_incoming.remove_first();
+           }
+           catch(IPCException & )
+           {
+              break;
+           }
+           if( operation )
+           {
+              service->_handle_incoming_operation(operation, myself, service);
+           }
+           else
+              break;
+        } // message processing loop
+
+        // shutdown the AsyncDQueue
+        service->_incoming.shutdown_queue();
+        // exit the thread ! 
+        myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
+        return;
+      }
+
+      default:
+        _make_response(req, async_results::CIM_NAK);
+   }
 }
+
 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 {
    _make_response(req, async_results::CIM_NAK);
index 4019e741b5d45628b2b662896b9e059b0804dd12..6a848d813892b0d615f3879ca12a55dd3384c003 100644 (file)
@@ -90,7 +90,7 @@ class PEGASUS_COMMON_LINKAGE MessageQueueService : public MessageQueue
       AtomicInt _die;
    protected:
 
-      virtual void _handle_incoming_operation(AsyncOpNode *operation);
+      virtual void _handle_incoming_operation(AsyncOpNode *operation, Thread *thread, MessageQueue *queue);
       virtual void _handle_async_request(AsyncRequest *req);
       virtual void _make_response(AsyncRequest *req, Uint32 code);
       cimom *_meta_dispatcher;
index 4f574a5f36564aaa934204e96b12a1686a9fe760..9329074bec9e251249dc9817e550acefb8b79fcd 100644 (file)
@@ -443,5 +443,5 @@ PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL server_func(void *parm)
    
    my_handle->exit_self( (PEGASUS_THREAD_RETURN) 1 );
    return(0);
-   
+
 }