};
-CFileCache::CFileCache() : m_seekEvent(false,true)
+CFileCache::CFileCache()
{
m_bDeleteCache = true;
m_nSeekResult = 0;
m_pCache->EndOfInput();
// The thread event will now also cause the wait of an event to return a false.
- if (m_seekEvent.Wait())
+ XbmcThreads::CEventGroup group(&m_seekEvent, getStopEvent(), NULL);
+ if (group.wait() == &m_seekEvent)
{
m_pCache->ClearEndOfInput();
m_seekEvent.Set(); // hack so that later we realize seek is needed
+++ /dev/null
-/*
- * Copyright (C) 2005-2011 Team XBMC
- * http://www.xbmc.org
- *
- * This Program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2, or (at your option)
- * any later version.
- *
- * This Program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with XBMC; see the file COPYING. If not, write to
- * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- */
-
-#include "threads/Condition.h"
-
-namespace XbmcThreads
-{
-
- ConditionVariable::TimedWaitResponse ConditionVariable::wait(CSingleLock& lock, int milliseconds)
- {
- ConditionVariable::TimedWaitResponse ret = TW_OK;
- try
- {
- ret = (impl.timed_wait(lock, boost::posix_time::milliseconds(milliseconds))) ? TW_OK : TW_TIMEDOUT;
- }
- catch (boost::thread_interrupted )
- {
- ret = TW_INTERRUPTED;
- }
- catch (...)
- {
- ret = TW_ERROR;
- }
- return ret;
- }
-
- ConditionVariable::TimedWaitResponse ConditionVariable::wait(CCriticalSection& mutex, int milliseconds)
- {
- ConditionVariable::TimedWaitResponse ret = TW_OK;
- try
- {
- ret = (impl.timed_wait(mutex, boost::posix_time::milliseconds(milliseconds))) ? TW_OK : TW_TIMEDOUT;
- }
- catch (boost::thread_interrupted )
- {
- ret = TW_INTERRUPTED;
- }
- catch (...)
- {
- ret = TW_ERROR;
- }
- return ret;
- }
-}
-
namespace XbmcThreads
{
/**
- * This is a thin wrapper around boost::condition_variable (I
- * would prefer to use it directly but ...) and will be replacing
- * existing WaitForSingleObject, SDL_cond, etc.
+ * This is a thin wrapper around boost::condition_variable. It is subject
+ * to "spurious returns" as it is built on boost which is built on posix
+ * on many of our platforms.
*/
class ConditionVariable
{
enum TimedWaitResponse { TW_OK = 0, TW_TIMEDOUT = 1, TW_INTERRUPTED=-1, TW_ERROR=-2 };
- inline void wait(CSingleLock& lock) { impl.wait(lock); }
- inline void wait(CCriticalSection& mutex) { impl.wait(mutex); }
+ template<typename L> inline void wait(L& lock) { impl.wait(lock); }
- TimedWaitResponse wait(CSingleLock& lock, int milliseconds);
- TimedWaitResponse wait(CCriticalSection& mutex, int milliseconds);
+ template<typename L> inline TimedWaitResponse wait(L& lock, int milliseconds)
+ {
+ ConditionVariable::TimedWaitResponse ret = TW_OK;
+ try { ret = (impl.timed_wait(lock, boost::posix_time::milliseconds(milliseconds))) ? TW_OK : TW_TIMEDOUT; }
+ catch (boost::thread_interrupted ) { ret = TW_INTERRUPTED; }
+ catch (...) { ret = TW_ERROR; }
+ return ret;
+ }
inline void notifyAll() { impl.notify_all(); }
inline void notify() { impl.notify_one(); }
};
+
+ /**
+ * This is a condition variable along with its predicate. This allows the use of a
+ * condition variable without the spurious returns since the state being monitored
+ * is also part of the condition.
+ *
+ * L should be either a CSingleLock or a CCriticalSection.
+ *
+ * The requirements on P are that it can act as a predicate (that is, I can use
+ * it in an 'while(!predicate){...}' where 'predicate' is of type 'P').
+ */
+ template <typename L, typename P> class TightConditionVariable
+ {
+ ConditionVariable cond;
+ P predicate;
+ public:
+ inline TightConditionVariable(P predicate_) : predicate(predicate_) {}
+ inline void wait(L& lock) { while(!predicate) cond.wait(lock); }
+
+ inline ConditionVariable::TimedWaitResponse wait(L& lock, int milliseconds)
+ {
+ ConditionVariable::TimedWaitResponse ret = ConditionVariable::TW_OK;
+ boost::system_time const timeout=boost::get_system_time() + boost::posix_time::milliseconds(milliseconds);
+ while ((!predicate) && ret != ConditionVariable::TW_TIMEDOUT)
+ {
+ ret = cond.wait(lock,milliseconds);
+
+ if (!predicate && boost::get_system_time() > timeout)
+ ret = ConditionVariable::TW_TIMEDOUT;
+ }
+ return ret;
+ }
+
+ inline void notifyAll() { cond.notifyAll(); }
+ inline void notify() { cond.notify(); }
+ };
}
for (unsigned int i = 0; i < restoreCount; i++)
lock();
}
+
+ inline unsigned int getCount() { return count; }
+
+ inline L& getLockable() { return mutex; }
};
/**
#include "Event.h"
-class CountGuard
-{
- unsigned int& count;
-public:
- inline CountGuard(unsigned int& count_) : count(count_) { count++; }
- inline ~CountGuard() { count--; }
-};
-
-void CEvent::Interrupt()
-{
- CSingleLock lock(mutex);
- interrupted = true;
- condVar.notifyAll();
-}
-
-bool CEvent::Wait()
-{
- CSingleLock lock(mutex);
- { CountGuard cg(numWaits);
- interrupted = false;
- Guard g(interruptible ? this : NULL);
-
- while (!signaled && !interrupted)
- condVar.wait(mutex);
- }
-
- bool ret = signaled;
-
- if (!manualReset && numWaits == 0)
- signaled = false;
- return ret;
-}
-
-bool CEvent::WaitMSec(unsigned int milliSeconds)
-{
-
- CSingleLock lock(mutex);
- { CountGuard cg(numWaits);
- interrupted = false;
- Guard g(interruptible ? this : NULL);
-
- long remainingTime = (long)milliSeconds;
-
- boost::system_time const timeout=boost::get_system_time() + boost::posix_time::milliseconds(milliSeconds);
-
- while(!signaled && !interrupted)
- {
- XbmcThreads::ConditionVariable::TimedWaitResponse resp = condVar.wait(mutex,(unsigned int)remainingTime);
-
- if (signaled)
- return true;
-
- if (resp == XbmcThreads::ConditionVariable::TW_TIMEDOUT)
- return false;
-
- boost::posix_time::time_duration diff = timeout - boost::get_system_time();
-
- remainingTime = diff.total_milliseconds();
-
- if (remainingTime <= 0)
- return false;
- }
- }
-
- bool ret = signaled;
- if (!manualReset && numWaits == 0)
- signaled = false;
-
- return ret;
-}
-
void CEvent::groupSet()
{
if (groups)
namespace XbmcThreads
{
- CEventGroup::CEventGroup(int num, CEvent* v1, ...) : signaled(NULL), numWaits(0)
+ CEventGroup::CEventGroup(int num, CEvent* v1, ...) : signaled(NULL), condVar(signaled), numWaits(0)
{
va_list ap;
(*iter)->addGroup(this);
}
- CEventGroup::CEventGroup(CEvent* v1, ...) : signaled(NULL), numWaits(0)
+ CEventGroup::CEventGroup(CEvent* v1, ...) : signaled(NULL), condVar(signaled), numWaits(0)
{
va_list ap;
(*iter)->addGroup(this);
}
- CEvent* CEventGroup::wait()
- {
- CSingleLock lock(mutex);
- { CountGuard cg(numWaits);
- while (!signaled)
- condVar.wait(mutex);
- }
- CEvent* ret = signaled;
- if (numWaits == 0)
- signaled = NULL;
-
- return ret;
- }
-
- CEvent* CEventGroup::wait(unsigned int milliSeconds)
- {
- CSingleLock lock(mutex);
- { CountGuard cg(numWaits);
- long remainingTime = (long)milliSeconds;
-
- boost::system_time const timeout=boost::get_system_time() + boost::posix_time::milliseconds(milliSeconds);
-
- while(!signaled)
- {
- XbmcThreads::ConditionVariable::TimedWaitResponse resp = condVar.wait(mutex,(unsigned int)remainingTime);
-
- if (signaled)
- return signaled;
-
- if (resp == XbmcThreads::ConditionVariable::TW_TIMEDOUT)
- return NULL;
-
- boost::posix_time::time_duration diff = timeout - boost::get_system_time();
-
- remainingTime = diff.total_milliseconds();
-
- if (remainingTime <= 0)
- return NULL;
- }
- }
-
- CEvent* ret = signaled;
- if (numWaits == 0)
- signaled = NULL;
-
- return ret;
- }
-
CEventGroup::~CEventGroup()
{
// we preping for a wait, so we need to set the group value on
(*iter)->removeGroup(this);
}
- void CEventGroup::Set(CEvent* child)
- {
- CSingleLock lock(mutex);
- signaled = child;
- condVar.notifyAll();
- }
}
#include <vector>
#include "threads/Condition.h"
-#include "threads/Interruptible.h"
// forward declare the CEventGroup
namespace XbmcThreads
*
* This class manages 'spurious returns' from the condition variable.
*/
-class CEvent : public XbmcThreads::IInterruptible
+class CEvent
{
bool manualReset;
bool signaled;
- bool interrupted;
- bool interruptible;
-
unsigned int numWaits;
std::vector<XbmcThreads::CEventGroup*> * groups;
- XbmcThreads::ConditionVariable condVar;
+ /**
+ * To satisfy the TightConditionVariable requirements and allow the
+ * predicate being monitored to include both the signaled and interrupted
+ * states.
+ */
+ XbmcThreads::TightConditionVariable<CCriticalSection,bool&> condVar;
CCriticalSection mutex;
- // block the ability to copy
- inline CEvent& operator=(const CEvent& src) { return *this; }
- inline CEvent(const CEvent& other) {}
-
friend class XbmcThreads::CEventGroup;
void groupSet();
void addGroup(XbmcThreads::CEventGroup* group);
void removeGroup(XbmcThreads::CEventGroup* group);
+
+ // helper for the two wait methods
+ inline bool prepReturn() { bool ret = signaled; if (!manualReset && numWaits == 0) signaled = false; return ret; }
+
+ // block the ability to copy
+ inline CEvent& operator=(const CEvent& src) { return *this; }
+ inline CEvent(const CEvent& other): condVar(signaled) {}
+
public:
+ inline CEvent(bool manual = false) :
+ manualReset(manual), signaled(false), numWaits(0), groups(NULL), condVar(signaled) {}
- inline CEvent(bool manual = false, bool interruptible_ = false) :
- manualReset(manual), signaled(false), interrupted(false),
- interruptible(interruptible_), numWaits(0), groups(NULL) {}
inline void Reset() { CSingleLock lock(mutex); signaled = false; }
inline void Set() { CSingleLock lock(mutex); signaled = true; condVar.notifyAll(); groupSet(); }
- virtual void Interrupt();
- inline bool wasInterrupted() { CSingleLock lock(mutex); return interrupted; }
-
/**
* This will wait up to 'milliSeconds' milliseconds for the Event
* to be triggered. The method will return 'true' if the Event
* use 'wasInterrupted()' call prior to any further call to a
* Wait* method.
*/
- bool WaitMSec(unsigned int milliSeconds);
+ inline bool WaitMSec(unsigned int milliSeconds)
+ { CSingleLock lock(mutex); numWaits++; condVar.wait(mutex,milliSeconds); numWaits--; return prepReturn(); }
/**
* This will wait for the Event to be triggered. The method will return
* it will return false. To determine if it was interrupted you can
* use 'wasInterrupted()' call prior to any further call to a Wait* method.
*/
- bool Wait();
+ inline bool Wait()
+ { CSingleLock lock(mutex); numWaits++; condVar.wait(mutex); numWaits--; return prepReturn(); }
+
};
namespace XbmcThreads
class CEventGroup
{
std::vector<CEvent*> events;
- XbmcThreads::ConditionVariable condVar;
- CCriticalSection mutex;
CEvent* signaled;
+ XbmcThreads::TightConditionVariable<CCriticalSection,CEvent*&> condVar;
+ CCriticalSection mutex;
unsigned int numWaits;
- void Set(CEvent* child);
+
+ inline void Set(CEvent* child) { CSingleLock lock(mutex); signaled = child; condVar.notifyAll(); }
friend class ::CEvent;
+
+ inline CEvent* prepReturn() { CEvent* ret = signaled; if (numWaits == 0) signaled = NULL; return ret; }
+
public:
/**
* signaled at which point a pointer to that CEvents will be
* returned.
*/
- CEvent* wait();
+ inline CEvent* wait() { CSingleLock lock(mutex); numWaits++; condVar.wait(mutex); numWaits--; return prepReturn(); }
/**
* This will block until any one of the CEvents in the group are
* it will return a pointer to that CEvent, otherwise it will return
* NULL.
*/
- CEvent* wait(unsigned int milliseconds);
+ inline CEvent* wait(unsigned int milliseconds) { CSingleLock lock(mutex); numWaits++; condVar.wait(mutex,milliseconds); numWaits--; return prepReturn(); }
};
}
+++ /dev/null
-/*
- * Copyright (C) 2005-2011 Team XBMC
- * http://www.xbmc.org
- *
- * This Program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2, or (at your option)
- * any later version.
- *
- * This Program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with XBMC; see the file COPYING. If not, write to
- * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- */
-
-
-#include <vector>
-
-#include "threads/Interruptible.h"
-#include "threads/ThreadLocal.h"
-#include "threads/SingleLock.h"
-
-namespace XbmcThreads
-{
- static CCriticalSection staticMutexForLockingTheListOfIInterruptiblesForCallingInterrupt;
- static ThreadLocal<std::vector<IInterruptible*> > threadSpecificInterruptibles;
- static std::vector<IInterruptible*> allInterruptibles;
-
- static void callInterrupt(std::vector<IInterruptible*> * interruptibles)
- {
- if (interruptibles != NULL)
- {
- // copy the list in case the Interrupt call modifies it.
- std::vector<IInterruptible*> list(interruptibles->size());
- std::vector<IInterruptible*>::iterator iter;
-
- {
- CSingleLock lock(staticMutexForLockingTheListOfIInterruptiblesForCallingInterrupt);
- list = *interruptibles;
- }
-
- for (iter=list.begin(); iter != list.end(); iter++)
- (*iter)->Interrupt();
- }
- }
-
- void IInterruptible::InterruptAll()
- {
- callInterrupt(&allInterruptibles);
- }
-
- void IInterruptible::InterruptThreadSpecific()
- {
- callInterrupt(threadSpecificInterruptibles.get());
- }
-
- void IInterruptible::enteringWaitState()
- {
- std::vector<IInterruptible*> * cur = threadSpecificInterruptibles.get();
- if (cur == NULL)
- {
- cur = new std::vector<IInterruptible*>();
- threadSpecificInterruptibles.set(cur);
- }
- cur->push_back(this);
-
- CSingleLock lock(staticMutexForLockingTheListOfIInterruptiblesForCallingInterrupt);
- allInterruptibles.push_back(this);
- }
-
- void static removeIt(std::vector<IInterruptible*> * list, IInterruptible* val)
- {
- std::vector<IInterruptible*>::iterator iter;
- for (iter = list->begin(); iter != list->end(); iter++)
- {
- if ((*iter) == val)
- {
- list->erase(iter);
- break;
- }
- }
- }
-
- void IInterruptible::leavingWaitState()
- {
- std::vector<IInterruptible*> * cur = threadSpecificInterruptibles.get();
- if (cur != NULL)
- removeIt(cur,this);
- CSingleLock lock(staticMutexForLockingTheListOfIInterruptiblesForCallingInterrupt);
- removeIt(&allInterruptibles,this);
- }
-
-}
+++ /dev/null
-/*
- * Copyright (C) 2005-2011 Team XBMC
- * http://www.xbmc.org
- *
- * This Program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2, or (at your option)
- * any later version.
- *
- * This Program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with XBMC; see the file COPYING. If not, write to
- * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- */
-
-#pragma once
-
-namespace XbmcThreads
-{
- /**
- * This interface is meant to be implemented by operations that
- * block and can be interrupted.
- *
- * The implementer of an IInterruptible must not only handle the Interrupt
- * callback but also register and unregister the Interruptable before entering
- * and after leaving a wait state. The Guard class is a helper for the
- * implementers to do this. As an example:
- *
- * class MyInterruptible : public IInterruptible
- * {
- * public:
- * virtual void Interrupt();
- *
- * void blockingMethodCall() { XbmcThreads::IInterruptible::Guard g(this); ... do blocking; }
- * };
- *
- * See CEvent as an example
- */
- class IInterruptible
- {
- public:
- virtual void Interrupt() = 0;
-
- /**
- * Calling InterruptAll will invoke interrupt on all IInterruptibles
- * currently in a wait state
- */
- static void InterruptAll();
-
- /**
- * Calling InterruptThreadSpecific will invoke interrupt on all IInterruptibles
- * currently in a wait state in this thread only.
- */
- static void InterruptThreadSpecific();
-
- protected:
-
- void enteringWaitState();
- void leavingWaitState();
-
- class Guard
- {
- IInterruptible* interruptible;
- public:
- inline Guard(IInterruptible* pinterruptible) : interruptible(pinterruptible) { if (pinterruptible) pinterruptible->enteringWaitState(); }
- inline ~Guard() { if (interruptible) interruptible->leavingWaitState(); }
- };
- };
-}
-
SRCS=Atomics.cpp \
- Condition.cpp \
Event.cpp \
- Interruptible.cpp \
LockFree.cpp \
Thread.cpp \
ThreadLocal.cpp \
#include "utils/log.h"
#include "utils/TimeUtils.h"
#include "threads/ThreadLocal.h"
-#include "threads/Interruptible.h"
static XbmcThreads::ThreadLocal<CThread> currentThread;
{
curThread->m_bStop = TRUE;
curThread->m_StopEvent.Set();
- XbmcThreads::IInterruptible::InterruptThreadSpecific();
curThread->OnException();
if( curThread->IsAutoDelete() )
{
m_bStop = true;
m_StopEvent.Set();
- XbmcThreads::IInterruptible::InterruptThreadSpecific();
if (m_ThreadHandle && bWait)
{
WaitForThreadExit(INFINITE);
volatile bool m_bStop;
HANDLE m_ThreadHandle;
+ inline CEvent* getStopEvent() { return &m_StopEvent; }
+
private:
CStdString GetTypeName(void);
-SRCS=TestEvent.cpp
+SRCS= \
+ TestMain.cpp \
+ TestEvent.cpp \
+ TestSharedSection.cpp
+
LIB=threadTest.a
-include $(patsubst %.cpp,%.P,$(patsubst %.c,%.P,$(SRCS)))
testMain: $(LIB) ../threads.a
- $(CXX) $(CXXFLAGS) $(LDFLAGS) -o testMain $(LIB) ../threads.a -lboost_unit_test_framework -lboost_thread
+ $(CXX) $(CXXFLAGS) $(LDFLAGS) -o testMain $(OBJS) ../threads.a -lboost_unit_test_framework -lboost_thread
-#define BOOST_TEST_DYN_LINK
-#define BOOST_TEST_MODULE TestEvent
#include <boost/test/unit_test.hpp>
#include "threads/Event.h"
--- /dev/null
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MODULE "ThreadingTest"
+#include <boost/test/unit_test.hpp>
+
--- /dev/null
+#include <boost/test/unit_test.hpp>
+
+#include "threads/SharedSection.h"
+#include "threads/SingleLock.h"
+
+#include <boost/thread/thread.hpp>
+#include <stdio.h>
+
+//=============================================================================
+// Helper classes
+//=============================================================================
+
+static void Sleep(unsigned int millis) { boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(millis)); }
+
+template<class L>
+class locker
+{
+ CSharedSection& sec;
+ unsigned int wait;
+public:
+ volatile bool haslock;
+
+ locker(CSharedSection& o, unsigned int waitTime = 0) : sec(o), wait(waitTime), haslock(false) {}
+
+ void operator()()
+ {
+ L lock(sec);
+ haslock = true;
+ if (wait)
+ Sleep(wait);
+ haslock = false;
+ }
+};
+
+BOOST_AUTO_TEST_CASE(TestCritSectionCase)
+{
+ CCriticalSection sec;
+
+ CSingleLock l1(sec);
+ CSingleLock l2(sec);
+}
+
+BOOST_AUTO_TEST_CASE(TestSharedSectionCase)
+{
+ CSharedSection sec;
+
+ CSharedLock l1(sec);
+ CSharedLock l2(sec);
+}
+
+BOOST_AUTO_TEST_CASE(TestGetSharedLockWhileTryingExclusiveLock)
+{
+ CSharedSection sec;
+
+ CSharedLock l1(sec); // get a shared lock
+
+ locker<CExclusiveLock> l2(sec);
+ boost::thread waitThread1(boost::ref(l2)); // try to get an exclusive lock
+ Sleep(10);
+ BOOST_CHECK(!l2.haslock); // this thread is waiting ...
+
+ // now try and get a SharedLock
+ locker<CSharedLock> l3(sec);
+ boost::thread waitThread3(boost::ref(l3)); // try to get an exclusive lock
+ Sleep(10);
+// BOOST_CHECK(l3.haslock);
+
+ // let it go
+ l1.Leave();
+ Sleep(10);
+}
+
+BOOST_AUTO_TEST_CASE(TestSharedSection2Case)
+{
+ CSharedSection sec;
+
+ locker<CSharedLock> l1(sec,20);
+
+ {
+ CSharedLock lock(sec);
+ boost::thread waitThread1(boost::ref(l1));
+
+ Sleep(10);
+ BOOST_CHECK(l1.haslock);
+
+ waitThread1.join();
+ }
+
+ locker<CSharedLock> l2(sec,20);
+ {
+ CExclusiveLock lock(sec);
+ boost::thread waitThread1(boost::ref(l2));
+
+ Sleep(5);
+ BOOST_CHECK(!l2.haslock);
+
+ lock.Leave();
+ Sleep(5);
+ BOOST_CHECK(l2.haslock);
+
+ waitThread1.join();
+ }
+}
+