Please help to review the code in cbThreadPool

Started by ollydbg, February 05, 2014, 07:02:57 AM

ollydbg

While reviewing the code, I found two issues:
1. m_taskAdded is never reset to false once it has been set to true.

// this thread is finishing the task, and is going to be idle.
// if there is no task left, and the total threads is running is 0, then we have all task done
// otherwise, just put me to the idle mode by m_semaphore->Post()
bool cbThreadPool::WaitingThread()
{
 wxMutexLocker lock(m_Mutex);
 --m_workingThreads;

 if (m_workingThreads <= 0 && m_tasksQueue.empty())
 {
   // Sends cbEVT_THREADTASK_ALLDONE message only the real task is all done
   // FIXME (ollydbg#1#): Look at the variable m_taskAdded, it was initialized as false in pool's
   // constructor, and the only chance it was set true is in AddTask() function, I don't find any
   // code to reset this value to false, is that true logic?
   if (m_taskAdded)
   {
     // notify the owner that all tasks are done
     CodeBlocksEvent evt = CodeBlocksEvent(cbEVT_THREADTASK_ALLDONE, m_ID);
     wxPostEvent(m_pOwner, evt);
   }

See my FIXME comment in the code above.
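
To make the first question concrete, here is a minimal sketch of one possible answer: clear the flag at the moment the "all done" notification is sent. PoolModel and all of its members are hypothetical stand-ins (no wxWidgets types are used), and the idea that a worker can reach the idle transition without having run a task is an assumption of this model, not something confirmed by the code above.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model of the bookkeeping around m_taskAdded.
// It is NOT the real cbThreadPool; it only mimics the counters involved.
class PoolModel
{
public:
    // Mirrors AddTask(): queue one task and remember that work was added.
    void AddTask()
    {
        ++m_pendingTasks;
        m_taskAdded = true;
    }

    // Mirrors a worker leaving idle mode; it takes a task if one is queued.
    void WorkerWakes()
    {
        ++m_workingThreads;
        if (m_pendingTasks > 0)
            --m_pendingTasks;
    }

    // Mirrors WaitingThread(): returns true when the "all done" notification
    // would be sent to the owner.
    bool WorkerIdles()
    {
        assert(m_workingThreads > 0);
        --m_workingThreads;

        if (m_workingThreads == 0 && m_pendingTasks == 0 && m_taskAdded)
        {
            // Assumed fix: reset the flag so that the next idle transition
            // without new tasks does not notify again.
            m_taskAdded = false;
            return true;
        }
        return false;
    }

private:
    std::size_t m_pendingTasks = 0;
    int m_workingThreads = 0;
    bool m_taskAdded = false;
};
```

In this model, one added task produces exactly one notification, and a worker that wakes up and finds nothing to do goes back to idle silently. Without the reset, the second idle transition would report "all done" again.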


2. cbThreadPool::AwakeNeeded makes a lot of Post() calls, but I don't think they are all necessary, right?

inline void cbThreadPool::AwakeNeeded()
{
 if (m_concurrentThreads == -1)
   return;
 // TODO (ollydbg#1#): why?
 // I think this can be optimized, for example, the m_concurrentThreads = 3, and the m_tasksQueue.size() == 5
 // m_workingThreads = 2, do we need to call Post() function 5 times? I think 1 is enough.
 // if the m_workingThreads = 0, then I think 3 is enough.
 for (std::size_t i = 0; i < m_tasksQueue.size(); ++i)
   m_semaphore->Post();
}

See my TODO comment in the code above. The code was changed in:

Revision: cc3a66de85dd9d73b664e298f35523b69f17ce81
Author: ceniza <ceniza@2a5c6006-c6dd-42ca-98ab-0921f2732cef>
Date: 2006-5-21 10:41:21
Message:
* Replaced the condition with a semaphore (just like the previous implementation).

git-svn-id: http://svn.code.sf.net/p/codeblocks/code/trunk@2480 2a5c6006-c6dd-42ca-98ab-0921f2732cef
----
Modified: src/sdk/cbthreadpool.cpp
Modified: src/sdk/cbthreadpool.h




I added many comments to those classes to help explain the logic; see the patch below.
I would like to hear comments from other devs or users on these issues, thanks.


bbfb0cce6e43a2d649c784b8b82158f38d19ded2
src/include/cbthreadpool.h | 48 +++++++++++++++++++++++++++-------------------
src/sdk/cbthreadpool.cpp   | 19 ++++++++++++------
2 files changed, 41 insertions(+), 26 deletions(-)

diff --git a/src/include/cbthreadpool.h b/src/include/cbthreadpool.h
index a06fb52..4a5ae17 100644
--- a/src/include/cbthreadpool.h
+++ b/src/include/cbthreadpool.h
@@ -70,7 +70,7 @@ class DLLIMPORT cbThreadPool
    /** Begin a batch process
      *
      * @note EVIL: Call it if you want to add all tasks first and get none executed yet.
-      * If you DON'T call it, taks will be executed as you add them (in fact it's what
+      * If you DON'T call it, tasks will be executed as you add them (in fact it's what
      * one would expect).
      */
    void BatchBegin();
@@ -101,7 +101,7 @@ class DLLIMPORT cbThreadPool
        T *operator -> () const throw();

      private:
-        void dispose();
+        void dispose(); //decrease the counter, and if it get 0, destroy both counter and ptr
    };

    /** A Worker Thread class.
@@ -115,7 +115,8 @@ class DLLIMPORT cbThreadPool
        /** cbWorkerThread ctor
          *
          * @param pool Thread Pool this Worker Thread belongs to
-          * @param semaphore Used to synchronise the Worker Threads
+          * @param semaphore Used to synchronize the Worker Threads, it is a reference to the CountedPtr
+          * object
          */
        cbWorkerThread(cbThreadPool *pool, CountedPtr<wxSemaphore> &semaphore);

@@ -137,9 +138,10 @@ class DLLIMPORT cbThreadPool
      private:
        bool m_abort;
        cbThreadPool *m_pPool;
+        // a counted semaphore shared with all the cbWorkerThread
        CountedPtr<wxSemaphore> m_semaphore;
        cbThreadedTask *m_pTask;
-        wxMutex m_taskMutex;
+        wxMutex m_taskMutex;// to protect the member variable accessing from multiply threads
    };

    typedef std::vector<cbWorkerThread *> WorkerThreadsArray;
@@ -170,26 +172,27 @@ class DLLIMPORT cbThreadPool

    typedef std::list<cbThreadedTaskElement> TasksQueue;

-    wxEvtHandler *m_pOwner;
-    int m_ID;
-    bool m_batching;
+    wxEvtHandler *m_pOwner; // events notification will send to this guy
+    int m_ID;           // id used to fill the ID field of the event
+    bool m_batching;    // whether in batch mode of adding tasks

-    int m_concurrentThreads; // current number of concurrent threads
+    int m_concurrentThreads; // current number of concurrent threads, this is the maximum value of the m_workingThreads
    unsigned int m_stackSize; // stack size for every threads
    int m_concurrentThreadsSchedule; // if we cannot apply the new value of concurrent threads, keep it here
-    WorkerThreadsArray m_threads; // the working threads are stored here
-    TasksQueue m_tasksQueue; // and the pending tasks here
+    WorkerThreadsArray m_threads; // the working threads(cbWorkerThread) are stored here
+    TasksQueue m_tasksQueue; // and the pending tasks (cbThreadedTaskElement) here
    bool m_taskAdded; // true if any task added

-    int m_workingThreads; // how many working threads are running a task
+    int m_workingThreads; // how many working threads are running tasks

-    mutable wxMutex m_Mutex; // we better be safe
+    mutable wxMutex m_Mutex; // we better be safe, protect the change of member variables

-    CountedPtr<wxSemaphore> m_semaphore; // used to synchronise the Worker Threads
+    CountedPtr<wxSemaphore> m_semaphore; // used to synchronize the Worker Threads, the counted value is that
+    // how many threads are sharing this semaphore

    void _SetConcurrentThreads(int concurrentThreads); // like SetConcurrentThreads, but non-thread safe
    void Broadcast(); // awakes all threads
-    void AwakeNeeded(); // awakes only a few threads
+    void AwakeNeeded(); // awakes only a few threads // TODO (ollydbg#1#): ?

  protected:
    friend class cbWorkerThread;
@@ -200,16 +203,18 @@ class DLLIMPORT cbThreadPool
      */
    cbThreadedTaskElement GetNextTask();

-    /// Mechanism for the threads to tell the Pool they're running
+    /// Mechanism for the threads to tell the Pool they're running, a thread is switch from the idle
+    /// mode to working mode. This is triggered by semaphore released somewhere
    void WorkingThread();

-    /** Mechanism for the threads to tell the Pool they're done and will wait
+    /** Mechanism for the threads to tell the Pool they're done and will go to idle, so we can assign
+      * another task to this thread.
      *
      * @return true if everything is OK, false if we should abort
      */
    bool WaitingThread();

-    /** Called by a Worker Thread to inform a task has finished
+    /** Called by a Worker Thread to inform a single task has finished, this will send a cbEVT_THREADTASK_ENDED event
      *
      * @param thread The Worker Thread
      */
@@ -254,9 +259,9 @@ inline void cbThreadPool::BatchBegin()

inline void cbThreadPool::Broadcast()
{
-  if (m_concurrentThreads == -1)
+  if (m_concurrentThreads == -1) //// TODO (ollydbg#1#): why need such check?
    return;
-
+  // let the idle(pending) worker thread to execute tasks, those worker threads are waiting for semaphore
  for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads); ++i)
    m_semaphore->Post();
}
@@ -265,7 +270,10 @@ inline void cbThreadPool::AwakeNeeded()
{
  if (m_concurrentThreads == -1)
    return;
-
+  // TODO (ollydbg#1#): why?
+  // I think this can be optimized, for example, the m_concurrentThreads = 3, and the m_tasksQueue.size() == 5
+  // m_workingThreads = 2, do we need to call Post() function 5 times? I think 1 is enough.
+  // if the m_workingThreads = 0, then I think 3 is enough.
  for (std::size_t i = 0; i < m_tasksQueue.size(); ++i)
    m_semaphore->Post();
}
diff --git a/src/sdk/cbthreadpool.cpp b/src/sdk/cbthreadpool.cpp
index 2a4fa1e..b49a991 100644
--- a/src/sdk/cbthreadpool.cpp
+++ b/src/sdk/cbthreadpool.cpp
@@ -47,16 +47,16 @@ void cbThreadPool::SetConcurrentThreads(int concurrentThreads)
  wxMutexLocker lock(m_Mutex);
  _SetConcurrentThreads(concurrentThreads);
}
-
+// this function is already wrappered by a mutex
void cbThreadPool::_SetConcurrentThreads(int concurrentThreads)
{
-  if (!m_workingThreads)
+  if (!m_workingThreads)// if pool is not running (no thread is running)
  {
    std::for_each(m_threads.begin(), m_threads.end(), std::mem_fun(&cbWorkerThread::Abort));
    Broadcast();
    m_threads.clear();

-    // set a new Semaphore for the new threads
+    // set a new Semaphore for the new threads, note the max value is the concurrentThreads
    m_semaphore = CountedPtr<wxSemaphore>(new wxSemaphore(0, concurrentThreads));

    m_concurrentThreads = concurrentThreads;
@@ -66,7 +66,7 @@ void cbThreadPool::_SetConcurrentThreads(int concurrentThreads)
    {
      m_threads.push_back(new cbWorkerThread(this, m_semaphore));
      m_threads.back()->Create(m_stackSize);
-      m_threads.back()->Run();
+      m_threads.back()->Run(); // this will run cbWorkerThread::Entry()
    }

//    Manager::Get()->GetLogManager()->DebugLog(_T("Concurrent threads for pool set to %d"), m_concurrentThreads);
@@ -85,6 +85,8 @@ void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
  m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));
  m_taskAdded = true;

+  // we are in batch mode, so no need to awake the idle thread
+  // m_workingThreads < m_concurrentThreads means there are some threads in idle mode (no task assigned)
  if (!m_batching && m_workingThreads < m_concurrentThreads)
    AwakeNeeded();
}
@@ -118,13 +120,15 @@ cbThreadPool::cbThreadedTaskElement cbThreadPool::GetNextTask()

  return element;
}
-
+// a thread is leaving from idle mode, and run a task
void cbThreadPool::WorkingThread()
{
  wxMutexLocker lock(m_Mutex);
  ++m_workingThreads;
}
-
+// this thread is finishing the task, and is going to be idle.
+// if there is no task left, and the total threads is running is 0, then we have all task done
+// otherwise, just put me to the idle mode by m_semaphore->Post()
bool cbThreadPool::WaitingThread()
{
  wxMutexLocker lock(m_Mutex);
@@ -133,6 +137,9 @@ bool cbThreadPool::WaitingThread()
  if (m_workingThreads <= 0 && m_tasksQueue.empty())
  {
    // Sends cbEVT_THREADTASK_ALLDONE message only the real task is all done
+    // FIXME (ollydbg#1#): Look at the variable m_taskAdded, it was initialized as false in pool's
+    // constructor, and the only chance it was set true is in AddTask() function, I don't find any
+    // code to reset this value to false, is that true logic?
    if (m_taskAdded)
    {
      // notify the owner that all tasks are done



If some piece of memory should be reused, turn them to variables (or const variables).
If some piece of operations should be reused, turn them to functions.
If they happened together, then turn them to classes.

ollydbg

Looking at rev 2480, AddTask() was changed from:

void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
{
  if (!task)
  {
    return;
  }

  wxMutexLocker lock(m_Mutex);

  m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));

  if (!m_batching && m_workingThreads < m_concurrentThreads)
  {
    for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads) && i < m_tasksQueue.size(); ++i)
    {
      m_condMutex->Signal();
    }
  }
}


to


void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
{
  if (!task)
  {
    return;
  }

  wxMutexLocker lock(m_Mutex);

  m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));

  if (!m_batching && m_workingThreads < m_concurrentThreads)
  {
    AwakeNeeded();
  }
}


I personally think that the original loop condition is correct:

    for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads) && i < m_tasksQueue.size(); ++i)
    {
      m_condMutex->Signal();
    }


Look, it takes two conditions into account:
First condition: i < the number of idle threads (m_concurrentThreads - m_workingThreads)
Second condition: i < m_tasksQueue.size()

But why did that commit replace those two conditions with one?
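
The two conditions together amount to "wake at most one thread per queued task, and never more threads than are idle". A minimal sketch of that count, assuming a hypothetical free function WakeCount (it is not part of cbThreadPool) that takes the three values as plain parameters:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical helper: how many Post() calls are worth making.
// The parameters stand in for m_tasksQueue.size(), m_concurrentThreads
// and m_workingThreads.
std::size_t WakeCount(std::size_t queuedTasks, int concurrentThreads, int workingThreads)
{
    assert(workingThreads >= 0);

    // No idle worker, or the pool is not configured yet (-1): wake nobody.
    if (concurrentThreads <= workingThreads)
        return 0;

    const std::size_t idleThreads =
        static_cast<std::size_t>(concurrentThreads - workingThreads);

    // Both limits from the original loop condition, expressed as one minimum.
    return std::min(queuedTasks, idleThreads);
}
```

With the numbers from the TODO comment (3 concurrent threads, 5 queued tasks), this gives 1 when 2 threads are already working and 3 when none are, which matches the counts suggested there.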

oBFusCATed

BTW, it is more readable to do

awakeThreadNumber = std::min<size_t>(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads);
awakeThreadNumber = std::min<TaskQueueType::value_type>(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads);


instead of:

awakeThreadNumber = std::min(m_tasksQueue.size(), static_cast<std::size_t>(m_concurrentThreads - m_workingThreads));
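
For readers wondering why either form is needed at all: std::min takes both arguments as the same type, so mixing a std::size_t with an int requires a cast or an explicit template argument. A small sketch, using a hypothetical free function AwakeThreadNumber and a std::list<int> in place of the real task queue:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <list>

// Hypothetical stand-in for the expression discussed above.
std::size_t AwakeThreadNumber(const std::list<int> &tasksQueue,
                              int concurrentThreads, int workingThreads)
{
    // std::min(tasksQueue.size(), concurrentThreads - workingThreads) does not
    // compile: the arguments are std::size_t and int, so the single template
    // parameter cannot be deduced.
    // Naming the type converts both arguments to std::size_t instead.
    // Note: a negative difference wraps to a very large unsigned value, so in
    // that case the queue size is the result.
    return std::min<std::size_t>(tasksQueue.size(), concurrentThreads - workingThreads);
}
```

The explicit template argument keeps the call short, but it hides the signed-to-unsigned conversion that the static_cast version spells out, so a separate check for "no idle threads" may still be worth having.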
(most of the time I ignore long posts)
[strangers don't send me private messages, I'll ignore them; post a topic in the forum, but first read the rules!]

ollydbg

Quote from: oBFusCATed on June 01, 2014, 11:47:19 AM
BTW, it is more readable to do

awakeThreadNumber = std::min<size_t>(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads);
awakeThreadNumber = std::min<TaskQueueType::value_type>(m_tasksQueue.size(), m_concurrentThreads - m_workingThreads);


instead of:

awakeThreadNumber = std::min(m_tasksQueue.size(), static_cast<std::size_t>(m_concurrentThreads - m_workingThreads));

Thanks, done in rev 9791.