1 //%/////////////////////////////////////////////////////////////////////////////
3 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
14 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
16 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
19 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 //==============================================================================
24 // Author: Mike Day (mdday@us.ibm.com)
26 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
27 // added nsk platform support
29 //%/////////////////////////////////////////////////////////////////////////////
32 #include <Pegasus/Common/IPC.h>
33 #include <Pegasus/Common/Tracer.h>
35 #if defined(PEGASUS_OS_TYPE_WINDOWS)
36 # include "ThreadWindows.cpp"
37 #elif defined(PEGASUS_OS_TYPE_UNIX)
38 # include "ThreadUnix.cpp"
39 #elif defined(PEGASUS_OS_TYPE_NSK)
40 # include "ThreadNsk.cpp"
42 # error "Unsupported platform"
45 PEGASUS_NAMESPACE_BEGIN
// Default delete routine for thread-specific data: hands the raw storage
// behind `data` back to the global ::operator delete. No destructor is run
// by this call, so it suits storage obtained from ::operator new.
48 void thread_data::default_delete(void * data)
51 ::operator delete(data);
// Delete routine for an AcceptLanguages value held behind an opaque pointer.
// NOTE(review): only the cast back to AcceptLanguages* is visible here; the
// release of `al` is presumably what follows -- confirm.
55 void language_delete(void * data)
59 AcceptLanguages * al = static_cast<AcceptLanguages *>(data);
// Definitions of the Thread class's static members.
65 Boolean Thread::_signals_blocked = false;
// Platform thread-specific-storage key; filled in by initializeKey() through
// pegasus_key_create().
67 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = -1;
// Set to true by initializeKey() once the key has been created.
68 Boolean Thread::_key_initialized = false;
// Set to true by initializeKey() when creating the key failed.
69 Boolean Thread::_key_error = false;
72 // for non-native implementations
73 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
// Wraps (routine, parm) in a newly allocated cleanup_handler and inserts it
// at the front of this thread's _cleanup list.
// Declared as possibly throwing IPCException.
74 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
76 cleanup_handler *cu = new cleanup_handler(routine, parm);
79 _cleanup.insert_first(cu);
// Removes the handler at the front of this thread's _cleanup list.
// NOTE(review): `execute` presumably selects whether the removed handler is
// run before it is discarded -- confirm.
// Declared as possibly throwing IPCException.
89 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
94 cu = _cleanup.remove_first() ;
108 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
111 #ifndef PEGASUS_THREAD_EXIT_NATIVE
// Ends the calling thread: loops while the _cleanup list is non-empty,
// records exit_code in _exit_code, then calls exit_thread(exit_code).
112 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
114 // execute the cleanup stack and then return
115 while( _cleanup.count() )
127 _exit_code = exit_code;
128 exit_thread(exit_code);
// Creates the platform thread-specific-storage key the first time it is
// needed. While the key is not yet initialized, a previously recorded
// failure (_key_error) is traced; otherwise pegasus_key_create() is tried and
// its outcome is stored in _key_initialized or _key_error.
// Callers in this file treat a return value of 0 as success.
136 Sint8 Thread::initializeKey()
138 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
139 if (!Thread::_key_initialized)
// An earlier attempt to create the key already failed.
141 if (Thread::_key_error)
143 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
144 "Thread: ERROR - thread key error");
// pegasus_key_create() returning 0 is treated as success.
148 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
150 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
151 "Thread: able to create a thread key");
152 Thread::_key_initialized = true;
156 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157 "Thread: ERROR - unable to create a thread key");
// Remember the failure so that later calls can report it.
158 Thread::_key_error = true;
// Returns the Thread* stored in thread-specific storage under
// _platform_thread_key for the calling thread. initializeKey() is called
// first; a non-zero result from it is handled on a separate path.
167 Thread * Thread::getCurrent()
169 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
170 if (Thread::initializeKey() != 0)
175 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
// Stores `thrd` in thread-specific storage under _platform_thread_key so
// that getCurrent() can retrieve it later. The store is attempted only when
// initializeKey() returns 0; success and failure are both traced.
178 void Thread::setCurrent(Thread * thrd)
180 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
181 if (Thread::initializeKey() == 0)
183 if (pegasus_set_thread_specific(Thread::_platform_thread_key,
186 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
187 "Successful set Thread * into thread specific storage");
191 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
192 "ERROR: got error setting Thread * into thread specific storage");
// Looks up the "acceptLanguages" thread-specific-data entry of the current
// thread (as reported by Thread::getCurrent()).
// NOTE(review): the pointer read into acceptLangs is presumably the return
// value -- confirm.
198 AcceptLanguages * Thread::getLanguages()
200 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
202 Thread * curThrd = Thread::getCurrent();
205 AcceptLanguages * acceptLangs =
206 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
// Each reference_tsd() is paired with a dereference_tsd().
207 curThrd->dereference_tsd();
// Stores `langs` as the "acceptLanguages" thread-specific-data entry of the
// current thread. Nothing is stored when Thread::getCurrent() returns NULL.
212 void Thread::setLanguages(AcceptLanguages *langs) //l10n
214 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
216 Thread * currentThrd = Thread::getCurrent();
217 if (currentThrd != NULL)
219 // deletes the old tsd and creates a new one
220 currentThrd->put_tsd("acceptLanguages",
222 sizeof(AcceptLanguages *),
// Removes the "acceptLanguages" thread-specific-data entry of the current
// thread. Does nothing when Thread::getCurrent() returns NULL.
229 void Thread::clearLanguages() //l10n
231 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
233 Thread * currentThrd = Thread::getCurrent();
234 if (currentThrd != NULL)
236 // deletes the old tsd
237 currentThrd->delete_tsd("acceptLanguages");
246 // two special synchronization classes for ThreadPool
// Constructor: calls timed_lock() on the mutex on behalf of the calling
// thread (pegasus_thread_self()), forwarding `msec` as the first argument.
252 timed_mutex(Mutex* mut, int msec)
255 _mut->timed_lock(msec, pegasus_thread_self());
// Constructor: calls try_lock() on the mutex on behalf of the calling thread
// (pegasus_thread_self()).
268 try_mutex(Mutex* mut)
271 _mut->try_lock(pegasus_thread_self());
// Class-wide list of ThreadPool objects; the ThreadPool constructor appends
// each new pool to it and kill_idle_threads() walks it.
282 DQueue<ThreadPool> ThreadPool::_pools(true);
// Periodic housekeeping over the class-wide _pools list: calls
// kill_dead_threads() on pools, rate-limited by comparing the current time
// against `last` (more than 5 seconds must have passed).
// `now` and `last` are function-local statics, so they keep their values
// between calls.
285 void ThreadPool::kill_idle_threads(void)
287 static struct timeval now, last = {0, 0};
289 pegasus_gettimeofday(&now);
290 if(now.tv_sec - last.tv_sec > 5)
293 ThreadPool *p = _pools.next(0);
298 p->kill_dead_threads();
// Refresh the timestamp that the 5-second check compares against.
306 pegasus_gettimeofday(&last);
// Builds a pool: copies the three timing intervals, stores the pool key,
// reconciles the min/max limits with initial_size, creates initial_size
// threads and links them into the idle list, then registers the pool in the
// class-wide _pools list.
311 ThreadPool::ThreadPool(Sint16 initial_size,
315 struct timeval & alloc_wait,
316 struct timeval & dealloc_wait,
317 struct timeval & deadlock_detect)
318 : _max_threads(max), _min_threads(min),
320 _pool(true), _running(true),
321 _dead(true), _dying(0)
323 _allocate_wait.tv_sec = alloc_wait.tv_sec;
324 _allocate_wait.tv_usec = alloc_wait.tv_usec;
325 _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
326 _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
327 _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
328 _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
// 17 bytes are zeroed but at most 16 are copied, so the byte after the
// copied characters is always a terminating NUL.
329 memset(_key, 0x00, 17);
331 strncpy(_key, key, 16);
// A non-zero maximum below initial_size is raised to initial_size.
332 if(_max_threads > 0 && _max_threads < initial_size)
333 _max_threads = initial_size;
// A minimum above initial_size is lowered to initial_size.
334 if(_min_threads > initial_size)
335 _min_threads = initial_size;
// Create the initial threads and place each one on the idle list.
338 for(i = 0; i < initial_size; i++)
340 _link_pool(_init_thread());
342 _pools.insert_last(this);
346 // Note: <<< Fri Oct 17 09:19:03 2003 mdd >>>
347 // the pegasus_yield() calls that precede each th->join() are there to
348 // give a thread on the running list a chance to reach a cancellation
349 // point before the join
// Tears the pool down. The three thread lists are drained in order -- idle
// (_pool), then _dead, then _running -- and for each thread removed its
// "sleep sem" thread-specific-data entry is looked up.
351 ThreadPool::~ThreadPool(void)
355 // set the dying flag so all threads know the destructor has been entered
// NOTE(review): this statement constructs an unnamed auto_mutex temporary,
// which is destroyed at the end of the statement; if its destructor unlocks
// _monitor, the lock is not held for the code that follows -- confirm.
357 auto_mutex(&(this->_monitor));
360 // remove from the global pools list
363 // start with idle threads.
365 th = _pool.remove_first();
366 Semaphore* sleep_sem;
370 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
373 th->dereference_tsd();
377 // Signal to get the thread out of the work loop.
380 // Signal to get the thread past the end. See the comment
381 // "wait to be awakend by the thread pool destructor"
382 // Note: the current implementation of Thread for Windows
383 // does not implement "pthread" cancelation points so this
386 th->dereference_tsd();
390 th = _pool.remove_first();
// Next, the threads on the dead list.
392 th = _dead.remove_first();
395 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
399 th->dereference_tsd();
403 // signal the thread's sleep semaphore
406 th->dereference_tsd();
410 th = _dead.remove_first();
// Finally, the threads on the running list.
414 th = _running.remove_first();
417 // signal the thread's sleep semaphore
419 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
422 th->dereference_tsd();
428 th->dereference_tsd();
434 th = _running.remove_first();
445 // make this static to the class
// Entry point executed by every pool thread.
//   parm: the Thread object for this thread; its get_parm() yields the
//         owning ThreadPool.
// The thread registers itself with Thread::setCurrent(), fetches its
// "sleep sem" and "deadlock timer" thread-specific data, and for each unit
// of work reads "work func", "work parm" and "blocking sem". After the work
// it refreshes the deadlock timer, signals blocking_sem when one was given,
// and moves itself from the running list back to the idle list if it is
// still on the running list. pool->_dying is checked at several points.
// Every return statement visible here returns 0.
446 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
448 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
450 Thread *myself = (Thread *)parm;
458 // Set myself into thread specific storage
459 // This will allow code to get its own Thread
460 Thread::setCurrent(myself);
462 ThreadPool *pool = (ThreadPool *)myself->get_parm();
468 if(pool->_dying.value())
471 return((PEGASUS_THREAD_RETURN)0);
474 Semaphore *sleep_sem = 0;
475 Semaphore *blocking_sem = 0;
477 struct timeval *deadlock_timer = 0;
// Per-thread state that _init_thread() stored in thread-specific data.
481 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
482 myself->dereference_tsd();
483 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
484 myself->dereference_tsd();
490 return((PEGASUS_THREAD_RETURN)0);
// Neither entry may be missing for the loop to proceed.
493 if(sleep_sem == 0 || deadlock_timer == 0)
496 return((PEGASUS_THREAD_RETURN)0);
503 if(pool->_dying.value())
508 return((PEGASUS_THREAD_RETURN)0);
515 catch(IPCException& )
518 return((PEGASUS_THREAD_RETURN)0);
521 // when we awaken we reside on the running queue, not the pool queue
522 if(pool->_dying.value())
525 return((PEGASUS_THREAD_RETURN)0);
529 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
// Work description stored by allocate_and_awaken() (or kill_dead_threads()).
534 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
535 myself->reference_tsd("work func");
536 myself->dereference_tsd();
537 parm = myself->reference_tsd("work parm");
538 myself->dereference_tsd();
539 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
540 myself->dereference_tsd();
543 catch(IPCException &)
546 return((PEGASUS_THREAD_RETURN)0);
556 (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
561 gettimeofday(deadlock_timer, NULL);
// NOTE(review): each timed_mutex(...) statement in this function constructs
// an unnamed temporary that is destroyed at the end of the statement; if its
// destructor unlocks the mutex, the code that follows runs without the lock
// -- confirm.
565 timed_mutex(&(pool->_monitor), 1000);
566 if(pool->_dying.value())
575 return((PEGASUS_THREAD_RETURN)0);
578 // put myself back onto the available list
581 timed_mutex(&(pool->_monitor), 1000);
582 if(pool->_dying.value() == 0)
// Restart the timer that kill_dead_threads() compares against.
584 gettimeofday(deadlock_timer, NULL);
585 if( blocking_sem != 0 )
586 blocking_sem->signal();
588 // If we are not on _running then ~ThreadPool has removed
589 // us and now "owns" our pointer.
590 if( pool->_running.remove((void *)myself) != 0 )
591 pool->_pool.insert_first(myself);
594 return((PEGASUS_THREAD_RETURN)0);
601 return((PEGASUS_THREAD_RETURN)0);
607 return((PEGASUS_THREAD_RETURN)0);
612 // TODO: Why is this needed? Why not just continue?
613 // wait to be awakend by the thread pool destructor
616 myself->test_cancel();
619 return((PEGASUS_THREAD_RETURN)0);
// Hands one unit of work to a pool thread.
//   parm: argument stored as the thread's "work parm".
//   work: function stored as the thread's "work func".
// A thread is taken from the idle list (_pool). While none is available and
// the pool is not dying, _check_deadlock(&start) is called, which throws
// once the deadlock interval has elapsed since this call began. The chosen
// thread gets its work data stored in thread-specific data, is placed on
// the running list, and its "sleep sem" entry is looked up to wake it.
622 void ThreadPool::allocate_and_awaken(void *parm,
623 PEGASUS_THREAD_RETURN \
624 (PEGASUS_THREAD_CDECL *work)(void *),
629 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
// Start time of this request; the reference point for _check_deadlock().
630 struct timeval start;
631 gettimeofday(&start, NULL);
// NOTE(review): each timed_mutex(...) statement in this function constructs
// an unnamed temporary that is destroyed at the end of the statement; if its
// destructor unlocks the mutex, the code that follows runs without the lock
// -- confirm.
636 timed_mutex(&(this->_monitor), 1000);
641 th = _pool.remove_first();
650 // wait for the right interval and try again
651 while (th == 0 && _dying.value() < 1)
653 // will throw an IPCException&
654 _check_deadlock(&start) ;
// A _max_threads of 0 means no upper limit is applied by this test.
656 if(_max_threads == 0 || _current_threads < _max_threads)
664 timed_mutex(&(this->_monitor), 1000);
669 th = _pool.remove_first();
677 if(_dying.value() < 1)
679 // initialize the thread data with the work function and parameters
680 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
681 "Initializing thread with work function and parameters: parm = %p",
// Each entry is deleted before being stored again; the NULL argument means
// no delete function is registered for these entries.
684 th->delete_tsd("work func");
685 th->put_tsd("work func", NULL,
686 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
688 th->delete_tsd("work parm");
689 th->put_tsd("work parm", NULL, sizeof(void *), parm);
690 th->delete_tsd("blocking sem");
692 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
695 timed_mutex(&(this->_monitor), 1000);
704 // put the thread on the running list
707 _running.insert_first(th);
708 // signal the thread's sleep semaphore to awaken it
709 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
713 th->dereference_tsd();
717 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
719 th->dereference_tsd();
738 // caller is responsible for only calling this routine during slack periods
739 // but should call it at least once per _deadlock_detect with the running q
740 // and at least once per _deallocate_wait for the pool q
// Housekeeping pass over this pool.
//  1. Removes threads from the dead list while the pool is not dying (the
//     trace message says they are joined).
//  2. Walks two thread queues (map[2]); for each thread the "deadlock timer"
//     entry is copied and compared, through check_time(), against the
//     interval from get_deallocate_wait() or get_deadlock_detect().
//  3. A thread that exceeded its interval is taken off its queue, given
//     _undertaker as its work function, and put on the dead list.
//  4. While `needed` is positive, replacement threads are created.
742 Uint32 ThreadPool::kill_dead_threads(void)
746 gettimeofday(&now, NULL);
749 // first go through the dead q and clean it up as much as possible
// NOTE(review): the timed_mutex(...) and try_mutex(...) statements in this
// function construct unnamed temporaries that are destroyed at the end of
// the statement; if their destructors unlock the mutex, the code that
// follows runs without the lock -- confirm.
752 timed_mutex(&(this->_monitor), 1000);
758 while(_dead.count() > 0 && _dying.value() == 0 )
760 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
761 Thread *dead = _dead.remove_first();
774 DQueue<Thread> * map[2] =
780 DQueue<Thread> *q = 0;
784 #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
785 // This change prevents the thread pool from killing "hung" threads.
786 // The definition of a "hung" thread is one that has been on the run queue
787 // for longer than the time interval set when the thread pool was created.
788 // Cancelling "hung" threads has proven to be problematic.
790 // With this change the thread pool will not cancel "hung" threads. This
791 // may prevent a crash depending upon the state of the "hung" thread. In
792 // the case that the thread is actually hung, this change causes the
793 // thread resources not to be reclaimed.
795 // Idle threads, those that have not executed a routine for a time
796 // interval, continue to be destroyed. This is normal and should not
797 // cause any problems.
805 try_mutex(&(this->_monitor));
// Local copy of the thread's timer value, filled by the memcpy below.
829 struct timeval dt = { 0, 0 };
837 dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
847 memcpy(&dt, dtp, sizeof(struct timeval));
849 th->dereference_tsd();
850 struct timeval deadlock_timeout;
854 too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
858 too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
861 if( true == too_long)
863 // if we are deallocating from the pool, escape if we are
864 // down to the minimum thread count
866 if( _current_threads.value() < (Uint32)_min_threads )
876 // we are killing a hung thread and we will drop below the
877 // minimum. create another thread to make up for the one
878 // we are about to kill
883 th = q->remove_no_lock((void *)th);
// Replace the thread's work with _undertaker, passing the thread itself as
// the work parameter.
889 th->delete_tsd("work func");
890 th->put_tsd("work func", NULL,
891 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
892 (void *)&_undertaker);
893 th->delete_tsd("work parm");
894 th->put_tsd("work parm", NULL, sizeof(void *), th);
896 // signal the thread's sleep semaphore to awaken it
897 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
902 th->dereference_tsd();
907 th->dereference_tsd();
908 _dead.insert_first(th);
914 // deadlocked threads
915 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Killing a deadlocked thread");
// Create the replacement threads counted in `needed`.
930 while (needed.value() > 0) {
931 _link_pool(_init_thread());
// Tests `interval` against the time elapsed since `start`.
// finish = start + interval is computed, carrying whole seconds out of the
// microsecond sum, and timeval_subtract(&remaining, &finish, &now) decides
// the outcome. A zero interval is special-cased as "never time out".
// NOTE(review): `interval` is null-checked in the zero-interval test but is
// dereferenced without a check on the visible lines that compute `finish`
// -- confirm that callers never pass a null interval.
939 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
941 // never time out if the interval is zero
942 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
945 struct timeval now = {0,0}, finish = {0,0}, remaining = {0,0};
947 pegasus_gettimeofday(&now);
948 /* remove valgrind error */
949 pegasus_gettimeofday(&remaining);
952 finish.tv_sec = start->tv_sec + interval->tv_sec;
953 usec = start->tv_usec + interval->tv_usec;
// Carry whole seconds out of the microsecond sum.
954 finish.tv_sec += (usec / 1000000);
956 finish.tv_usec = usec;
958 if ( timeval_subtract(&remaining, &finish, &now) )
// Work function that kill_dead_threads() installs on a thread it wants to
// terminate: calls exit_thread() with exit code 1.
// NOTE(review): the return statement is presumably reached only if
// exit_thread() returns -- confirm.
964 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
966 exit_thread((PEGASUS_THREAD_RETURN)1);
967 return (PEGASUS_THREAD_RETURN)1;
// Delete routine registered by _init_thread() for the "sleep sem"
// thread-specific-data entry: destroys the Semaphore that `p` points to.
971 void ThreadPool::_sleep_sem_del(void *p)
975 delete (Semaphore *)p;
// Throws Deadlock, constructed with the calling thread's id
// (pegasus_thread_self()), when check_time() reports true for `start`
// against the pool's _deadlock_detect interval. Otherwise returns normally.
979 void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
981 if (true == check_time(start, &_deadlock_detect))
982 throw Deadlock(pegasus_thread_self());
// Non-throwing counterpart of _check_deadlock(): returns the result of
// check_time() for `start` against the pool's _deadlock_detect interval.
987 Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
989 return(check_time(start, &_deadlock_detect));
// Returns the result of check_time() for `start` against the pool's
// _deallocate_wait interval.
992 Boolean ThreadPool::_check_dealloc(struct timeval *start)
994 return(check_time(start, &_deallocate_wait));
// Creates a new pool thread whose entry point is _loop and whose parameter
// is this pool, and gives it two thread-specific-data entries:
//   "sleep sem"      - a Semaphore with initial count 0, released through
//                      _sleep_sem_del
//   "deadlock timer" - a timeval set to the current time, released through
//                      thread_data::default_delete
// NOTE(review): the new Thread is presumably the return value -- confirm.
997 Thread *ThreadPool::_init_thread(void) throw(IPCException)
999 Thread *th = (Thread *) new Thread(_loop, this, false);
1000 // allocate a sleep semaphore and pass it in the thread context
1001 // initial count is zero, loop function will sleep until
1002 // we signal the semaphore
1003 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
1004 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
// Raw storage from ::operator new pairs with thread_data::default_delete,
// which releases it with ::operator delete.
1006 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
1007 pegasus_gettimeofday(dldt);
1009 th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
1010 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
// Places `th` at the front of the idle list (_pool).
// NOTE(review): the NullPointer throw is presumably guarded by a null check
// on `th` -- confirm.
1019 void ThreadPool::_link_pool(Thread *th) throw(IPCException)
1022 throw NullPointer();
// NOTE(review): this statement constructs an unnamed timed_mutex temporary,
// which is destroyed at the end of the statement; if its destructor unlocks
// _monitor, the insert below is not protected by the lock -- confirm.
1026 timed_mutex(&(this->_monitor), 1000);
1034 _pool.insert_first(th);
1043 PEGASUS_NAMESPACE_END