After building a few tests I fixed a few bugs as well as added the ability to Group...
authorJim Carroll <thecarrolls@jiminger.com>
Mon, 20 Jun 2011 01:29:59 +0000 (21:29 -0400)
committerJim Carroll <thecarrolls@jiminger.com>
Thu, 23 Jun 2011 14:15:27 +0000 (10:15 -0400)
xbmc/threads/Event.cpp
xbmc/threads/Event.h
xbmc/threads/Interruptible.h

index 938e32c..f1f01a1 100644 (file)
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
 
-#include "Event.h"
-#include "utils/TimeUtils.h"
-#include "PlatformDefs.h"
+#include <stdarg.h>
 
-//////////////////////////////////////////////////////////////////////
-// Construction/Destruction
-//////////////////////////////////////////////////////////////////////
+#include "Event.h"
 
 void CEvent::Interrupt() 
 { 
@@ -36,6 +32,7 @@ void CEvent::Interrupt()
 bool CEvent::Wait()
 {
   CSingleLock lock(mutex);
+  numWaits++;
   interrupted = false;
   Guard g(interruptible ? this : NULL);
 
@@ -43,7 +40,9 @@ bool CEvent::Wait()
     condVar.wait(mutex);
 
   bool ret = signaled;
-  if (!manualReset)
+
+  numWaits--;
+  if (!manualReset && numWaits == 0)
     signaled = false;
 
   return ret;
@@ -51,15 +50,19 @@ bool CEvent::Wait()
 
 bool CEvent::WaitMSec(unsigned int milliSeconds)
 {
+
   CSingleLock lock(mutex);
+  numWaits++;
   interrupted = false;
   Guard g(interruptible ? this : NULL);
 
-  unsigned int startTime = CTimeUtils::GetTimeMS();
-  unsigned int remainingTime = milliSeconds;
+  long remainingTime = (long)milliSeconds;
+
+  boost::system_time const timeout=boost::get_system_time() + boost::posix_time::milliseconds(milliSeconds);
+
   while(!signaled && !interrupted)
   {
-    XbmcThreads::ConditionVariable::TimedWaitResponse resp = condVar.wait(mutex,remainingTime);
+    XbmcThreads::ConditionVariable::TimedWaitResponse resp = condVar.wait(mutex,(unsigned int)remainingTime);
 
     if (signaled)
       return true;
@@ -67,17 +70,136 @@ bool CEvent::WaitMSec(unsigned int milliSeconds)
     if (resp == XbmcThreads::ConditionVariable::TW_TIMEDOUT)
       return false;
 
-    unsigned int elapsedTimeMillis = CTimeUtils::GetTimeMS() - startTime;
-    if (elapsedTimeMillis > milliSeconds)
-      return false;
+    boost::posix_time::time_duration diff = timeout - boost::get_system_time();
+
+    remainingTime = diff.total_milliseconds();
 
-    remainingTime = milliSeconds - elapsedTimeMillis;
+    if (remainingTime <= 0)
+      return false;
   }
 
   bool ret = signaled;
-  if (!manualReset)
+
+  numWaits--;
+  if (!manualReset && numWaits == 0)
     signaled = false;
 
   return ret;
 }
 
+void CEvent::groupSet()
+{
+  if (groups)
+  {
+    for (std::vector<XbmcThreads::CEventGroup*>::iterator iter = groups->begin(); 
+         iter != groups->end(); iter++)
+      (*iter)->Set(this);
+  }
+}
+
+void CEvent::addGroup(XbmcThreads::CEventGroup* group)
+{
+  CSingleLock lock(mutex);
+  if (groups == NULL)
+    groups = new std::vector<XbmcThreads::CEventGroup*>();
+
+  groups->push_back(group);
+}
+
+void CEvent::removeGroup(XbmcThreads::CEventGroup* group)
+{
+  if (groups)
+  {
+    CSingleLock lock(mutex);
+    for (std::vector<XbmcThreads::CEventGroup*>::iterator iter = groups->begin(); iter != groups->end(); iter++)
+    {
+      if ((*iter) == group)
+      {
+        groups->erase(iter);
+        break;
+      }
+    }
+
+    if (groups->size() <= 0)
+    {
+      delete groups;
+      groups = NULL;
+    }
+  }
+}
+
+namespace XbmcThreads
+{
+  CEventGroup::CEventGroup(int num, CEvent* v1, ...) : signaled(NULL), numWaits(0)
+  {
+    va_list ap;
+
+    va_start(ap, v1);
+    events.push_back(v1);
+    num--; // account for v1
+    for (;num > 0; num--)
+      events.push_back(va_arg(ap,CEvent*));
+    va_end(ap);
+
+    // we preping for a wait, so we need to set the group value on
+    // all of the CEvents. 
+    for (std::vector<CEvent*>::iterator iter = events.begin();
+         iter != events.end(); iter++)
+      (*iter)->addGroup(this);
+  }
+
+  CEventGroup::CEventGroup(CEvent* v1, ...) : signaled(NULL), numWaits(0)
+  {
+    va_list ap;
+
+    va_start(ap, v1);
+    events.push_back(v1);
+    bool done = false;
+    while(!done)
+    {
+      CEvent* cur = va_arg(ap,CEvent*);
+      if (cur)
+        events.push_back(cur);
+      else
+        done = true;
+    }
+    va_end(ap);
+
+    // we preping for a wait, so we need to set the group value on
+    // all of the CEvents. 
+    for (std::vector<CEvent*>::iterator iter = events.begin();
+         iter != events.end(); iter++)
+      (*iter)->addGroup(this);
+  }
+
+  CEvent* CEventGroup::wait()
+  {
+    CSingleLock lock(mutex);
+    numWaits++;
+    while (!signaled)
+      condVar.wait(mutex);
+
+    CEvent* ret = signaled;
+    numWaits--;
+    if (numWaits == 0)
+      signaled = NULL;
+
+    return ret;
+  }
+
+  CEventGroup::~CEventGroup()
+  {
+    // we preping for a wait, so we need to set the group value on
+    // all of the CEvents. 
+    for (std::vector<CEvent*>::iterator iter = events.begin();
+         iter != events.end(); iter++)
+      (*iter)->removeGroup(this);
+  }
+
+  void CEventGroup::Set(CEvent* child)
+  {
+    CSingleLock lock(mutex);
+    signaled = child;
+    condVar.notifyAll();
+  }
+}
index 92ee267..ff995e1 100644 (file)
 
 #pragma once
 
+#include <vector>
+
 #include "threads/Condition.h"
 #include "threads/Interruptible.h"
 
+// forward declare the CEventGroup
+namespace XbmcThreads
+{
+  class CEventGroup;
+}
+
+
 /**
  * This is an Event class built from a ConditionVariable. The Event adds the state
  * that the condition is gating as well as the mutex/lock.
@@ -39,18 +48,30 @@ class CEvent : public XbmcThreads::IInterruptible
   bool signaled;
   bool interrupted;
   bool interruptible;
+
+  unsigned int numWaits;
+
+  std::vector<XbmcThreads::CEventGroup*> * groups;
+
   XbmcThreads::ConditionVariable condVar;
   CCriticalSection mutex;
 
   // block the ability to copy
   inline CEvent& operator=(const CEvent& src) { return *this; }
   inline CEvent(const CEvent& other) {}
+
+  friend class XbmcThreads::CEventGroup;
+
+  void groupSet();
+  void addGroup(XbmcThreads::CEventGroup* group);
+  void removeGroup(XbmcThreads::CEventGroup* group);
 public:
 
   inline CEvent(bool manual = false, bool interruptible_ = false) : 
-    manualReset(manual), signaled(false), interrupted(false), interruptible(interruptible_) {}
+    manualReset(manual), signaled(false), interrupted(false), 
+    interruptible(interruptible_), numWaits(0), groups(NULL) {}
   inline void Reset() { CSingleLock lock(mutex); signaled = false; }
-  inline void Set() { CSingleLock lock(mutex); signaled = true; condVar.notifyAll(); }
+  inline void Set() { CSingleLock lock(mutex); signaled = true; condVar.notifyAll(); groupSet(); }
 
   virtual void Interrupt();
   inline bool wasInterrupted() { CSingleLock lock(mutex); return interrupted; }
@@ -74,3 +95,48 @@ public:
   bool Wait();
 };
 
+namespace XbmcThreads
+{
+  /**
+   * CEventGroup is a means of grouping CEvents to wait on them together.
+   * It is equivalent to WaitOnMultipleObject with that returns when "any"
+   * in the group signaled.
+   */
+  class CEventGroup
+  {
+    std::vector<CEvent*> events;
+    XbmcThreads::ConditionVariable condVar;
+    CCriticalSection mutex;
+    CEvent* signaled;
+
+    unsigned int numWaits;
+    void Set(CEvent* child);
+
+    friend class ::CEvent;
+  public:
+
+    /**
+     * Create a CEventGroup from a number of CEvents. num is the number
+     *  of Events that follow. E.g.:
+     *
+     *  CEventGroup g(3, event1, event2, event3);
+     */
+    CEventGroup(int num, CEvent* v1, ...);
+
+    /**
+     * Create a CEventGroup from a number of CEvents. The parameters
+     *  should form a NULL terminated list of CEvent*'s
+     *
+     *  CEventGroup g(event1, event2, event3, NULL);
+     */
+    CEventGroup(CEvent* v1, ...);
+    ~CEventGroup();
+
+    /**
+     * This will block until any one of the CEvents in the group are
+     * signaled at which point a pointer to that CEvents will be 
+     * returned.
+     */
+    CEvent* wait();
+  };
+}
index fb1036a..f5de4ea 100644 (file)
@@ -37,7 +37,7 @@ namespace XbmcThreads
    *  public:
    *     virtual void Interrupt();
    *
-   *     void blockingMethodCall() { XbmcThreads::IInterruptible::Guard g(*this); ... do blocking; }
+   *     void blockingMethodCall() { XbmcThreads::IInterruptible::Guard g(this); ... do blocking; }
    *  };
    *
    * See CEvent as an example
@@ -48,13 +48,13 @@ namespace XbmcThreads
     virtual void Interrupt() = 0;
 
     /**
-     * Calling InterruptAll will invoke interrup on all IInterruptibles
+     * Calling InterruptAll will invoke interrupt on all IInterruptibles
      *  currently in a wait state
      */
     static void InterruptAll();
 
     /**
-     * Calling InterruptThreadSpecific will invoke interrup on all IInterruptibles
+     * Calling InterruptThreadSpecific will invoke interrupt on all IInterruptibles
      *  currently in a wait state in this thread only.
      */
     static void InterruptThreadSpecific();