Code::Blocks Forums

Developer forums (C::B DEVELOPMENT STRICTLY!) => Development => Topic started by: ollydbg on February 05, 2014, 07:02:57 AM

Title: Please help to review the code in cbThreadPool
Post by: ollydbg on February 05, 2014, 07:02:57 AM
While reviewing the code, I found two issues:
1. m_taskAdded never goes back to false once it has been set to true.

// this thread has finished its task and is going to be idle.
// if no task is left and no thread is running a task (m_workingThreads is 0), then all tasks are done
// otherwise, just put this thread into idle mode (it waits on m_semaphore until the next Post())
bool cbThreadPool::WaitingThread()
{
 wxMutexLocker lock(m_Mutex);
 --m_workingThreads;

 if (m_workingThreads <= 0 && m_tasksQueue.empty())
 {
   // Sends cbEVT_THREADTASK_ALLDONE message only the real task is all done
   // FIXME (ollydbg#1#): Look at the variable m_taskAdded. It is initialized to false in the pool's
   // constructor, and the only place it is set to true is AddTask(). I can't find any
   // code that resets it to false. Is this logic correct?
   if (m_taskAdded)
   {
     // notify the owner that all tasks are done
     CodeBlocksEvent evt = CodeBlocksEvent(cbEVT_THREADTASK_ALLDONE, m_ID);
     wxPostEvent(m_pOwner, evt);
   }

See my FIXME comment above.
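One possible fix for the FIXME is to reset m_taskAdded right after cbEVT_THREADTASK_ALLDONE has been posted, so that each batch of tasks produces exactly one ALLDONE notification. The sketch below models only that bookkeeping with the standard library. PoolState and the onAllDone callback are hypothetical names; std::mutex stands in for wxMutex and the callback stands in for wxPostEvent. It also assumes a worker calls WorkingThread()/WaitingThread() around every wake-up, even one that finds no task.

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <mutex>

// Hypothetical stand-in for the relevant cbThreadPool members. std::mutex and a
// std::function callback replace wxMutex and wxPostEvent purely for illustration.
struct PoolState
{
    std::mutex mutex;
    std::list<int> tasksQueue;        // pending tasks (ints stand in for cbThreadedTask*)
    int workingThreads = 0;           // threads currently running tasks
    bool taskAdded = false;           // true if a task was added since the last ALLDONE
    std::function<void()> onAllDone;  // stands in for posting cbEVT_THREADTASK_ALLDONE

    void AddTask(int task)
    {
        std::lock_guard<std::mutex> lock(mutex);
        tasksQueue.push_back(task);
        taskAdded = true;
    }

    // Like WorkingThread(): a worker leaves idle mode.
    void WorkingThread()
    {
        std::lock_guard<std::mutex> lock(mutex);
        ++workingThreads;
    }

    // Like GetNextTask(): pop the next pending task, or return false if none is left.
    bool GetNextTask(int &task)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (tasksQueue.empty())
            return false;
        task = tasksQueue.front();
        tasksQueue.pop_front();
        return true;
    }

    // Like WaitingThread(): a worker has finished and is going idle.
    void WaitingThread()
    {
        std::lock_guard<std::mutex> lock(mutex);
        --workingThreads;
        assert(workingThreads >= 0);
        if (workingThreads <= 0 && tasksQueue.empty() && taskAdded)
        {
            if (onAllDone)
                onAllDone();
            taskAdded = false; // proposed fix: re-arm the notification for the next batch
        }
    }
};
```

Usage idea: add a task, run it and call WaitingThread(); ALLDONE fires once. A further WorkingThread()/WaitingThread() pair with an empty queue (which is what a surplus Post() causes) does not fire it again, while adding a new task re-arms it. Without the reset, every such empty wake-up would notify the owner again.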


2. cbThreadPool::AwakeNeeded() makes many Post() calls, but I don't think they are all necessary. Am I right?

inline void cbThreadPool::AwakeNeeded()
{
 if (m_concurrentThreads == -1)
   return;
 // TODO (ollydbg#1#): why?
 // I think this can be optimized. For example, if m_concurrentThreads == 3, m_tasksQueue.size() == 5
 // and m_workingThreads == 2, do we need to call Post() 5 times? I think once is enough.
 // If m_workingThreads == 0, then I think 3 calls are enough.
 for (std::size_t i = 0; i < m_tasksQueue.size(); ++i)
   m_semaphore->Post();
}

See the TODO comment above. This code was changed in the following commit:

Revision: cc3a66de85dd9d73b664e298f35523b69f17ce81
Author: ceniza <ceniza@2a5c6006-c6dd-42ca-98ab-0921f2732cef>
Date: 2006-5-21 10:41:21
Message:
* Replaced the condition with a semaphore (just like the previous implementation).

git-svn-id: http://svn.code.sf.net/p/codeblocks/code/trunk@2480 2a5c6006-c6dd-42ca-98ab-0921f2732cef
----
Modified: src/sdk/cbthreadpool.cpp
Modified: src/sdk/cbthreadpool.h
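Why the number of Post() calls matters: _SetConcurrentThreads() creates the semaphore as wxSemaphore(0, concurrentThreads), i.e. with a maximum count. As far as I know, wxSemaphore::Post() refuses to go past that maximum and returns wxSEMA_OVERFLOW, and every post that does succeed lets one waiting worker wake up whether or not a task is left for it. The sketch below models such a bounded semaphore with std::mutex and std::condition_variable; BoundedSemaphore, PostResult and PostMany are hypothetical names, not wxWidgets API.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical bounded counting semaphore modelling wxSemaphore(initialcount, maxcount).
class BoundedSemaphore
{
public:
    enum class PostResult { Ok, Overflow };

    BoundedSemaphore(int initial, int maxCount) : m_count(initial), m_max(maxCount)
    {
        assert(initial >= 0 && maxCount >= 0);
    }

    // Increment the count and wake one waiter, unless that would exceed the maximum.
    PostResult Post()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_max > 0 && m_count >= m_max)
            return PostResult::Overflow;
        ++m_count;
        m_cond.notify_one();
        return PostResult::Ok;
    }

    // Block until the count is positive, then decrement it (a worker going from idle to working).
    void Wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }

    int Count() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_count;
    }

private:
    mutable std::mutex m_mutex;
    std::condition_variable m_cond;
    int m_count;
    int m_max; // 0 means "no limit" in this model
};

// Post `times` times (like the AwakeNeeded() loop) and return how many posts overflowed.
inline int PostMany(BoundedSemaphore &sem, int times)
{
    int overflows = 0;
    for (int i = 0; i < times; ++i)
        if (sem.Post() == BoundedSemaphore::PostResult::Overflow)
            ++overflows;
    return overflows;
}
```

For the TODO example (m_concurrentThreads == 3, five queued tasks), PostMany(sem, 5) on a fresh BoundedSemaphore(0, 3) reports two overflows and leaves a count of 3, assuming no worker consumes a permit in between. In the real pool the workers consume permits concurrently, so whether a given Post() overflows depends on timing, but any leftover permits later wake workers that find the queue empty.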




I added many comments to these classes to help explain the logic; see the patch below.
I would like to hear other devs' or users' comments on these issues. Thanks.


bbfb0cce6e43a2d649c784b8b82158f38d19ded2
src/include/cbthreadpool.h | 48 +++++++++++++++++++++++++++-------------------
src/sdk/cbthreadpool.cpp   | 19 ++++++++++++------
2 files changed, 41 insertions(+), 26 deletions(-)

diff --git a/src/include/cbthreadpool.h b/src/include/cbthreadpool.h
index a06fb52..4a5ae17 100644
--- a/src/include/cbthreadpool.h
+++ b/src/include/cbthreadpool.h
@@ -70,7 +70,7 @@ class DLLIMPORT cbThreadPool
    /** Begin a batch process
      *
      * @note EVIL: Call it if you want to add all tasks first and get none executed yet.
-      * If you DON'T call it, taks will be executed as you add them (in fact it's what
+      * If you DON'T call it, tasks will be executed as you add them (in fact it's what
      * one would expect).
      */
    void BatchBegin();
@@ -101,7 +101,7 @@ class DLLIMPORT cbThreadPool
        T *operator -> () const throw();

      private:
-        void dispose();
+        void dispose(); // decrease the counter, and if it reaches 0, destroy both the counter and ptr
    };

    /** A Worker Thread class.
@@ -115,7 +115,8 @@ class DLLIMPORT cbThreadPool
        /** cbWorkerThread ctor
          *
          * @param pool Thread Pool this Worker Thread belongs to
-          * @param semaphore Used to synchronise the Worker Threads
+          * @param semaphore Used to synchronize the Worker Threads, it is a reference to the CountedPtr
+          * object
          */
        cbWorkerThread(cbThreadPool *pool, CountedPtr<wxSemaphore> &semaphore);

@@ -137,9 +138,10 @@ class DLLIMPORT cbThreadPool
      private:
        bool m_abort;
        cbThreadPool *m_pPool;
+        // a counted semaphore shared by all the cbWorkerThreads
        CountedPtr<wxSemaphore> m_semaphore;
        cbThreadedTask *m_pTask;
-        wxMutex m_taskMutex;
+        wxMutex m_taskMutex; // protects access to the member variables from multiple threads
    };

    typedef std::vector<cbWorkerThread *> WorkerThreadsArray;
@@ -170,26 +172,27 @@ class DLLIMPORT cbThreadPool

    typedef std::list<cbThreadedTaskElement> TasksQueue;

-    wxEvtHandler *m_pOwner;
-    int m_ID;
-    bool m_batching;
+    wxEvtHandler *m_pOwner; // event notifications are sent to this handler
+    int m_ID;           // id used to fill the ID field of the event
+    bool m_batching;    // whether tasks are being added in batch mode

-    int m_concurrentThreads; // current number of concurrent threads
+    int m_concurrentThreads; // current number of concurrent threads, this is the maximum value of the m_workingThreads
    unsigned int m_stackSize; // stack size for every threads
    int m_concurrentThreadsSchedule; // if we cannot apply the new value of concurrent threads, keep it here
-    WorkerThreadsArray m_threads; // the working threads are stored here
-    TasksQueue m_tasksQueue; // and the pending tasks here
+    WorkerThreadsArray m_threads; // the working threads(cbWorkerThread) are stored here
+    TasksQueue m_tasksQueue; // and the pending tasks (cbThreadedTaskElement) here
    bool m_taskAdded; // true if any task added

-    int m_workingThreads; // how many working threads are running a task
+    int m_workingThreads; // how many working threads are running tasks

-    mutable wxMutex m_Mutex; // we better be safe
+    mutable wxMutex m_Mutex; // we better be safe, protect the change of member variables

-    CountedPtr<wxSemaphore> m_semaphore; // used to synchronise the Worker Threads
+    CountedPtr<wxSemaphore> m_semaphore; // used to synchronize the Worker Threads; the reference count tells
+    // how many threads are sharing this semaphore

    void _SetConcurrentThreads(int concurrentThreads); // like SetConcurrentThreads, but non-thread safe
    void Broadcast(); // awakes all threads
-    void AwakeNeeded(); // awakes only a few threads
+    void AwakeNeeded(); // awakes only a few threads // TODO (ollydbg#1#): ?

  protected:
    friend class cbWorkerThread;
@@ -200,16 +203,18 @@ class DLLIMPORT cbThreadPool
      */
    cbThreadedTaskElement GetNextTask();

-    /// Mechanism for the threads to tell the Pool they're running
+    /// Mechanism for the threads to tell the Pool they're running: a thread switches from idle
+    /// mode to working mode. This is triggered when the semaphore is posted somewhere
    void WorkingThread();

-    /** Mechanism for the threads to tell the Pool they're done and will wait
+    /** Mechanism for the threads to tell the Pool they're done and will go to idle, so we can assign
+      * another task to this thread.
      *
      * @return true if everything is OK, false if we should abort
      */
    bool WaitingThread();

-    /** Called by a Worker Thread to inform a task has finished
+    /** Called by a Worker Thread to inform a single task has finished, this will send a cbEVT_THREADTASK_ENDED event
      *
      * @param thread The Worker Thread
      */
@@ -254,9 +259,9 @@ inline void cbThreadPool::BatchBegin()

inline void cbThreadPool::Broadcast()
{
-  if (m_concurrentThreads == -1)
+  if (m_concurrentThreads == -1) // TODO (ollydbg#1#): why is this check needed?
    return;
-
+  // let the idle (pending) worker threads execute tasks; those worker threads are waiting on the semaphore
  for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads); ++i)
    m_semaphore->Post();
}
@@ -265,7 +270,10 @@ inline void cbThreadPool::AwakeNeeded()
{
  if (m_concurrentThreads == -1)
    return;
-
+  // TODO (ollydbg#1#): why?
+  // I think this can be optimized. For example, if m_concurrentThreads == 3, m_tasksQueue.size() == 5
+  // and m_workingThreads == 2, do we need to call Post() 5 times? I think once is enough.
+  // If m_workingThreads == 0, then I think 3 calls are enough.
  for (std::size_t i = 0; i < m_tasksQueue.size(); ++i)
    m_semaphore->Post();
}
diff --git a/src/sdk/cbthreadpool.cpp b/src/sdk/cbthreadpool.cpp
index 2a4fa1e..b49a991 100644
--- a/src/sdk/cbthreadpool.cpp
+++ b/src/sdk/cbthreadpool.cpp
@@ -47,16 +47,16 @@ void cbThreadPool::SetConcurrentThreads(int concurrentThreads)
  wxMutexLocker lock(m_Mutex);
  _SetConcurrentThreads(concurrentThreads);
}
-
+// this function is already wrapped by a mutex lock in its caller
void cbThreadPool::_SetConcurrentThreads(int concurrentThreads)
{
-  if (!m_workingThreads)
+  if (!m_workingThreads) // if the pool is not running (no thread is running a task)
  {
    std::for_each(m_threads.begin(), m_threads.end(), std::mem_fun(&cbWorkerThread::Abort));
    Broadcast();
    m_threads.clear();

-    // set a new Semaphore for the new threads
+    // set a new Semaphore for the new threads; note that its maximum count is concurrentThreads
    m_semaphore = CountedPtr<wxSemaphore>(new wxSemaphore(0, concurrentThreads));

    m_concurrentThreads = concurrentThreads;
@@ -66,7 +66,7 @@ void cbThreadPool::_SetConcurrentThreads(int concurrentThreads)
    {
      m_threads.push_back(new cbWorkerThread(this, m_semaphore));
      m_threads.back()->Create(m_stackSize);
-      m_threads.back()->Run();
+      m_threads.back()->Run(); // this will run cbWorkerThread::Entry()
    }

//    Manager::Get()->GetLogManager()->DebugLog(_T("Concurrent threads for pool set to %d"), m_concurrentThreads);
@@ -85,6 +85,8 @@ void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
  m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));
  m_taskAdded = true;

+  // if we are in batch mode, there is no need to wake the idle threads
+  // m_workingThreads < m_concurrentThreads means there are some threads in idle mode (no task assigned)
  if (!m_batching && m_workingThreads < m_concurrentThreads)
    AwakeNeeded();
}
@@ -118,13 +120,15 @@ cbThreadPool::cbThreadedTaskElement cbThreadPool::GetNextTask()

  return element;
}
-
+// a thread is leaving idle mode to run a task
void cbThreadPool::WorkingThread()
{
  wxMutexLocker lock(m_Mutex);
  ++m_workingThreads;
}
-
+// this thread has finished its task and is going to be idle.
+// if no task is left and no thread is running a task (m_workingThreads is 0), then all tasks are done
+// otherwise, just put this thread into idle mode (it waits on m_semaphore until the next Post())
bool cbThreadPool::WaitingThread()
{
  wxMutexLocker lock(m_Mutex);
@@ -133,6 +137,9 @@ bool cbThreadPool::WaitingThread()
  if (m_workingThreads <= 0 && m_tasksQueue.empty())
  {
    // Sends cbEVT_THREADTASK_ALLDONE message only the real task is all done
+    // FIXME (ollydbg#1#): Look at the variable m_taskAdded. It is initialized to false in the pool's
+    // constructor, and the only place it is set to true is AddTask(). I can't find any
+    // code that resets it to false. Is this logic correct?
    if (m_taskAdded)
    {
      // notify the owner that all tasks are done

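As I read the patch, CountedPtr is plain reference counting: dispose() decrements a shared counter and destroys both the object and the counter when it reaches zero. Because each cbWorkerThread stores its own copy of the pool's CountedPtr<wxSemaphore>, a semaphore replaced in _SetConcurrentThreads() should stay alive until the last old worker releases its copy. The sketch below is a minimal, hypothetical CountedPtr in that spirit, not the class from the C::B sources; use_count() is added only for illustration.

```cpp
#include <cassert>

// Hypothetical minimal reference-counted pointer in the spirit of cbThreadPool::CountedPtr.
// All copies share one heap-allocated counter.
template <typename T>
class CountedPtr
{
public:
    explicit CountedPtr(T *p = nullptr) : m_ptr(p), m_count(new long(1)) {}
    CountedPtr(const CountedPtr &other) : m_ptr(other.m_ptr), m_count(other.m_count) { ++*m_count; }
    ~CountedPtr() { dispose(); }

    CountedPtr &operator=(const CountedPtr &other)
    {
        if (this != &other)
        {
            dispose();              // release what we held before
            m_ptr = other.m_ptr;    // then share the other object and its counter
            m_count = other.m_count;
            ++*m_count;
        }
        return *this;
    }

    T &operator*() const { assert(m_ptr != nullptr); return *m_ptr; }
    T *operator->() const { assert(m_ptr != nullptr); return m_ptr; }
    long use_count() const { return *m_count; }

private:
    // Decrease the counter; if it reaches 0, destroy both the counter and the object.
    void dispose()
    {
        if (--*m_count == 0)
        {
            delete m_count;
            delete m_ptr;
        }
    }

    T *m_ptr;
    long *m_count;
};
```

Copying the pointer (as each worker thread does with the pool's semaphore) only bumps the shared counter; the object is deleted when the last copy is destroyed or reassigned.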


Title: Re: Please help to review the code in cbThreadPool
Post by: ollydbg on February 05, 2014, 07:12:53 AM
Look at rev 2480. It changed this:

void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
{
  if (!task)
  {
    return;
  }

  wxMutexLocker lock(m_Mutex);

  m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));

  if (!m_batching && m_workingThreads < m_concurrentThreads)
  {
    for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads) && i < m_tasksQueue.size(); ++i)
    {
      m_condMutex->Signal();
    }
  }
}


to


void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
{
  if (!task)
  {
    return;
  }

  wxMutexLocker lock(m_Mutex);

  m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));

  if (!m_batching && m_workingThreads < m_concurrentThreads)
  {
    AwakeNeeded();
  }
}


I personally think that this condition is correct:

    for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads) && i < m_tasksQueue.size(); ++i)
    {
      m_condMutex->Signal();
    }


It takes both conditions into account:
First condition: i < the number of idle threads (m_concurrentThreads - m_workingThreads)
Second condition: i < m_tasksQueue.size()

So why did that commit reduce these two conditions to one?
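The two-condition loop from before rev 2480 amounts to waking min(idle threads, queued tasks) workers, while the rev 2480 loop wakes one worker per queued task. The sketch below computes both counts in isolation so they can be compared against the TODO examples from the first post; AwakeCount and AwakeCountRev2480 are hypothetical helpers, not functions in cbThreadPool.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical helper: how many Post() calls the pre-2480 loop makes.
// concurrentThreads: m_concurrentThreads (-1 means the pool is not set up)
// workingThreads:    m_workingThreads
// queuedTasks:       m_tasksQueue.size()
inline std::size_t AwakeCount(int concurrentThreads, int workingThreads, std::size_t queuedTasks)
{
    assert(workingThreads >= 0);
    if (concurrentThreads == -1)
        return 0; // same early return as Broadcast()/AwakeNeeded()
    int idle = concurrentThreads - workingThreads;
    if (idle <= 0)
        return 0; // no idle worker to wake; also avoids casting a negative int to size_t
    // First condition: i < idle threads. Second condition: i < queued tasks.
    return std::min(static_cast<std::size_t>(idle), queuedTasks);
}

// Hypothetical helper: the rev 2480 loop keeps only the second condition.
inline std::size_t AwakeCountRev2480(int concurrentThreads, std::size_t queuedTasks)
{
    return concurrentThreads == -1 ? 0 : queuedTasks;
}
```

With m_concurrentThreads == 3 and five queued tasks, AwakeCount(3, 2, 5) is 1 and AwakeCount(3, 0, 5) is 3, matching the TODO comment, whereas the rev 2480 loop posts 5 times in both cases.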
Title: Re: Please help to review the code in cbThreadPool
Post by: ollydbg on June 01, 2014, 11:08:42 AM
All the issues are now fixed in trunk.
Title: Re: Please help to review the code in cbThreadPool
Post by: oBFusCATed on June 01, 2014, 11:47:19 AM
BTW, it is more readable to do

awakeThreadNumber = std::min<size_t>(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads);
awakeThreadNumber = std::min<TasksQueue::size_type>(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads);


instead of:

awakeThreadNumber = std::min(m_tasksQueue.size(), static_cast<std::size_t>(m_concurrentThreads - m_workingThreads));
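Why the explicit template argument helps: std::min takes both arguments as the same type T, so std::min(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads) mixes std::size_t and int and fails template argument deduction. Naming T converts the int difference without a static_cast. The type should be the queue's size_type; value_type would be the element type (cbThreadedTaskElement). The sketch below uses a std::list<int> stand-in for TasksQueue and a hypothetical AwakeThreadNumber() helper.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <list>

typedef std::list<int> TasksQueue; // stand-in for cbThreadPool::TasksQueue

// Hypothetical helper. std::min(queue.size(), concurrent - working) would not compile,
// because the two arguments deduce different types (size_type vs int). Naming the
// template argument converts the int difference to the queue's size type.
inline TasksQueue::size_type AwakeThreadNumber(const TasksQueue &queue,
                                               int concurrentThreads, int workingThreads)
{
    // A negative difference would wrap to a huge unsigned value, so require idle threads.
    assert(workingThreads < concurrentThreads);
    return std::min<TasksQueue::size_type>(queue.size(), concurrentThreads - workingThreads);
}
```

The conversion is only safe because, in the AddTask() path shown in this thread, AwakeNeeded() is called only when m_workingThreads < m_concurrentThreads; otherwise the wrapped difference would make std::min return the queue size.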
Title: Re: Please help to review the code in cbThreadPool
Post by: ollydbg on June 01, 2014, 12:47:10 PM
Quote from: oBFusCATed on June 01, 2014, 11:47:19 AM
BTW, it is more readable to do

awakeThreadNumber = std::min<size_t>(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads);
awakeThreadNumber = std::min<TasksQueue::size_type>(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads);


instead of:

awakeThreadNumber = std::min(m_tasksQueue.size(), static_cast<std::size_t>(m_concurrentThreads - m_workingThreads));

Thanks, done in rev 9791.