f069e6cf42e2d70718e1e7a841f3fac5abb7f3cc
[tpot/pegasus/.git] / src / Pegasus / Common / Thread.cpp
1 //%/////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 // 
13 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
14 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
16 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
19 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 //==============================================================================
23 //
24 // Author: Mike Day (mdday@us.ibm.com)
25 //
26 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
27 //              added nsk platform support
28 //
29 //%/////////////////////////////////////////////////////////////////////////////
30
31 #include "Thread.h"
32 #include <Pegasus/Common/IPC.h>
33 #include <Pegasus/Common/Tracer.h>
34
35 #if defined(PEGASUS_OS_TYPE_WINDOWS)
36 # include "ThreadWindows.cpp"
37 #elif defined(PEGASUS_OS_TYPE_UNIX)
38 # include "ThreadUnix.cpp"
39 #elif defined(PEGASUS_OS_TYPE_NSK)
40 # include "ThreadNsk.cpp"
41 #else
42 # error "Unsupported platform"
43 #endif
44
45 PEGASUS_NAMESPACE_BEGIN
46
47
48 void thread_data::default_delete(void * data)
49 {
50    if( data != NULL)
51       ::operator delete(data);
52 }
53
54 // l10n start
55 void language_delete(void * data)
56 {
57    if( data != NULL)
58    {
59       AcceptLanguages * al = static_cast<AcceptLanguages *>(data);
60       delete al;
61    }
62 }
63 // l10n end
64
// Definitions of Thread's static data members.

