BUG#: 7626
[tpot/pegasus/.git] / src / Pegasus / HandlerService / DestinationQueue.cpp
diff --git a/src/Pegasus/HandlerService/DestinationQueue.cpp b/src/Pegasus/HandlerService/DestinationQueue.cpp
new file mode 100644 (file)
index 0000000..489c465
--- /dev/null
@@ -0,0 +1,590 @@
+//%LICENSE////////////////////////////////////////////////////////////////
+//
+// Licensed to The Open Group (TOG) under one or more contributor license
+// agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
+// this work for additional information regarding copyright ownership.
+// Each contributor licenses this file to you under the OpenPegasus Open
+// Source License; you may not use this file except in compliance with the
+// License.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+//
+//////////////////////////////////////////////////////////////////////////
+//
+//%/////////////////////////////////////////////////////////////////////////////
+
+#include <Pegasus/Common/Time.h>
+#include <Pegasus/Common/Tracer.h>
+#include <Pegasus/Common/Constants.h>
+#include <Pegasus/Common/StringConversion.h>
+#include <Pegasus/Config/ConfigManager.h>
+#include <Pegasus/Provider/CIMOMHandle.h>
+#include "DestinationQueue.h"
+
+PEGASUS_NAMESPACE_BEGIN
+
+// Initialize with default values.
+Uint16 DestinationQueue::_maxDeliveryRetryAttempts = 3;
+Uint64 DestinationQueue::_minDeliveryRetryIntervalUsec = 20 * 1000000;
+Uint64 DestinationQueue::_sequenceIdentifierLifetimeUsec = 600 * 1000000;
+String DestinationQueue::_indicationServiceName = "PG:IndicationService";
+String DestinationQueue::_objectManagerName = "Pegasus";
+
+const char* DestinationQueue::_indDiscardedReasons[] = {
+    "Listener not active",
+    "Subscription not active",
+    "DestinationQueue full",
+    "SequenceIdentifierLifetTime expired",
+    "DeliveryRetryAttempts exceeded",
+    "CIMServer shutdown"
+};
+
+Uint64 DestinationQueue::_serverStartupTimeUsec
+    = System::getCurrentTimeUsec();
+
+Boolean DestinationQueue::_initialized = false;
+static Mutex _intializeMutex;
+
+CIMInstance DestinationQueue::_getInstance(const CIMName &className)
+{
+    CIMOMHandle cimomHandle;
+    Array<CIMInstance> instances =
+        cimomHandle.enumerateInstances(
+            OperationContext(),
+            PEGASUS_NAMESPACENAME_INTEROP,
+            className,
+            true,
+            false,
+            false,
+            false,
+            CIMPropertyList());
+
+    return instances[0];
+}
+
+void DestinationQueue::_initIndicationServiceProperties()
+{
+    PEG_METHOD_ENTER(TRC_IND_HANDLER,
+        "DestinationQueue::_initIndicationServiceProperties");
+
+    CIMInstance instance =
+        _getInstance(PEGASUS_CLASSNAME_CIM_INDICATIONSERVICE);
+
+    instance.getProperty(
+        instance.findProperty(
+            _PROPERTY_DELIVERY_RETRYATTEMPTS)).getValue().get(
+                _maxDeliveryRetryAttempts);
+
+    instance.getProperty(
+        instance.findProperty(
+            PEGASUS_PROPERTYNAME_NAME)).getValue().get(
+               _indicationServiceName);
+
+    CIMValue value = instance.getProperty(
+        instance.findProperty(
+            _PROPERTY_DELIVERY_RETRYINTERVAL)).getValue();
+
+    if (value.getType() == CIMTYPE_UINT32)
+    {
+        Uint32 tval;
+        value.get(tval);
+        _minDeliveryRetryIntervalUsec = Uint64(tval) * 1000000;
+    }
+    else
+    {
+        value.get(_minDeliveryRetryIntervalUsec);
+        _minDeliveryRetryIntervalUsec*= 1000000;
+    }
+       // See DSP 1054 ver 1.1.0 Sec 7.10
+    _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
+        _minDeliveryRetryIntervalUsec * 10;
+    PEG_METHOD_EXIT();
+}
+
+void DestinationQueue::_initObjectManagerProperties()
+{
+    PEG_METHOD_ENTER(TRC_IND_HANDLER,
+        "DestinationQueue::_initObjectManagerProperties");
+
+    CIMInstance instance =
+       _getInstance(PEGASUS_CLASSNAME_PG_OBJECTMANAGER);
+
+    instance.getProperty(
+        instance.findProperty(
+            PEGASUS_PROPERTYNAME_NAME)).getValue().get(
+                _objectManagerName);
+    PEG_METHOD_EXIT();
+}
+
+DestinationQueue::DestinationQueue(
+    const CIMInstance &handler):_handler(handler)
+{
+    PEG_METHOD_ENTER(TRC_IND_HANDLER,
+        "DestinationQueue::DestinationQueue");
+
+    if (!_initialized)
+    {
+        AutoMutex mtx(_intializeMutex);
+        if (!_initialized)
+        {
+            try
+            {
+                PEG_TRACE_CSTRING(TRC_IND_HANDLER, Tracer::LEVEL4,
+                    "Initializaing the Destination Queue");
+                _initIndicationServiceProperties();
+                _initObjectManagerProperties();
+            }
+            catch(const Exception &e)
+            {
+                PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
+                    "Exception %s caught while initializing the "
+                        "DestinationQueue, using default values.",
+                    (const char*)e.getMessage().getCString()));
+            }
+            catch(...)
+            {
+                PEG_TRACE_CSTRING(TRC_IND_HANDLER,Tracer::LEVEL1,
+                    "Unknown exception caught while initializing the "
+                        "DestinationQueue, using default values.");
+            }
+            _initialized = true;
+        }
+    }
+
+    // Build the sequence context
+    _sequenceContext = _indicationServiceName;
+    _sequenceContext.append("-");
+    _sequenceContext.append(_objectManagerName);
+    _sequenceContext.append("-");
+
+    Uint32 len = 0;
+    char buffer[22];
+    const char* ptr = Uint64ToString(buffer, _serverStartupTimeUsec,len);
+    _sequenceContext.append(String(ptr, len));
+    _sequenceContext.append("-");
+
+    Uint32 idx = handler.findProperty(
+        PEGASUS_PROPERTYNAME_LSTNRDST_CREATIONTIME);
+
+    if (idx != PEG_NOT_FOUND)
+    {
+        Uint64 tvalue;
+        handler.getProperty(idx).getValue().get(tvalue);
+        Uint32 llen = 0;
+        char lbuffer[22];
+        const char* lptr = Uint64ToString(lbuffer, tvalue, llen);
+        _sequenceContext.append(String(lptr, llen));
+    }
+    else
+    {
+        _sequenceContext.append(String(ptr, len));
+    }
+    _lastDeliveryRetryStatus = FAIL;
+
+    _sequenceNumber = 0;
+    _queueFullDroppedIndications = 0;
+    _lifetimeExpiredIndications = 0;
+    _retryAttemptsExceededIndications = 0;
+    _subscriptionDeleteDroppedIndications = 0;
+    _calcMaxQueueSize = true;
+    _lastSuccessfulDeliveryTimeUsec = 0;
+    _maxIndicationDeliveryQueueSize = 2400;
+
+    _queueCreationTimeUsec = System::getCurrentTimeUsec();
+    PEG_METHOD_EXIT();
+}
+
+DestinationQueue::~DestinationQueue()
+{
+    PEG_METHOD_ENTER(TRC_IND_HANDLER,
+        "DestinationQueue::~DestinationQueue");
+
+    if (!isIdle())
+    {
+        _cleanup(LISTENER_NOT_ACTIVE);
+    }
+
+    PEG_METHOD_EXIT();
+}
+
+Boolean DestinationQueue::isIdle()
+{
+    AutoMutex mtx(_queueMutex);
+    return _queue.size() == 0 && _lastDeliveryRetryStatus != PENDING;
+}
+
+Sint64 DestinationQueue::getSequenceNumber()
+{
+    AutoMutex mtx(_queueMutex);
+
+    // Determine max queue size, See PEP 324 for the algorithm
+    if (_calcMaxQueueSize)
+    {
+        if ((System::getCurrentTimeUsec() - _queueCreationTimeUsec)
+            >= _sequenceIdentifierLifetimeUsec)
+        {
+            // (10 * DeliveryRetryInterval * DeliveryRetryAttempts) /
+            // (Number of indications arrived over
+            //   sequence-identifier-lifetime.)
+
+            _maxIndicationDeliveryQueueSize = _sequenceNumber;
+
+            if (_maxIndicationDeliveryQueueSize < 200)
+            {
+                _maxIndicationDeliveryQueueSize = 200;
+            }
+            else if (_maxIndicationDeliveryQueueSize > 2400)
+            {
+                _maxIndicationDeliveryQueueSize = 2400;
+            }
+            _calcMaxQueueSize = false;
+        }
+    }
+
+    Sint64 nextSequenceNumber = _sequenceNumber++;
+
+    if (_sequenceNumber < 0)
+    {
+        _sequenceNumber = 0;
+    }
+
+    return nextSequenceNumber;
+}
+
+void DestinationQueue::updateLastSuccessfulDeliveryTime()
+{
+    AutoMutex mtx(_queueMutex);
+    _lastSuccessfulDeliveryTimeUsec = System::getCurrentTimeUsec();
+}
+
+String DestinationQueue::_getSequenceContext(
+    const CIMInstance &indication)
+{
+    String sequenceContext;
+
+    indication.getProperty(
+        indication.findProperty(
+            _PROPERTY_SEQUENCECONTEXT)).getValue().get(sequenceContext);
+
+    return sequenceContext;
+}
+
+Sint64 DestinationQueue::_getSequenceNumber(
+    const CIMInstance &indication)
+{
+    Sint64 sequenceNumber;
+
+    indication.getProperty(
+        indication.findProperty(
+            _PROPERTY_SEQUENCENUMBER)).getValue().get(sequenceNumber);
+
+    return sequenceNumber;
+}
+
+void DestinationQueue::_traceDiscardedIndication(
+    Uint32 reasonCode, const CIMInstance &indication)
+{
+
+    PEGASUS_ASSERT(reasonCode <
+        sizeof(_indDiscardedReasons)/sizeof(const char*));
+
+    PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL3,
+        "%s, Indication with SequenceContext %s and SequenceNumber %"
+            PEGASUS_64BIT_CONVERSION_WIDTH "d is discarded",
+        _indDiscardedReasons[reasonCode],
+        (const char*)_getSequenceContext(indication).getCString(),
+        _getSequenceNumber(indication)));
+}
+
+void DestinationQueue::enqueue(IndicationInfo *info)
+{
+    PEG_METHOD_ENTER(TRC_IND_HANDLER,
+        "DestinationQueue::enqueue");
+
+    AutoMutex mtx(_queueMutex);
+    _queue.insert_back(info);
+
+    info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec();
+    info->arrivalTimeUsec = info->lastDeliveryRetryTimeUsec;
+
+    if (_queue.size() > _maxIndicationDeliveryQueueSize)
+    {
+        _queueFullDroppedIndications++;
+        IndicationInfo *temp = _queue.remove_front();
+        _traceDiscardedIndication(
+            DESTINATIONQUEUE_FULL,
+            temp->indication);
+        delete temp;
+    }
+    PEG_METHOD_EXIT();
+}
+
+void DestinationQueue::updateDeliveryRetrySuccess(IndicationInfo *info)
+{
+    PEG_METHOD_ENTER(TRC_IND_HANDLER,
+        "DestinationQueue::updateDeliveryRetrySuccess");
+
+    AutoMutex mtx(_queueMutex);
+    PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
+    _lastSuccessfulDeliveryTimeUsec = System::getCurrentTimeUsec();
+    _lastDeliveryRetryStatus = SUCCESS;
+
+    PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL4,
+        "Indication with SequenceContext %s and SequenceNumber %"
+            PEGASUS_64BIT_CONVERSION_WIDTH "d is successfully delivered",
+        (const char*)_getSequenceContext(info->indication).getCString(),
+        _getSequenceNumber(info->indication)));
+
+    delete info;
+
+    PEG_METHOD_EXIT();
+}
+
+void DestinationQueue::updateDeliveryRetryFailure(
+    IndicationInfo *info,
+    const CIMException &e)
+{
+    PEG_METHOD_ENTER(TRC_IND_HANDLER,
+        "DestinationQueue::updateDeliveryRetryFailure");
+
+    AutoMutex mtx(_queueMutex);
+
+    PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
+    _lastDeliveryRetryStatus = FAIL;
+    info->deliveryRetryAttemptsMade++;
+
+    if (info->deliveryRetryAttemptsMade >= _maxDeliveryRetryAttempts)
+    {
+        _retryAttemptsExceededIndications++;
+        _traceDiscardedIndication(
+            DRA_EXCEEDED,
+            info->indication);
+        delete info;
+        PEG_METHOD_EXIT();
+        return;
+    }
+
+    if (_queue.size() == _maxIndicationDeliveryQueueSize)
+    {
+        _queueFullDroppedIndications++;
+        _traceDiscardedIndication(
+            DESTINATIONQUEUE_FULL,
+            info->indication);
+        PEG_METHOD_EXIT();
+        delete info;
+    }
+    else
+    {
+        info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec();
+        _queue.insert_back(info);
+    }
+
+    PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL4,
+        "Delivery failure for indication with SequenceContext %s and "
+            "SequenceNumber %" PEGASUS_64BIT_CONVERSION_WIDTH "d."
+            " DeliveryRetryAttempts made %u. Exception : %s",
+        (const char*)_getSequenceContext(info->indication).getCString(),
+        _getSequenceNumber(info->indication),
+        info->deliveryRetryAttemptsMade,
+        (const char*)e.getMessage().getCString()));
+
+    PEG_METHOD_EXIT();
+}
+
+void DestinationQueue::_waitForNonPendingDeliveryStatus()
+{
+    PEG_METHOD_ENTER(TRC_IND_HANDLER,
+        "DestinationQueue::_waitForNonPendingDeliveryStatus");
+
+    while (true)
+    {
+        {
+            AutoMutex mtx(_queueMutex);
+            if (_lastDeliveryRetryStatus != PENDING)
+            {
+                break;
+            }
+        }
+        Threads::yield();
+        Threads::sleep(50);
+    }
+    PEG_METHOD_EXIT();
+}
+
+void DestinationQueue::deleteMatchedIndications(
+    const CIMObjectPath &subscriptionPath)
+{
+    PEG_METHOD_ENTER(TRC_IND_HANDLER,
+        "DestinationQueue::deleteMatchedIndications");
+
+    _waitForNonPendingDeliveryStatus();
+
+    IndicationInfo *info;
+    AutoMutex mtx(_queueMutex);
+
+    for(Uint32 i = 0, n = _queue.size(); i < n; ++i)
+    {
+        info = _queue.remove_front();
+        if (info->subscription.getPath().identical(subscriptionPath))
+        {
+            _subscriptionDeleteDroppedIndications++;
+            _traceDiscardedIndication(
+                SUBSCRIPTION_NOT_ACTIVE,
+                info->indication);
+            delete info;
+        }
+        else
+        {
+            _queue.insert_back(info);
+        }
+    }
+    PEG_METHOD_EXIT();
+}
+
+void DestinationQueue::cleanup()
+{
+    PEG_METHOD_ENTER(TRC_IND_HANDLER,
+        "DestinationQueue::cleanup");
+
+    _cleanup(LISTENER_NOT_ACTIVE);
+
+    PEG_METHOD_EXIT();
+}
+
+void DestinationQueue::shutdown()
+{
+    PEG_METHOD_ENTER(TRC_IND_HANDLER,
+        "DestinationQueue::shutdown");
+
+    _cleanup(CIMSERVER_SHUTDOWN);
+
+    PEG_METHOD_EXIT();
+}
+
+void DestinationQueue::_cleanup(int reasonCode)
+{
+    _waitForNonPendingDeliveryStatus();
+
+    IndicationInfo *info;
+    while ((info = _queue.remove_front()))
+    {
+        _traceDiscardedIndication(
+            reasonCode,
+            info->indication);
+        delete info;
+    }
+}
+
+IndicationInfo* DestinationQueue::getNextIndicationForDelivery(
+    Uint64 &timeNowUsec, Uint64 &nextIndDRIExpTimeUsec)
+{
+    AutoMutex mtx(_queueMutex);
+
+    nextIndDRIExpTimeUsec = 0;
+
+    if (!_queue.size() || _lastDeliveryRetryStatus == PENDING)
+    {
+        return 0;
+    }
+
+    IndicationInfo *info;
+
+    while (_queue.size())
+    {
+        info = _queue.front();
+
+        if (timeNowUsec < info->arrivalTimeUsec ||
+            timeNowUsec < info->lastDeliveryRetryTimeUsec)
+        {
+            timeNowUsec = System::getCurrentTimeUsec();
+        }
+
+        if ((timeNowUsec - info->arrivalTimeUsec) >=
+            _sequenceIdentifierLifetimeUsec)
+        {
+            _lifetimeExpiredIndications++;
+            IndicationInfo *temp = _queue.remove_front();
+            _traceDiscardedIndication(
+                SIL_EXPIRED,
+                temp->indication);
+            delete temp;
+        }
+        else if ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
+            >= _minDeliveryRetryIntervalUsec)
+        {
+            _lastDeliveryRetryStatus = PENDING;
+            _queue.remove_front();
+            IndicationInfo *temp = _queue.front();
+
+            if (temp)
+            {
+                if (timeNowUsec - temp->lastDeliveryRetryTimeUsec
+                        < _minDeliveryRetryIntervalUsec)
+                {
+                    nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
+                        (timeNowUsec - temp->lastDeliveryRetryTimeUsec);
+                }
+
+                PEGASUS_ASSERT(nextIndDRIExpTimeUsec
+                    <= _minDeliveryRetryIntervalUsec);
+            }
+
+            return info;
+        }
+        else
+        {
+            if (timeNowUsec - info->lastDeliveryRetryTimeUsec
+                    < _minDeliveryRetryIntervalUsec)
+            {
+                nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
+                    (timeNowUsec - info->lastDeliveryRetryTimeUsec);
+            }
+
+            PEGASUS_ASSERT(nextIndDRIExpTimeUsec
+                <= _minDeliveryRetryIntervalUsec);
+
+            break;
+        }
+    }
+
+    return 0;
+}
+
+void DestinationQueue::getInfo(QueueInfo &qinfo)
+{
+    AutoMutex mtx(_queueMutex);
+
+    qinfo.handlerName = _handler.getPath();
+    qinfo.queueCreationTimeUsec = _queueCreationTimeUsec;
+    qinfo.sequenceContext = _sequenceContext;
+    qinfo.nextSequenceNumber = _sequenceNumber;
+    qinfo.maxQueueLength = _maxIndicationDeliveryQueueSize;
+    qinfo.sequenceIdentifierLifetimeSeconds =
+        _sequenceIdentifierLifetimeUsec / 1000000;
+    qinfo.size = _queue.size();
+    qinfo.queueFullDroppedIndications = _queueFullDroppedIndications;
+    qinfo.lifetimeExpiredIndications = _lifetimeExpiredIndications;
+    qinfo.retryAttemptsExceededIndications = _retryAttemptsExceededIndications;
+    qinfo.subscriptionDisableDroppedIndications =
+        _subscriptionDeleteDroppedIndications;
+    qinfo.lastSuccessfulDeliveryTimeUsec = _lastSuccessfulDeliveryTimeUsec;
+}
+
+PEGASUS_NAMESPACE_END
+