// Not referenced by the code visible in this file.
Boolean Thread::_signals_blocked = false;
// l10n
// Platform thread-specific-storage key; created by initializeKey() and used
// by getCurrent()/setCurrent() to store the calling thread's Thread pointer.
PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = -1;
// Set to true once initializeKey() has created the key successfully.
Boolean Thread::_key_initialized = false;
// Set to true when key creation failed; later initializeKey() calls then
// fail immediately instead of retrying.
Boolean Thread::_key_error = false;
70
71
72 // for non-native implementations
73 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
74 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
75 {
76     cleanup_handler *cu = new cleanup_handler(routine, parm);
77     try
78     {
79         _cleanup.insert_first(cu);
80     }
81     catch(IPCException&)
82     {
83         delete cu;
84         throw;
85     }
86     return;
87 }
88         
89 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
90 {
91     cleanup_handler *cu ;
92     try
93     {
94         cu = _cleanup.remove_first() ;
95     }
96     catch(IPCException&)
97     {
98         PEGASUS_ASSERT(0);
99      }
100     if(execute == true)
101         cu->execute();
102     delete cu;
103 }
104                 
105 #endif
106
107
108 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
109
110
111 #ifndef PEGASUS_THREAD_EXIT_NATIVE
112 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
113 {
114     // execute the cleanup stack and then return
115    while( _cleanup.count() )
116    {
117        try
118        {
119            cleanup_pop(true);
120        }
121        catch(IPCException&)
122        {
123           PEGASUS_ASSERT(0);
124           break;
125        }
126    }
127    _exit_code = exit_code;
128    exit_thread(exit_code);
129    _handle.thid = 0;
130 }
131
132
133 #endif
134
135 // l10n start
136 Sint8 Thread::initializeKey()
137 {
138    PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
139    if (!Thread::_key_initialized)
140    {
141         if (Thread::_key_error)
142         {
143                 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
144                           "Thread: ERROR - thread key error"); 
145                 return -1;
146         }
147
148         if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
149         {
150                 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
151                           "Thread: able to create a thread key");   
152                 Thread::_key_initialized = true;        
153         }
154         else
155         {
156                 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157                           "Thread: ERROR - unable to create a thread key"); 
158                 Thread::_key_error = true;
159                 return -1;
160         }
161    }
162
163    PEG_METHOD_EXIT();
164    return 0;  
165 }
166
167 Thread * Thread::getCurrent()
168 {
169     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent"); 
170     if (Thread::initializeKey() != 0)
171     {
172         return NULL;  
173     }
174     PEG_METHOD_EXIT();  
175     return (Thread *)pegasus_get_thread_specific(_platform_thread_key); 
176 }
177
178 void Thread::setCurrent(Thread * thrd)
179 {
180    PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
181    if (Thread::initializeKey() == 0)
182    {
183         if (pegasus_set_thread_specific(Thread::_platform_thread_key,
184                                                                  (void *) thrd) == 0)
185         {
186                 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
187                           "Successful set Thread * into thread specific storage");   
188         }
189         else
190         {
191                 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
192                           "ERROR: got error setting Thread * into thread specific storage");   
193         }
194    }
195    PEG_METHOD_EXIT();  
196 }
197
198 AcceptLanguages * Thread::getLanguages()
199 {
200     PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");               
201     
202         Thread * curThrd = Thread::getCurrent();
203         if (curThrd == NULL)
204                 return NULL;
205         AcceptLanguages * acceptLangs =
206                  (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
207         curThrd->dereference_tsd();
208     PEG_METHOD_EXIT();  
209         return acceptLangs;
210 }
211
212 void Thread::setLanguages(AcceptLanguages *langs) //l10n
213 {
214    PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
215                 
216    Thread * currentThrd = Thread::getCurrent();
217    if (currentThrd != NULL)
218    {
219                 // deletes the old tsd and creates a new one
220                 currentThrd->put_tsd("acceptLanguages",
221                         language_delete, 
222                         sizeof(AcceptLanguages *),
223                         langs);                 
224    }
225    
226    PEG_METHOD_EXIT();                   
227 }
228
229 void Thread::clearLanguages() //l10n
230 {
231    PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
232         
233    Thread * currentThrd = Thread::getCurrent();
234    if (currentThrd != NULL)
235    {
236                 // deletes the old tsd
237                 currentThrd->delete_tsd("acceptLanguages");             
238    }
239    
240    PEG_METHOD_EXIT();                   
241 }
242 // l10n end      
243
244
245
246 // two special synchronization classes for ThreadPool
247 // 
248
249 class timed_mutex 
250 {
251    public:
252       timed_mutex(Mutex* mut, int msec)
253          :_mut(mut)
254       {
255          _mut->timed_lock(msec, pegasus_thread_self());
256       }
257       ~timed_mutex(void)
258       {
259          _mut->unlock();
260       }
261       Mutex* _mut;
262 };
263
264
265 class try_mutex
266 {
267    public:
268       try_mutex(Mutex* mut)
269          :_mut(mut)
270       {
271          _mut->try_lock(pegasus_thread_self());
272       }
273       ~try_mutex(void)
274       {
275          _mut->unlock();
276       }
277       
278       Mutex* _mut;
279 };
280
281
// List of all ThreadPool instances: the ThreadPool constructor appends
// itself, the destructor removes itself, and kill_idle_threads() walks the
// list. (The meaning of the `true` constructor argument is defined by DQueue
// and is not visible in this file.)
DQueue<ThreadPool> ThreadPool::_pools(true);
283
284
285 void ThreadPool::kill_idle_threads(void)
286 {
287    static struct timeval now, last = {0, 0};
288    
289    pegasus_gettimeofday(&now);
290    if(now.tv_sec - last.tv_sec > 5)
291    {
292       _pools.lock();
293       ThreadPool *p = _pools.next(0);
294       while(p != 0)
295       {
296          try 
297          {
298             p->kill_dead_threads();
299          }
300          catch(...)
301          {
302          }
303          p = _pools.next(p);
304       }
305       _pools.unlock();
306       pegasus_gettimeofday(&last);
307    }
308 }
309
310
311 ThreadPool::ThreadPool(Sint16 initial_size,
312                        const Sint8 *key,
313                        Sint16 min,
314                        Sint16 max,
315                        struct timeval & alloc_wait,
316                        struct timeval & dealloc_wait,
317                        struct timeval & deadlock_detect)
318    : _max_threads(max), _min_threads(min),
319      _current_threads(0),
320      _pool(true), _running(true),
321      _dead(true), _dying(0)
322 {
323    _allocate_wait.tv_sec = alloc_wait.tv_sec;
324    _allocate_wait.tv_usec = alloc_wait.tv_usec;
325    _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
326    _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
327    _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
328    _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
329    memset(_key, 0x00, 17);
330    if(key != 0)
331       strncpy(_key, key, 16);
332    if(_max_threads > 0 && _max_threads < initial_size)
333       _max_threads = initial_size;
334    if(_min_threads > initial_size)
335       _min_threads = initial_size;
336
337    int i;
338    for(i = 0; i < initial_size; i++)
339    {
340       _link_pool(_init_thread());
341    }
342    _pools.insert_last(this);
343 }
344
345
// Note:   <<< Fri Oct 17 09:19:03 2003 mdd >>>
// the pegasus_yield() call that precedes th->join() for threads taken from
// the running list gives such a thread a chance to reach a cancellation
// point before the join
350
// Shuts the pool down: marks it as dying, unregisters it from the global
// pool list, then drains the idle (_pool), dead (_dead) and running
// (_running) lists in that order. Each thread taken from a list has its
// "sleep sem" signalled twice, is cancelled, joined and deleted.
//
// Any exception raised along the way is swallowed by the outer catch(...),
// which also ends the shutdown early; threads still on the lists at that
// point are not joined or deleted here.
ThreadPool::~ThreadPool(void)
{
   try 
   {      
      // set the dying flag so all thread know the destructor has been entered
      {
         // NOTE(review): this statement creates an unnamed temporary, so the
         // guard is destroyed at the end of the statement and is no longer
         // alive when _dying is incremented - confirm whether a named guard
         // was intended.
         auto_mutex(&(this->_monitor));
         _dying++;
      }
      // remove from the global pools list 
      _pools.remove(this);

      // start with idle threads. 
      Thread *th = 0;
      th = _pool.remove_first();
      Semaphore* sleep_sem;
      
      while(th != 0)
      {
         sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
         if(sleep_sem == 0)
         {
            // th has already been removed from its list and is not deleted
            // on this path
            th->dereference_tsd();
            throw NullPointer();
         }
         
         // Signal to get the thread out of the work loop.
         sleep_sem->signal();

         // Signal to get the thread past the end. See the comment
         // "wait to be awakend by the thread pool destructor"
         // Note: the current implementation of Thread for Windows
         // does not implement "pthread" cancelation points so this
         // is needed.
         sleep_sem->signal();
         th->dereference_tsd();
         th->cancel();
         th->join();
         delete th;
         th = _pool.remove_first();
      }
      // next, threads that were already handed to the undertaker
      th = _dead.remove_first();
      while(th != 0)
      {
         sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");

         if(sleep_sem == 0)
         {
            th->dereference_tsd();
            throw NullPointer();
         }
         
         // signal the thread's sleep semaphore
         sleep_sem->signal();
         sleep_sem->signal();
         th->dereference_tsd();  
         th->cancel();
         th->join();
         delete th;
         th = _dead.remove_first();
      }

      // finally, threads that are currently executing work
      {
         th = _running.remove_first();
         while(th != 0)
         {       
            // signal the thread's sleep semaphore

            sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
            if(sleep_sem == 0 )
            {
               th->dereference_tsd();
               throw NullPointer();
            }
            
            sleep_sem->signal();
            sleep_sem->signal();
            th->dereference_tsd();
            th->cancel();
            // let the running thread reach a cancellation point before the
            // join below
            pegasus_yield();
            
            th->join();
            delete th;
            th = _running.remove_first();
         }
      }
      
   }
   
   catch(...)
   {
      // deliberately empty: nothing may propagate out of the destructor
   }
}
444
445 // make this static to the class
446 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
447 {
448    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
449
450    Thread *myself = (Thread *)parm;
451    if(myself == 0)
452    {
453       PEG_METHOD_EXIT();
454       throw NullPointer();
455    }
456    
457 // l10n
458    // Set myself into thread specific storage
459    // This will allow code to get its own Thread
460    Thread::setCurrent(myself);  
461
462    ThreadPool *pool = (ThreadPool *)myself->get_parm();
463    if(pool == 0 ) 
464    {
465       PEG_METHOD_EXIT();
466       throw NullPointer();
467    }
468    if(pool->_dying.value())
469    {
470       PEG_METHOD_EXIT();
471       return((PEGASUS_THREAD_RETURN)0);
472    }
473    
474    Semaphore *sleep_sem = 0;
475    Semaphore *blocking_sem = 0;
476    
477    struct timeval *deadlock_timer = 0;
478    
479    try
480    {
481       sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
482       myself->dereference_tsd();
483       deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
484       myself->dereference_tsd(); 
485    }
486
487    catch(...)
488    {
489       PEG_METHOD_EXIT();
490       return((PEGASUS_THREAD_RETURN)0);
491    }
492    
493    if(sleep_sem == 0 || deadlock_timer == 0)
494    {
495       PEG_METHOD_EXIT();
496       return((PEGASUS_THREAD_RETURN)0);
497    }
498
499    while(1)
500    {
501       try 
502       {
503          if(pool->_dying.value())
504             break;
505       }
506       catch(...)
507       {
508          return((PEGASUS_THREAD_RETURN)0);
509       }
510       
511       try 
512       {
513          sleep_sem->wait();
514       }
515       catch(IPCException& )
516       {
517          PEG_METHOD_EXIT();
518          return((PEGASUS_THREAD_RETURN)0);
519       }
520       
521       // when we awaken we reside on the running queue, not the pool queue
522       if(pool->_dying.value())
523       {
524          PEG_METHOD_EXIT();
525          return((PEGASUS_THREAD_RETURN)0);
526       }
527       
528       
529       PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
530       void *parm = 0;
531
532       try
533       {
534          _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
535             myself->reference_tsd("work func");
536          myself->dereference_tsd();
537          parm = myself->reference_tsd("work parm");
538          myself->dereference_tsd();
539          blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
540          myself->dereference_tsd();
541
542       }
543       catch(IPCException &)
544       {
545          PEG_METHOD_EXIT();
546          return((PEGASUS_THREAD_RETURN)0);
547       }
548       
549       if(_work == 0)
550       {
551          PEG_METHOD_EXIT();
552          throw NullPointer();
553       }
554
555       if(_work ==
556          (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
557       {
558          _work(parm);
559       }
560
561       gettimeofday(deadlock_timer, NULL);
562       try 
563       {
564          {
565             timed_mutex(&(pool->_monitor), 1000);
566             if(pool->_dying.value())
567             {
568                _undertaker(parm);
569             }
570          }
571          _work(parm);
572       }
573       catch(...)
574       {
575          return((PEGASUS_THREAD_RETURN)0);
576       }
577       
578       // put myself back onto the available list
579       try
580       {
581          timed_mutex(&(pool->_monitor), 1000);
582          if(pool->_dying.value() == 0)
583          {
584             gettimeofday(deadlock_timer, NULL);
585             if( blocking_sem != 0 )
586                blocking_sem->signal();
587       
588             // If we are not on _running then ~ThreadPool has removed
589             // us and now "owns" our pointer.
590             if( pool->_running.remove((void *)myself) != 0 )
591                 pool->_pool.insert_first(myself);
592             else 
593             {       
594                return((PEGASUS_THREAD_RETURN)0);
595             }
596             
597          }
598          else
599          {
600             PEG_METHOD_EXIT();
601             return((PEGASUS_THREAD_RETURN)0);
602          }
603       }
604       catch(...)
605       {
606          PEG_METHOD_EXIT();
607          return((PEGASUS_THREAD_RETURN)0);
608       }
609       
610    }
611
612    // TODO: Why is this needed? Why not just continue?
613    // wait to be awakend by the thread pool destructor
614    //sleep_sem->wait();
615
616    myself->test_cancel();
617
618    PEG_METHOD_EXIT();
619    return((PEGASUS_THREAD_RETURN)0);
620 }
621
622 void ThreadPool::allocate_and_awaken(void *parm,
623                                      PEGASUS_THREAD_RETURN \
624                                      (PEGASUS_THREAD_CDECL *work)(void *), 
625                                      Semaphore *blocking)
626
627    throw(IPCException)
628 {
629    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
630    struct timeval start;
631    gettimeofday(&start, NULL);
632    Thread *th = 0;
633    
634    try 
635    {
636       timed_mutex(&(this->_monitor), 1000);
637       if(_dying.value())
638       {
639          return;
640       }
641       th = _pool.remove_first();
642    }
643    catch(...)
644    {
645       return;
646       
647    }
648    
649    
650    // wait for the right interval and try again
651    while (th == 0 && _dying.value() < 1)
652    {
653       // will throw an IPCException& 
654       _check_deadlock(&start) ;
655       
656       if(_max_threads == 0 || _current_threads < _max_threads)
657       {
658          th = _init_thread();
659          continue;
660       }
661       pegasus_yield();
662       try
663       {
664          timed_mutex(&(this->_monitor), 1000);
665          if(_dying.value())
666          {
667             return;
668          }
669          th = _pool.remove_first();
670       }
671       catch(...)
672       {
673          return ;
674       }
675    }
676
677    if(_dying.value() < 1)
678    {
679       // initialize the thread data with the work function and parameters
680       Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
681           "Initializing thread with work function and parameters: parm = %p",
682           parm);
683
684       th->delete_tsd("work func");
685       th->put_tsd("work func", NULL,
686                   sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
687                   (void *)work);
688       th->delete_tsd("work parm");
689       th->put_tsd("work parm", NULL, sizeof(void *), parm);
690       th->delete_tsd("blocking sem");
691       if(blocking != 0 )
692          th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
693       try 
694       {
695          timed_mutex(&(this->_monitor), 1000);
696          if(_dying.value())
697          {
698             th->cancel();
699             th->join();
700             delete th;
701             return;
702          }
703          
704          // put the thread on the running list
705
706
707          _running.insert_first(th);
708       // signal the thread's sleep semaphore to awaken it
709          Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
710          
711          if(sleep_sem == 0)
712          {
713             th->dereference_tsd();
714             PEG_METHOD_EXIT();
715             throw NullPointer();
716          }
717          Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
718          sleep_sem->signal();
719          th->dereference_tsd();
720       }
721       catch(...)
722       {
723          PEG_METHOD_EXIT();
724          return;
725       }
726       
727    }
728    else
729    {
730       th->cancel();
731       th->join();
732       delete th;
733    }
734    
735    PEG_METHOD_EXIT();
736 }
737
738 // caller is responsible for only calling this routine during slack periods
739 // but should call it at least once per _deadlock_detect with the running q
740 // and at least once per _deallocate_wait for the pool q
741
// Reaps threads that are no longer needed.
//
// Phase 1: every thread on the _dead list is joined and deleted (exceptions
//          in this phase are swallowed).
// Phase 2: the idle list (_pool, index 0) and - unless
//          PEGASUS_DISABLE_KILLING_HUNG_THREADS is defined - the running
//          list (_running, index 1) are scanned. A thread whose
//          "deadlock timer" is older than the deallocate-wait interval
//          (idle list) or the deadlock-detect interval (running list) is
//          removed from its list:
//            * idle threads get _undertaker installed as their work
//              function, are moved to _dead and are woken up;
//            * running threads are cancelled and deleted.
//          Idle threads are kept when removing one would drop the pool
//          below _min_threads; for running threads a replacement is
//          scheduled instead.
// Phase 3: the scheduled replacement threads are created and linked into
//          the idle list.
//
// Returns the number of idle threads handed to the undertaker. Returns 0
// immediately when the pool is already dying, and returns the count so far
// when a lock cannot be obtained or the pool starts dying mid-scan.
Uint32 ThreadPool::kill_dead_threads(void)
         throw(IPCException)
{
   struct timeval now;
   gettimeofday(&now, NULL);
   Uint32 bodies = 0;
   
   // first go thread the dead q and clean it up as much as possible
   try 
   {
      // NOTE(review): unnamed temporary - the monitor is released at the end
      // of this statement, so it does not guard the code that follows.
      timed_mutex(&(this->_monitor), 1000);
      if(_dying.value() )
      {
         return 0;
      }
      
      while(_dead.count() > 0 && _dying.value() == 0 )
      {
         Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
         Thread *dead = _dead.remove_first();
         
         if(dead == 0)
            throw NullPointer();
         dead->join();
         delete dead;
      }
   }
   catch(...)
   {
      // best effort: whatever is left on _dead stays there for the next call
   }
   
   
   // index 0: idle threads, index 1: running threads
   DQueue<Thread> * map[2] =
      {
         &_pool, &_running
      };


   DQueue<Thread> *q = 0;
   int i = 0;
   // number of replacement threads to create at the end
   AtomicInt needed(0);

#ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
   // This change prevents the thread pool from killing "hung" threads.
   // The definition of a "hung" thread is one that has been on the run queue
   // for longer than the time interval set when the thread pool was created.
   // Cancelling "hung" threads has proven to be problematic.

   // With this change the thread pool will not cancel "hung" threads.  This
   // may prevent a crash depending upon the state of the "hung" thread.  In
   // the case that the thread is actually hung, this change causes the
   // thread resources not to be reclaimed.

   // Idle threads, those that have not executed a routine for a time
   // interval, continue to be destroyed.  This is normal and should not
   // cause any problems.
   for( ; i < 1; i++)
#else
   for( ; i < 2; i++)
#endif
   {
      try 
      {
         // NOTE(review): unnamed temporary - this only probes whether the
         // monitor can be taken right now; it is released immediately.
         try_mutex(&(this->_monitor)); 
      }
      catch(IPCException&)
      {
         return bodies;
      }
      
      q = map[i];
      if(q->count() > 0 )
      {
         try
         {
            if(_dying.value())
            {
               return bodies;
            }
            
            // the queue stays locked for the whole scan below
            q->try_lock();
         }
         catch(...)
         {
            return bodies;
         }

         struct timeval dt = { 0, 0 };
         struct timeval *dtp;
         Thread *th = 0;
         th = q->next(th);
         while (th != 0 )
         {
            try
            {
               dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
            }
            catch(...)
            {
               q->unlock();
               return bodies;
            }
        
            // when the entry is missing, dt keeps the value from the
            // previous iteration (or {0, 0} on the first one)
            if(dtp != 0)
            {
               memcpy(&dt, dtp, sizeof(struct timeval));
            }
            th->dereference_tsd();
            struct timeval deadlock_timeout;
            Boolean too_long;
            if( i == 0)
            {
               too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
            }
            else 
            {
               too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
            }
            
            if( true == too_long)
            {
               // if we are deallocating from the pool, escape if we are
               // down to the minimum thread count
               _current_threads--;
               if( _current_threads.value() < (Uint32)_min_threads )
               {
                  if( i == 0)
                  {
                     // keep this idle thread: undo the decrement and move on
                     _current_threads++;
                     th = q->next(th);
                     continue;
                  }
                  else
                  {
                     // we are killing a hung thread and we will drop below the
                     // minimum. create another thread to make up for the one
                     // we are about to kill
                     needed++;
                  }
               }
        
               th = q->remove_no_lock((void *)th);
        
               if(th != 0)
               {
                  if( i == 0 )
                  {
                     // idle thread: make it run _undertaker on itself
                     th->delete_tsd("work func");
                     th->put_tsd("work func", NULL,
                                 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
                                 (void *)&_undertaker);
                     th->delete_tsd("work parm");
                     th->put_tsd("work parm", NULL, sizeof(void *), th);
                     
                     // signal the thread's sleep semaphore to awaken it
                     Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
                     
                     if(sleep_sem == 0)
                     {
                        // NOTE(review): th is already off its queue and is
                        // neither deleted nor placed on _dead on this path.
                        q->unlock();
                        th->dereference_tsd();
                        throw NullPointer();
                     }
                     
                     bodies++;
                     th->dereference_tsd();
                     _dead.insert_first(th);
                     sleep_sem->signal();
                     // th is cleared, so the q->next(th) below is called
                     // with 0, as at the start of the scan
                     th = 0;
                  }
                  else 
                  {
                     // deadlocked threads
                     Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Killing a deadlocked thread");
                     th->cancel();
                     // NOTE(review): th is deleted here and then passed to
                     // q->next(th) below as a dangling pointer.
                     delete th;
                  }
               }
            }
            th = q->next(th);
            pegasus_yield();
         }
         q->unlock();
      }
   }
   if(_dying.value() )
      return bodies;
   
   // replace hung threads that were killed below the minimum thread count
   while (needed.value() > 0)   {
      _link_pool(_init_thread());
      needed--;
      pegasus_sleep(0);
   }
    return bodies; 
}
937
938
939 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
940 {
941    // never time out if the interval is zero
942    if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
943       return false;
944    
945    struct timeval now = {0,0}, finish = {0,0}, remaining = {0,0};
946    Uint32 usec;
947    pegasus_gettimeofday(&now);
948    /* remove valgrind error */
949    pegasus_gettimeofday(&remaining);
950    
951
952    finish.tv_sec = start->tv_sec + interval->tv_sec;
953    usec = start->tv_usec + interval->tv_usec;
954    finish.tv_sec += (usec / 1000000);
955    usec %= 1000000;
956    finish.tv_usec = usec;
957     
958    if ( timeval_subtract(&remaining, &finish, &now) )
959       return true;
960    else
961       return false;
962 }
963
964 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
965 {
966    exit_thread((PEGASUS_THREAD_RETURN)1);
967    return (PEGASUS_THREAD_RETURN)1;
968 }
969
970
971  void ThreadPool::_sleep_sem_del(void *p)
972 {
973    if(p != 0)
974    {
975       delete (Semaphore *)p;
976    }
977 }
978
979  void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
980 {
981    if (true == check_time(start, &_deadlock_detect))
982       throw Deadlock(pegasus_thread_self());
983    return;
984 }
985
986
987  Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
988 {
989    return(check_time(start, &_deadlock_detect));
990 }
991
992  Boolean ThreadPool::_check_dealloc(struct timeval *start)
993 {
994    return(check_time(start, &_deallocate_wait));
995 }
996
997  Thread *ThreadPool::_init_thread(void) throw(IPCException)
998 {
999    Thread *th = (Thread *) new Thread(_loop, this, false);
1000    // allocate a sleep semaphore and pass it in the thread context
1001    // initial count is zero, loop function will sleep until
1002    // we signal the semaphore
1003    Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
1004    th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
1005    
1006    struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
1007    pegasus_gettimeofday(dldt);
1008    
1009    th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
1010    // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
1011   
1012    th->run();
1013    _current_threads++;
1014    pegasus_yield();
1015    
1016    return th;
1017 }
1018
1019  void ThreadPool::_link_pool(Thread *th) throw(IPCException)
1020 {
1021    if(th == 0)
1022       throw NullPointer();
1023    try 
1024    {
1025       
1026       timed_mutex(&(this->_monitor), 1000);
1027       if(_dying.value())
1028       {
1029          th->cancel();
1030          th->join();
1031          delete th;
1032       }
1033       
1034       _pool.insert_first(th);
1035       
1036    }
1037    catch(...)
1038    {
1039    }
1040 }
1041
1042
1043 PEGASUS_NAMESPACE_END
1044