Stanford CS149 Parallel Programming - Assignment 2

Thread pool for task scheduling

Step 1: parallel thread spawn

First implementation, which spawns a new thread for each task:

void TaskSystemParallelSpawn::run(IRunnable* runnable, int num_total_tasks) {


    //
    // TODO: CS149 students will modify the implementation of this
    // method in Part A.  The implementation provided below runs all
    // tasks sequentially on the calling thread.
    //

    // for (int i = 0; i < num_total_tasks; i++) {
    //     runnable->runTask(i, num_total_tasks);
    // }

  std::thread workers[num_threads_];
  for(int i=0; i< num_total_tasks; i+= num_threads_) {
    int cur_end = (i+num_threads_ < num_total_tasks) ? i+num_threads_ : num_total_tasks;
    int cur_begin = i;
    int t_idx = 0;
    for(int j=cur_begin; j < cur_end; j++, t_idx++) {
       workers[t_idx]= std::thread(runTask, runnable, j, num_total_tasks);
    }
    t_idx =0;
    for(int j=cur_begin; j < cur_end; j++, t_idx++) {
      workers[t_idx].join();
    }
  }
}

Result: not good, very slow. I think this is because of the overhead of spawning threads so frequently.

(base) ➜  part_a git:(master) ✗ python3 ../tests/run_test_harness.py -n 8 -t  super_light super_super_light
runtasks_ref
Linux x86_64
================================================================================
Running task system grading harness... (2 total tests)
  - Detected CPU with 40 execution contexts
  - Task system configured to use at most 8 threads
================================================================================
================================================================================
Executing test: super_super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_super_light
                                        STUDENT   REFERENCE   PERF?
[Serial]                                12.239    12.216      1.00  (OK)
[Parallel + Always Spawn]               444.19    53.469      8.31  (NOT OK)
[Parallel + Thread Pool + Spin]         12.121    25.723      0.47  (OK)
[Parallel + Thread Pool + Sleep]        12.106    24.976      0.48  (OK)
================================================================================

Solution 2: create num_threads_ threads at the beginning of the run call and reuse all of them. Use an atomic counter to assign a unique id to each runTask call.

Code :

Notice that we have to reset my_counter_ = 0 at the beginning of each run call.

void TaskSystemParallelSpawn::run(IRunnable* runnable, int num_total_tasks) { 
  my_counter_ = 0;
  std::vector<std::thread> workers;
  for(int i=0; i< num_threads_; i++) {
    workers.emplace_back([this, runnable, num_total_tasks] {
      this->threadTask(runnable, num_total_tasks);
    });

  }
  for(int i=0; i < num_threads_; i++) {
    workers[i].join();
  }
}
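
The threadTask() worker body that run() launches is not shown above. A minimal sketch of what it might look like, assuming my_counter_ is a std::atomic<int> member declared in tasksys.h, is:

// Hedged sketch (not the exact assignment code): each worker repeatedly claims
// the next task index from a shared atomic counter until all indices are taken.
void TaskSystemParallelSpawn::threadTask(IRunnable* runnable, int num_total_tasks) {
  while (true) {
    int task_id = my_counter_.fetch_add(1);   // claim a unique task index
    if (task_id >= num_total_tasks) {
      break;                                  // every task has been claimed
    }
    runnable->runTask(task_id, num_total_tasks);
  }
}

This dynamic assignment also load-balances: a thread that finishes a cheap task immediately grabs the next available index.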
===================================================================================
(base) ➜  part_a git:(master) ✗ python3 ../tests/run_test_harness.py -n 8 -t  super_light super_super_light
runtasks_ref
Linux x86_64
================================================================================
Running task system grading harness... (2 total tests)
  - Detected CPU with 40 execution contexts
  - Task system configured to use at most 8 threads
================================================================================
================================================================================
Executing test: super_super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_super_light
                                        STUDENT   REFERENCE   PERF?
[Serial]                                8.562     12.451      0.69  (OK)
[Parallel + Always Spawn]               56.804    54.287      1.05  (OK)
[Parallel + Thread Pool + Spin]         8.532     25.417      0.34  (OK)
[Parallel + Thread Pool + Sleep]        8.456     24.996      0.34  (OK)
================================================================================
Executing test: super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_light
                                        STUDENT   REFERENCE   PERF?
[Serial]                                81.044    95.194      0.85  (OK)
[Parallel + Always Spawn]               55.138    83.604      0.66  (OK)
[Parallel + Thread Pool + Spin]         80.753    27.388      2.95  (NOT OK)
[Parallel + Thread Pool + Sleep]        80.646    33.46       2.41  (NOT OK)
================================================================================
Overall performance results
[Serial]                                : All passed Perf
[Parallel + Always Spawn]               : All passed Perf
[Parallel + Thread Pool + Spin]         : Perf did not pass all tests
[Parallel + Thread Pool + Sleep]        : Perf did not pass all tests

Step 2: thread pool + spin waiting

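The worker loop below pops threadArg entries from a shared task_queue_. That struct is not shown in this post; a minimal sketch of what it might contain, with fields inferred from how it is used (the nullptr default matters, because the worker checks arg.runnable to tell whether it actually dequeued work), is:

// Hedged sketch of the queue entry consumed by the spinning workers.
struct threadArg {
  int task_id = 0;
  int num_total_tasks = 0;
  IRunnable* runnable = nullptr;   // nullptr means "no task was dequeued"

  threadArg() = default;
  threadArg(int id, int total, IRunnable* r)
      : task_id(id), num_total_tasks(total), runnable(r) {}
};
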
void TaskSystemParallelThreadPoolSpinning::threadTask() {
      // bool done = done_.load();
  while(!done_.load()) {
    // printf("id: %d\n", id);
    mutex_.lock();
    threadArg arg;
    if(!task_queue_.empty()) {
      arg = task_queue_.front();
      task_queue_.pop_front();
    }
    mutex_.unlock();
    if(arg.runnable) {
      arg.runnable->runTask( arg.task_id, arg.num_total_tasks);
      finished_tasks_.fetch_add(1);
    }
  }

}
TaskSystemParallelThreadPoolSpinning::TaskSystemParallelThreadPoolSpinning(int num_threads): ITaskSystem(num_threads) {
    //
    // TODO: CS149 student implementations may decide to perform setup
    // operations (such as thread pool construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //
  done_ = false;
  num_threads_ = num_threads;
  for(int i=0; i < num_threads_; i++) {
    workers_.emplace_back([this ]{
      this->threadTask();
    });
  }

}

TaskSystemParallelThreadPoolSpinning::~TaskSystemParallelThreadPoolSpinning() {
  done_.store(true);
  for(int i=0; i< num_threads_; i++) {
    workers_[i].join();
  }
}

void TaskSystemParallelThreadPoolSpinning::run(IRunnable* runnable, int num_total_tasks) {


    //
    // TODO: CS149 students will modify the implementation of this
    // method in Part A.  The implementation provided below runs all
    // tasks sequentially on the calling thread.
    //

  finished_tasks_ = 0;
  {
  const std::lock_guard<std::mutex> lock(mutex_);
  for(int i=0; i < num_total_tasks; i++) {
      threadArg arg(i, num_total_tasks, runnable);
    task_queue_.emplace_back(arg);
  }
  }
  
  while(finished_tasks_.load() != num_total_tasks) {

  }

}
(base) ➜  part_a git:(master) ✗ python3 ../tests/run_test_harness.py -n 8
runtasks_ref
Linux x86_64
================================================================================
Running task system grading harness... (11 total tests)
  - Detected CPU with 40 execution contexts
  - Task system configured to use at most 8 threads
================================================================================
================================================================================
Executing test: super_super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_super_light
                                        STUDENT   REFERENCE   PERF?
[Serial]                                8.574     12.343      0.69  (OK)
[Parallel + Always Spawn]               56.938    54.949      1.04  (OK)
[Parallel + Thread Pool + Spin]         21.404    25.716      0.83  (OK)
[Parallel + Thread Pool + Sleep]        8.704     17.781      0.49  (OK)
================================================================================
Executing test: super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_light
                                        STUDENT   REFERENCE   PERF?
[Serial]                                80.252    94.107      0.85  (OK)
[Parallel + Always Spawn]               73.338    83.35       0.88  (OK)
[Parallel + Thread Pool + Spin]         27.698    36.119      0.77  (OK)
[Parallel + Thread Pool + Sleep]        80.795    34.28       2.36  (NOT OK)
================================================================================
Executing test: ping_pong_equal...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_equal
                                        STUDENT   REFERENCE   PERF?
[Serial]                                1297.763  1553.482    0.84  (OK)
[Parallel + Always Spawn]               574.729   649.785     0.88  (OK)
[Parallel + Thread Pool + Spin]         187.65    232.88      0.81  (OK)
[Parallel + Thread Pool + Sleep]        1294.887  226.363     5.72  (NOT OK)
================================================================================
Executing test: ping_pong_unequal...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_unequal
                                        STUDENT   REFERENCE   PERF?
[Serial]                                2405.604  2401.768    1.00  (OK)
[Parallel + Always Spawn]               969.55    1021.323    0.95  (OK)
[Parallel + Thread Pool + Spin]         336.518   332.326     1.01  (OK)
[Parallel + Thread Pool + Sleep]        2396.002  338.008     7.09  (NOT OK)
================================================================================
Executing test: recursive_fibonacci...
Reference binary: ./runtasks_ref_linux
Results for: recursive_fibonacci
                                        STUDENT   REFERENCE   PERF?
[Serial]                                1578.621  1938.967    0.81  (OK)
[Parallel + Always Spawn]               482.274   548.912     0.88  (OK)
[Parallel + Thread Pool + Spin]         222.252   266.043     0.84  (OK)
[Parallel + Thread Pool + Sleep]        1583.164  266.454     5.94  (NOT OK)
================================================================================
Executing test: math_operations_in_tight_for_loop...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop
                                        STUDENT   REFERENCE   PERF?
[Serial]                                808.052   833.969     0.97  (OK)
[Parallel + Always Spawn]               651.691   705.552     0.92  (OK)
[Parallel + Thread Pool + Spin]         139.645   154.966     0.90  (OK)
[Parallel + Thread Pool + Sleep]        802.654   157.355     5.10  (NOT OK)
================================================================================
================================================================================
Executing test: math_operations_in_tight_for_loop_fewer_tasks...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_fewer_tasks
                                        STUDENT   REFERENCE   PERF?
[Serial]                                806.914   839.226     0.96  (OK)
[Parallel + Always Spawn]               809.429   810.212     1.00  (OK)
[Parallel + Thread Pool + Spin]         215.631   238.903     0.90  (OK)
[Parallel + Thread Pool + Sleep]        805.115   244.607     3.29  (NOT OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_fan_in...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_fan_in
                                        STUDENT   REFERENCE   PERF?
[Serial]                                416.313   428.492     0.97  (OK)
[Parallel + Always Spawn]               133.057   160.116     0.83  (OK)
[Parallel + Thread Pool + Spin]         71.286    74.03       0.96  (OK)
[Parallel + Thread Pool + Sleep]        412.149   81.239      5.07  (NOT OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_reduction_tree...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_reduction_tree
                                        STUDENT   REFERENCE   PERF?
[Serial]                                415.842   428.485     0.97  (OK)
[Parallel + Always Spawn]               142.34    191.563     0.74  (OK)
[Parallel + Thread Pool + Spin]         70.203    65.066      1.08  (OK)
[Parallel + Thread Pool + Sleep]        411.849   67.231      6.13  (NOT OK)
================================================================================
Executing test: spin_between_run_calls...
Reference binary: ./runtasks_ref_linux
Results for: spin_between_run_calls
                                        STUDENT   REFERENCE   PERF?
[Serial]                                540.34    687.538     0.79  (OK)
[Parallel + Always Spawn]               290.111   364.247     0.80  (OK)
[Parallel + Thread Pool + Spin]         295.734   373.543     0.79  (OK)
[Parallel + Thread Pool + Sleep]        538.037   364.643     1.48  (NOT OK)
================================================================================
Executing test: mandelbrot_chunked...
Reference binary: ./runtasks_ref_linux
Results for: mandelbrot_chunked
                                        STUDENT   REFERENCE   PERF?
[Serial]                                526.695   537.581     0.98  (OK)
[Parallel + Always Spawn]               79.414    78.909      1.01  (OK)
[Parallel + Thread Pool + Spin]         83.16     77.74       1.07  (OK)
[Parallel + Thread Pool + Sleep]        530.464   85.143      6.23  (NOT OK)
================================================================================
Overall performance results
[Serial]                                : All passed Perf
[Parallel + Always Spawn]               : All passed Perf
[Parallel + Thread Pool + Spin]         : All passed Perf
[Parallel + Thread Pool + Sleep]        : Perf did not pass all tests

Step 3: thread pool + put threads to sleep when there is nothing to do

Implementation :

Notice that we have to check whether each thread is joinable before calling join() in the destructor of the class, because a thread might have already exited by the time done_ is set to true.

void TaskSystemParallelThreadPoolSleeping::threadTask( ) {
  int num_task = 0;
  while(!done_.load()) {
    std::unique_lock<std::mutex> lock(mutex_);
    threadArg arg ; 
    bool get_task = false;
    if(!task_queue_.empty()) {
      arg = task_queue_.front();
      task_queue_.pop_front();
      get_task = true;
      // printf("get task\n");
    } else {
      cv_.wait(lock);
    }
    lock.unlock();
    if(get_task) {
      arg.runnable->runTask(arg.task_id, arg.num_total_tasks);
      num_task = arg.num_total_tasks;
      finished_tasks_.fetch_add(1);
      if(finished_tasks_.load() == num_task)  {
        // printf("send notify\n");
        cv_.notify_all();
      }
    }
 
  }
}
TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
    //
    // TODO: CS149 student implementations may decide to perform setup
    // operations (such as thread pool construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //
  num_threads_ = num_threads;
  done_ = false;
  for(int i=0; i< num_threads_; i++){
    workers_.emplace_back([this]{
      this->threadTask();
    });
  }

}

TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    //
    // TODO: CS149 student implementations may decide to perform cleanup
    // operations (such as thread pool shutdown construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //
  done_.store(true);
  cv_.notify_all();
  for(int i=0; i < num_threads_; i++) {
    if(workers_[i].joinable()) {
      workers_[i].join();
    }
  }
}

void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {


    //
    // TODO: CS149 students will modify the implementation of this
    // method in Parts A and B.  The implementation provided below runs all
    // tasks sequentially on the calling thread.
    //

  std::unique_lock<std::mutex> lock(mutex_);
  finished_tasks_.store(0) ;
  for(int i=0; i < num_total_tasks; i++) {
    threadArg arg(i, num_total_tasks, runnable);
    task_queue_.emplace_back(arg);
  }
  // printf("before notify all\n");
  lock.unlock();
  cv_.notify_all();
  lock.lock();

  
  while(finished_tasks_.load() != num_total_tasks) {
    cv_.wait(lock);
  }

  // printf("finished_tasks_:%d\n", finished_tasks_.load());

}

Part B: thread pool + async calls with dependencies

This line of code is important.

We have two conditions to check in cv_.wait(). cv_.wait() re-checks the predicate each time it wakes up and stops waiting only when the predicate is true.

This saves unnecessary predicate checking in the while loop.

      cv_.wait(lock, [this]{return !ready_queue_.empty() || done_ ;});

Another change compared to the previous version is that I use a plain bool instead of std::atomic<bool> for the done_ variable, which indicates that the class is being destructed.

With std::atomic<bool>, done_ was set to true in the destructor without holding the lock, which means a worker thread could miss the notify_all() call from the destructor and never be woken up to exit.

After changing done_ to a plain bool, we are guaranteed that all worker threads observe the shutdown, because the destructor acquires the lock first, then sets done_ to true, and then calls notify_all().

This guarantees the ordering of these operations.
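
As a rough, self-contained illustration of that ordering (a sketch with simplified names, not the assignment's exact class), the shutdown handshake looks like this:

#include <condition_variable>
#include <mutex>
#include <thread>

// Sketch: done_ is a plain bool published under the same mutex that the
// worker's wait predicate reads it with, so the notification cannot be missed.
struct Pool {
    std::mutex mutex_;
    std::condition_variable cv_;
    bool done_ = false;
    std::thread worker_{[this] {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate is evaluated while holding mutex_, so a notifier that
        // also takes mutex_ before flipping done_ cannot slip in between the
        // predicate check and the block -- the wakeup cannot be lost.
        cv_.wait(lock, [this] { return done_; });
    }};
    ~Pool() {
        {
            std::lock_guard<std::mutex> g(mutex_);
            done_ = true;            // publish the flag under the lock
        }
        cv_.notify_all();            // the worker either already saw done_,
                                     // or is blocked and receives this notify
        worker_.join();
    }
};

int main() { Pool p; }               // construct + destroy exercises the handshake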

Issue: the test cannot pass with 4 threads.

Investigation is ongoing.

TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    //
    // TODO: CS149 student implementations may decide to perform cleanup
    // operations (such as thread pool shutdown construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //
  std::unique_lock<std::mutex> lock(mutex_);
  done_=true;
  lock.unlock();
  cv_.notify_all();
  printf("destructor\n");
  for(int i=0; i < workers_.size(); i++) {
    // cv_.notify_all();
    if(workers_[i].joinable()) {
      workers_[i].join();
    }
  }
  printf("exit\n");
}

Worker code:


TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
                                                    const std::vector<TaskID>& deps) {

    //
    // TODO: CS149 students will implement this method in Part B.
    //

  TaskID new_task = task_dep_.GetNewTaskID();
  mutex_.lock();
   taskid_remain_work_count_map_[new_task] = num_total_tasks;
   for(TaskID parent_task_id: deps) {
     dependent_children_[parent_task_id].insert(new_task);
   }
   TaskArg task_arg(new_task, runnable, num_total_tasks,  deps.size());
  printf("task: %d dep count: %ld\n", new_task, deps.size());
   task_id_remaining_dependent_parents_count_[new_task] = task_arg;
  CheckAndNotify(new_task);
  mutex_.unlock();


  return new_task;
}

void TaskSystemParallelThreadPoolSleeping::threadTask( ) {
  std::unique_lock<std::mutex> lock(mutex_);
  while(!done_) {
    threadArg arg ; 
    bool get_task = false;
    if(!ready_queue_.empty()) {
      arg = ready_queue_.front();
      ready_queue_.pop_front();
      get_task = true;
      printf("get task\n");
    } else {
      printf("wait task\n");
      cv_.wait(lock);
      // cv_.wait(lock, [this]{return !ready_queue_.empty() || done_ ;});
      continue;
    }
    lock.unlock();
    if(get_task) {
      int task_thread_id = arg.task_thread_id;
      int task_id = arg.task_id;
      arg.runnable->runTask(task_thread_id, arg.num_total_tasks);
      bool cur_task_finished = false;
      {
        std::unique_lock<std::mutex> inner_lock(mutex_);
        int task_remain_work_count = --taskid_remain_work_count_map_[task_id];
        if(task_remain_work_count == 0) {
          printf("task:%d finished\n", task_id);
          cur_task_finished = true;
          const auto& children_task_ids = dependent_children_[task_id];
          for(TaskID child_task_id: children_task_ids ) {
            task_id_remaining_dependent_parents_count_[child_task_id].remain_dep_parent_task_count--;
            CheckAndMoveTaskToReadyQueue(child_task_id);
          }

        }
      }
      if(cur_task_finished) {
        finished_tasks_.fetch_add(1);
        // extra notify all here for sync()
        cv_.notify_all();

      }
    }
    lock.lock();
  }
}

Add another condition for when to notify_all() threads:

      if(cur_task_finished || moved_to_ready_queue) {
        finished_tasks_.fetch_add(1);
        // extra notify all here for sync()
        cv_.notify_all();

      }

Symptom: task 1 is never moved to the ready_queue. Debug output:

destructor
exit
wait task
wait task
task:0 moved to ready_queue
get task
get task
get task
wait task
task:0 finished
wait task

Another issue: a parent task listed in deps might already be finished by the time the new child task is pushed. We need to handle this situation; one way is sketched below.
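
One way to handle it (and what the final version of runAsyncWithDeps later in this post does) is to drop parents that have already finished at submission time, so only still-running parents are recorded as dependencies. A sketch:

  // Hedged sketch: filter out already-finished parents before recording deps.
  std::vector<TaskID> new_deps;
  for (TaskID parent_task_id : deps) {
    if (!task_dep_.CheckTaskFinished(parent_task_id)) {
      new_deps.emplace_back(parent_task_id);
    }
  }
  task_dep_.AddTaskDep(new_task, new_deps, runnable, num_total_tasks);
  if (new_deps.empty()) {
    ScheduleTaskToReadyQueue(new_task);   // no live parents, runnable right away
  }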

So we should not rely on this structure alone for task scheduling:

   std::unordered_map<TaskID, std::unordered_set<TaskID>> dependent_children_;

This class structure is not right.

class TaskSystemParallelThreadPoolSleeping: public ITaskSystem {
    public:
        TaskSystemParallelThreadPoolSleeping(int num_threads);
        ~TaskSystemParallelThreadPoolSleeping();
        const char* name();
        void run(IRunnable* runnable, int num_total_tasks);
        TaskID runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
                                const std::vector<TaskID>& deps);
        void sync();

private:
  void threadTask( );
  bool CheckAndMoveTaskToReadyQueue(TaskID child_task_id);
  void CheckAndNotify(TaskID child_task_id);
  std::vector<std::thread> workers_;
  int num_threads_;
  bool done_;
  std::mutex mutex_;
  std::condition_variable cv_;
  std::atomic<TaskID> task_id_;
  std::unordered_map<TaskID, std::unordered_set<TaskID>> dependent_children_;
  std::unordered_map<TaskID, TaskArg> task_id_remaining_dependent_parents_count_;
  std::deque<threadArg> ready_queue_;
  std::unordered_map<TaskID, int> taskid_remain_work_count_map_;
  std::atomic<int> finished_tasks_;
 

};

Correct implementation

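The code in this section (and the Part B worker code above) uses two small structs that are never shown in this post. Hedged sketches of what they might contain, with field names inferred from how they are used, follow; compared with the Part A version, threadArg here carries both the sub-task index and the TaskID of the launch it belongs to.

// Hedged sketch: per-launch bookkeeping stored by TaskDep for one
// runAsyncWithDeps() call.
struct TaskArg {
  TaskID task_id = 0;
  IRunnable* runnable = nullptr;
  int num_total_tasks = 0;
  int remain_dep_parent_task_count = 0;
};

// Hedged sketch: one entry in ready_queue_, i.e. a single sub-task of a launch.
struct threadArg {
  int task_thread_id = 0;          // index passed to runTask()
  int num_total_tasks = 0;
  IRunnable* runnable = nullptr;
  TaskID task_id = 0;              // which runAsyncWithDeps() launch this belongs to

  threadArg() = default;
  threadArg(int thread_id, int total, IRunnable* r, TaskID id)
      : task_thread_id(thread_id), num_total_tasks(total), runnable(r), task_id(id) {}
};
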
Code :

void TaskSystemParallelThreadPoolSleeping::threadTask( ) {
  std::unique_lock<std::mutex> lock(mutex_);
  std::vector<TaskID> tasks;
  bool get_new_tasks_to_run = false;
  // get_new_tasks_to_run = task_dep_.CheckAndGetRunnableTasks(tasks);
  // for(TaskID task_id : tasks){
  //   ScheduleTaskToReadyQueue(task_id);
  // }
  // tasks.clear();

  while(!done_ || !ready_queue_.empty() || task_dep_.HasTaskToSchedule()) {
    get_new_tasks_to_run = task_dep_.CheckAndGetRunnableTasks(tasks);
    for(TaskID task_id: tasks) {
      ScheduleTaskToReadyQueue(task_id);
    }
    tasks.clear();
    threadArg arg ; 
    bool get_task = false;
    if(!ready_queue_.empty()) {
      arg = ready_queue_.front();
      ready_queue_.pop_front();
      get_task = true;
      // printf("get task\n");
    } else {
      // printf("wait task\n");
      cv_.wait(lock);
      // cv_.wait(lock, [this]{return !ready_queue_.empty() || done_ ;});
    }
    lock.unlock();
    if(get_task) {
      int task_thread_id = arg.task_thread_id;
      int task_id = arg.task_id;
      arg.runnable->runTask(task_thread_id, arg.num_total_tasks);
      {
        std::unique_lock<std::mutex> inner_lock(mutex_);
        // printf("taskid: %d, thread_id: %d finished\n", task_id, task_thread_id);
        FinishOneSubTask(task_id);
      }
      // Move notify_all out of mutex scope so that 
      // other threads can wake up and do some work
      // if(cur_task_finished || has_moved_to_ready_queue) {
      //   cv_.notify_all();
      // }
    cv_.notify_all();
    }
    lock.lock();
  }
}

// Prerequisite: mutex_ is held
void  TaskSystemParallelThreadPoolSleeping::ScheduleTaskToReadyQueue(TaskID task_id) {
  TaskArg task_arg;
  task_dep_.GetTaskArg(task_id, task_arg);
  for(int i=0; i < task_arg.num_total_tasks; i++) {
    threadArg ta(i, task_arg.num_total_tasks, task_arg.runnable, task_id);
    taskid_remain_work_count_map_[task_id] = task_arg.num_total_tasks;
    ready_queue_.emplace_back(ta);
  }

}
const char* TaskSystemParallelThreadPoolSleeping::name() {
    return "Parallel + Thread Pool + Sleep";
}

TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
    //
    // TODO: CS149 student implementations may decide to perform setup
    // operations (such as thread pool construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //
  done_ = false;
  // task_id_ = 0;
  num_threads_ = num_threads;
  // finished_tasks_ = 0;
  for(int i=0; i < num_threads_; i++) {
    workers_.emplace_back([this]{
      this->threadTask();
    });
  }
}

TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    //
    // TODO: CS149 student implementations may decide to perform cleanup
    // operations (such as thread pool shutdown construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //
  std::unique_lock<std::mutex> lock(mutex_);
  done_=true;
  lock.unlock();
  cv_.notify_all();
  // printf("destructor\n");
  for(int i=0; i < workers_.size(); i++) {
    // cv_.notify_all();
    if(workers_[i].joinable()) {
      workers_[i].join();
    }
  }
  // printf("exit\n");
}

void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {


    //
    // TODO: CS149 students will modify the implementation of this
    // method in Parts A and B.  The implementation provided below runs all
    // tasks sequentially on the calling thread.
    //

    for (int i = 0; i < num_total_tasks; i++) {
        runnable->runTask(i, num_total_tasks);
    }
}

// Two places to schedule tasks and put them onto ready_queue_:
// either at the start of the runAsyncWithDeps call,
// or in threadTask when a TaskID that a task depends on finishes
TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
                                                    const std::vector<TaskID>& deps) {

    //
    // TODO: CS149 students will implement this method in Part B.
    //

  mutex_.lock();
  TaskID new_task = task_dep_.GetNewTaskID();
  task_dep_.AddTaskDep(new_task, deps, runnable, num_total_tasks);
  // printf("task: %d dep count: %ld\n", new_task, deps.size());
  mutex_.unlock();
  cv_.notify_all();


  return new_task;
}

void TaskSystemParallelThreadPoolSleeping::sync() {

    //
    // TODO: CS149 students will modify the implementation of this method in Part B.
    //
  std::unique_lock<std::mutex> lock(mutex_);
  // In case all threads are put to sleep


  while(!ready_queue_.empty() || task_dep_.GetFinishedTaskCount() != task_dep_.GetTotalTaskCount()  ) {
    // printf("finished task count:%d, task_id:%d\n", finished_tasks_.load(), task_id_.load());
    cv_.wait(lock);
    // printf("finished task count:%d, task_id:%d\n", finished_tasks_.load(), task_id_.load());
  }
    return;
}

TaskDep::TaskDep() {
  task_id_ = 0;
  finished_tasks_ = 0;
}
TaskID TaskDep::GetNewTaskID() {
  return task_id_.fetch_add(1);

}

int TaskDep::GetFinishedTaskCount() const  {
  return finished_tasks_.load();
}

void TaskDep::FinishTask(TaskID task_id) {
  finished_task_ids_.insert(task_id);
  finished_tasks_.fetch_add(1);
}
void TaskSystemParallelThreadPoolSleeping::FinishOneSubTask(TaskID task_id) {
  if(--taskid_remain_work_count_map_[task_id] ==0 ) {
    // printf("task:%d finished\n", task_id);
    task_dep_.FinishTask(task_id);
    }
}

bool TaskDep::CheckAndGetRunnableTasks(std::vector<TaskID>& tasks) {
  std::vector<TaskID> to_erase;
  for(auto &iter: dependent_parents_) {
    TaskID child_task_id = iter.first;
    // printf("child task id:%d\n", child_task_id  );
    for(auto parent_it=iter.second.begin(); parent_it!=iter.second.end();){
      // printf("   parent: %d, finished:%d\n", *parent_it, CheckTaskFinished(*parent_it));
      if( CheckTaskFinished(*parent_it)){
        parent_it = iter.second.erase(parent_it);
      } else {
        parent_it++;  
      }
    }
    bool not_scheduled = scheduled_task_ids_.find(child_task_id) == scheduled_task_ids_.end() ;
    bool parents_empty =dependent_parents_[child_task_id].empty(); 
    if(not_scheduled && parents_empty){
      scheduled_task_ids_.insert(child_task_id);
      // printf("new task schedule:%d\n", child_task_id);
      tasks.emplace_back(child_task_id);
      to_erase.emplace_back(child_task_id);
    }
  }
  for(TaskID erase_id: to_erase) {
    dependent_parents_.erase(erase_id);
  }
  return !tasks.empty();
}

bool TaskDep::CheckTaskFinished(TaskID task_id) {
  return finished_task_ids_.find(task_id) != finished_task_ids_.end();

}

void TaskDep::GetTaskArg(TaskID task_id, TaskArg& task_arg) {
  const auto& ta = task_args_[task_id];
  task_arg.task_id = ta.task_id;
  task_arg.num_total_tasks = ta.num_total_tasks;
  task_arg.runnable = ta.runnable;
  task_arg.remain_dep_parent_task_count = ta.remain_dep_parent_task_count;
}

void TaskDep::AddTaskDep(TaskID child, std::vector<TaskID> deps, IRunnable* runnable, int num_total_tasks) {
  TaskArg ta;
  ta.task_id = child;
  ta.remain_dep_parent_task_count = deps.size();
  ta.runnable = runnable;
  ta.num_total_tasks = num_total_tasks;
  if(!deps.empty()) {
    for(TaskID task_id: deps) {
      dependent_parents_[child].insert(task_id);
    }
  } else {
    dependent_parents_[child] = {};
  }
  task_args_[child] = ta;

}

Notice the difference between the two loops below.

The first one does not work: iterating by value makes iter a copy of each element of dependent_parents_, so erasing finished parents from iter.second never modifies the map, and dependent_parents_[child_task_id].empty() always returns false.

  // Broken: iterating by value, so "iter" is a copy of each map element and
  // erasing finished parents from iter.second never changes the map.
  for(auto iter: dependent_parents_) {
    TaskID child_task_id = iter.first;
    // ... erase finished parents from iter.second (as in CheckAndGetRunnableTasks) ...
    if(scheduled_task_ids_.find(child_task_id) == scheduled_task_ids_.end() && dependent_parents_[child_task_id].empty()){
      scheduled_task_ids_.insert(child_task_id);
      printf("new task schedule:%d\n", child_task_id);
      tasks.emplace_back(child_task_id);
      to_erase.emplace_back(child_task_id);
    }
  }

  // Working: iterating by reference, so the erases modify the map in place.
  for(auto &iter: dependent_parents_) {
    TaskID child_task_id = iter.first;
    // ... erase finished parents from iter.second (as in CheckAndGetRunnableTasks) ...
    if(scheduled_task_ids_.find(child_task_id) == scheduled_task_ids_.end() && dependent_parents_[child_task_id].empty()){
      scheduled_task_ids_.insert(child_task_id);
      printf("new task schedule:%d\n", child_task_id);
      tasks.emplace_back(child_task_id);
      to_erase.emplace_back(child_task_id);
    }
  }

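As a standalone illustration of the copy-versus-reference pitfall above (a toy example, not the assignment code):

#include <cstdio>
#include <unordered_map>
#include <unordered_set>

int main() {
  std::unordered_map<int, std::unordered_set<int>> deps{{1, {0}}};

  // By value: kv is a copy of the map element, so the erase only mutates the copy.
  for (auto kv : deps) kv.second.erase(0);
  std::printf("by value:     empty=%d\n", (int)deps[1].empty());   // prints empty=0

  // By reference: kv aliases the map element, so the erase sticks.
  for (auto &kv : deps) kv.second.erase(0);
  std::printf("by reference: empty=%d\n", (int)deps[1].empty());   // prints empty=1
}
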
Result:

Not good. I think it's because I wake threads up too frequently.

Try to reduce the number of wake-ups.

Executing test: super_super_light_async...
Reference binary: ./runtasks_ref_linux
Results for: super_super_light_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                8.706     12.433      0.70  (OK)
[Parallel + Always Spawn]               8.406     56.047      0.15  (OK)
[Parallel + Thread Pool + Spin]         8.27      25.412      0.33  (OK)
[Parallel + Thread Pool + Sleep]        224.619   24.681      9.10  (NOT OK)

================================================================================

Executing test: super_light_async...
Reference binary: ./runtasks_ref_linux
Results for: super_light_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                82.485    95.578      0.86  (OK)
[Parallel + Always Spawn]               81.039    81.133      1.00  (OK)
[Parallel + Thread Pool + Spin]         81.268    34.526      2.35  (NOT OK)
[Parallel + Thread Pool + Sleep]        408.665   31.755      12.87  (NOT OK)

================================================================================
Executing test: ping_pong_equal_async...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_equal_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                1313.035  1542.283    0.85  (OK)
[Parallel + Always Spawn]               1309.708  701.653     1.87  (NOT OK)
[Parallel + Thread Pool + Spin]         1313.704  227.62      5.77  (NOT OK)
[Parallel + Thread Pool + Sleep]        778.308   224.776     3.46  (NOT OK)
================================================================================

Executing test: ping_pong_unequal_async...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_unequal_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                2446.194  2402.307    1.02  (OK)
[Parallel + Always Spawn]               2456.503  976.121     2.52  (NOT OK)
[Parallel + Thread Pool + Spin]         2454.091  332.344     7.38  (NOT OK)
[Parallel + Thread Pool + Sleep]        506.64    330.938     1.53  (NOT OK)
================================================================================

Executing test: recursive_fibonacci_async...
Reference binary: ./runtasks_ref_linux
Results for: recursive_fibonacci_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                1310.101  1946.319    0.67  (OK)
[Parallel + Always Spawn]               1303.736  505.253     2.58  (NOT OK)
[Parallel + Thread Pool + Spin]         1306.294  261.702     4.99  (NOT OK)
[Parallel + Thread Pool + Sleep]        186.681   264.498     0.71  (OK)
================================================================================

Executing test: math_operations_in_tight_for_loop_async...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                821.415   854.457     0.96  (OK)
[Parallel + Always Spawn]               822.081   697.755     1.18  (OK)
[Parallel + Thread Pool + Spin]         817.92    149.114     5.49  (NOT OK)
[Parallel + Thread Pool + Sleep]        7622.159  148.934     51.18  (NOT OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_reduction_tree_async...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_reduction_tree_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                419.026   439.384     0.95  (OK)
[Parallel + Always Spawn]               418.167   141.737     2.95  (NOT OK)
[Parallel + Thread Pool + Spin]         418.431   72.123      5.80  (NOT OK)
[Parallel + Thread Pool + Sleep]        70.724    71.191      0.99  (OK)
================================================================================

Reduce run time

Another change I want to make is to schedule tasks only when a task is first submitted or when a task finishes. This means far fewer checks to find runnable tasks.

This works because there are only two places where a task can become schedulable:

  1. The task is first inserted into the system.
  2. Another task (one of its parents) finishes.

A parent task can be in one of two states:

  1. The parent task is already finished. In this case, we can erase that dependency from the child task when the child task is submitted.

  2. The parent task is not finished yet. In this case, when the parent task finishes, it tells the task manager to erase it from the child's dependency structure.

In both situations, we then check whether the child task can be scheduled.

Code:

TaskDep::TaskDep() {
  task_id_ = 0;
  finished_tasks_ = 0;
}
TaskID TaskDep::GetNewTaskID() {
  return task_id_.fetch_add(1);

}

int TaskDep::GetFinishedTaskCount() const  {
  return finished_tasks_.load();
}

void TaskDep::FinishTask(TaskID task_id, std::vector<TaskID>& children_tasks) {
  finished_task_ids_.insert(task_id);
  finished_tasks_.fetch_add(1);
  // remove this parent from all of its children.
  for(TaskID child_task_id: dependent_children_[task_id] ) {
    dependent_parents_[child_task_id].erase(task_id);
    // If the child has no remaining parent dependencies, we can schedule it
    if(dependent_parents_[child_task_id].empty()) {
      children_tasks.emplace_back(child_task_id);
    }
    // if(dependent_children_[child_task_id].find(task_id) != dependent_children_[child_task_id].end()) {
    //   dependent_children_[child_task_id].erase(task_id);
    // }
    // if(dependent_parents_[child_task_id].empty()) {
    //   children_tasks.emplace_back(child_task_id);
    // }

  }
}
bool TaskSystemParallelThreadPoolSleeping::FinishOneSubTask(TaskID task_id, std::vector<TaskID>& children_tasks) {
  if(--taskid_remain_work_count_map_[task_id] ==0 ) {
    // printf("task:%d finished\n", task_id);
    task_dep_.FinishTask(task_id, children_tasks);
    return true;
    }
  return false; 
}

bool TaskDep::CheckAndGetRunnableTasks(std::vector<TaskID>& tasks) {
  std::vector<TaskID> to_erase;
  for(auto &iter: dependent_parents_) {
    TaskID child_task_id = iter.first;
    // printf("child task id:%d\n", child_task_id  );
    for(auto parent_it=iter.second.begin(); parent_it!=iter.second.end();){
      // printf("   parent: %d, finished:%d\n", *parent_it, CheckTaskFinished(*parent_it));
      if( CheckTaskFinished(*parent_it)){
        parent_it = iter.second.erase(parent_it);
      } else {
        parent_it++;  
      }
    }
    bool not_scheduled = scheduled_task_ids_.find(child_task_id) == scheduled_task_ids_.end() ;
    bool parents_empty =dependent_parents_[child_task_id].empty(); 
    if(not_scheduled && parents_empty){
      scheduled_task_ids_.insert(child_task_id);
      // printf("new task schedule:%d\n", child_task_id);
      tasks.emplace_back(child_task_id);
      to_erase.emplace_back(child_task_id);
    }
  }
  for(TaskID erase_id: to_erase) {
    dependent_parents_.erase(erase_id);
  }
  return !tasks.empty();
}

bool TaskDep::CheckTaskFinished(TaskID task_id) {
  return finished_task_ids_.find(task_id) != finished_task_ids_.end();

}

void TaskDep::GetTaskArg(TaskID task_id, TaskArg& task_arg) {
  const auto& ta = task_args_[task_id];
  task_arg.task_id = ta.task_id;
  task_arg.num_total_tasks = ta.num_total_tasks;
  task_arg.runnable = ta.runnable;
  task_arg.remain_dep_parent_task_count = ta.remain_dep_parent_task_count;
}

void TaskDep::AddTaskDep(TaskID child, std::vector<TaskID> deps, IRunnable* runnable, int num_total_tasks) {
  TaskArg ta;
  ta.task_id = child;
  ta.remain_dep_parent_task_count = deps.size();
  ta.runnable = runnable;
  ta.num_total_tasks = num_total_tasks;
  if(!deps.empty()) {
    for(TaskID task_id: deps) {
      dependent_parents_[child].insert(task_id);
      dependent_children_[task_id].insert(child);
    }
  } else {
    dependent_parents_[child] = {};
  }
  task_args_[child] = ta;

}
void TaskSystemParallelThreadPoolSleeping::threadTask( ) {
  std::unique_lock<std::mutex> lock(mutex_);
  std::vector<TaskID> tasks;
  bool get_new_tasks_to_run = false;
    while(!done_ || !ready_queue_.empty() ) {
    threadArg arg ; 
    bool get_task = false;
    if(!ready_queue_.empty()) {
      arg = ready_queue_.front();
      ready_queue_.pop_front();
      get_task = true;
      // printf("get task\n");
    } else {
      // printf("wait task\n");
      cv_.wait(lock);
      // cv_.wait(lock, [this]{return !ready_queue_.empty() || done_ ;});
    }
    lock.unlock();
    if(get_task) {
      int task_thread_id = arg.task_thread_id;
      int task_id = arg.task_id;
      bool cur_task_finished = false;
      std::vector<TaskID> children_task_to_schedule;
      arg.runnable->runTask(task_thread_id, arg.num_total_tasks);
      {
        std::unique_lock<std::mutex> inner_lock(mutex_);
        // printf("taskid: %d, thread_id: %d finished\n", task_id, task_thread_id);
        cur_task_finished = FinishOneSubTask(task_id, children_task_to_schedule);
        if(!children_task_to_schedule.empty()) {
          for(TaskID child_task_id: children_task_to_schedule){
            ScheduleTaskToReadyQueue(child_task_id);
            // Is this ok ?
            // cv_.notify_all();
          }
        }
      }
      // Move notify_all out of mutex scope so that 
      // other threads can wake up and do some work
      // if(!ready_queue_.empty()  ) {
      // }
      if(!children_task_to_schedule.empty() || wait_sync_.load()) {
        cv_.notify_all();
      }
    }
    lock.lock();
  }
}

// bool TaskSystemParallelThreadPoolSleeping::CheckTaskAndMoveToReadyQueue() {
//   std::vector<TaskID> tasks;
//     bool get_new_tasks_to_run = task_dep_.CheckAndGetRunnableTasks(tasks);
//     for(TaskID task_id: tasks) {
//       ScheduleTaskToReadyQueue(task_id);
//     }
//     // tasks.clear();
//   return get_new_tasks_to_run;

// }
// Prerequisite: mutex_ is held
void  TaskSystemParallelThreadPoolSleeping::ScheduleTaskToReadyQueue(TaskID task_id) {
  TaskArg task_arg;
  task_dep_.GetTaskArg(task_id, task_arg);
  bool not_scheduled = scheduled_task_ids_.find(task_id) == scheduled_task_ids_.end() ;
  if(not_scheduled  ){
    scheduled_task_ids_.insert(task_id);
    // printf("new task schedule:%d\n", child_task_id);
    // to_erase.emplace_back(child_task_id);
  }
  for(int i=0; i < task_arg.num_total_tasks; i++) {
    threadArg ta(i, task_arg.num_total_tasks, task_arg.runnable, task_id);
    taskid_remain_work_count_map_[task_id] = task_arg.num_total_tasks;
    ready_queue_.emplace_back(ta);
  }

}
const char* TaskSystemParallelThreadPoolSleeping::name() {
    return "Parallel + Thread Pool + Sleep";
}

TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
    //
    // TODO: CS149 student implementations may decide to perform setup
    // operations (such as thread pool construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //
  done_ = false;
  // task_id_ = 0;
  num_threads_ = num_threads;
  // finished_tasks_ = 0;
  for(int i=0; i < num_threads_; i++) {
    workers_.emplace_back([this]{
      this->threadTask();
    });
  }
}

TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    //
    // TODO: CS149 student implementations may decide to perform cleanup
    // operations (such as thread pool shutdown construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //
  std::unique_lock<std::mutex> lock(mutex_);
  done_=true;
  lock.unlock();
  cv_.notify_all();
  // printf("destructor\n");
  for(int i=0; i < workers_.size(); i++) {
    // cv_.notify_all();
    if(workers_[i].joinable()) {
      workers_[i].join();
    }
  }
  // printf("exit\n");
}

void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {


    //
    // TODO: CS149 students will modify the implementation of this
    // method in Parts A and B.  The implementation provided below runs all
    // tasks sequentially on the calling thread.
    //

    for (int i = 0; i < num_total_tasks; i++) {
        runnable->runTask(i, num_total_tasks);
    }
}

// Two places to schedule tasks and put them onto ready_queue_:
// either at the start of the runAsyncWithDeps call,
// or in threadTask when a TaskID that a task depends on finishes
TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
                                                    const std::vector<TaskID>& deps) {

    //
    // TODO: CS149 students will implement this method in Part B.
    //

  mutex_.lock();
  TaskID new_task = task_dep_.GetNewTaskID();
  printf("task: %d dep count: %ld, dep id:%d\n", new_task, deps.size(), deps.empty() ? 0 : deps[0]);
  // CheckTaskAndMoveToReadyQueue();
  std::vector<TaskID> new_deps;
  for(TaskID parent_task_id: deps) {
    if(!task_dep_.CheckTaskFinished(parent_task_id)) {
      new_deps.emplace_back(parent_task_id);

    }
  }
  task_dep_.AddTaskDep(new_task, new_deps, runnable, num_total_tasks);
  bool scheduled = false;
  if(new_deps.empty()) {
    ScheduleTaskToReadyQueue(new_task);
    scheduled = true;
  } 
  mutex_.unlock();
  if(scheduled) {
    cv_.notify_all();
  }


  return new_task;
}

void TaskSystemParallelThreadPoolSleeping::sync() {

    //
    // TODO: CS149 students will modify the implementation of this method in Part B.
    //
  std::unique_lock<std::mutex> lock(mutex_);
  // In case all worker threads are asleep: set wait_sync_ so that a worker
  // finishing a task will notify_all() and wake this sync() call up.

  wait_sync_.store(true);

  while(!ready_queue_.empty() || task_dep_.GetFinishedTaskCount() != task_dep_.GetTotalTaskCount()  ) {
    // printf("finished task count:%d, task_id:%d\n", finished_tasks_.load(), task_id_.load());
    cv_.wait(lock);
    // printf("finished task count:%d, task_id:%d\n", task_dep_.GetFinishedTaskCount(), task_dep_.GetTotalTaskCount());
  }
  wait_sync_.store(false);
    return;
}
class TaskDep{
public:
  TaskDep();
  void AddTaskDep(TaskID child, std::vector<TaskID> deps, IRunnable* runnable, int num_total_tasks);
  // Who is responsible for call this function?
  // I guess we should just iterate all task_id to see 
  // if we can schedule them.
  // void CheckAndRemoveDep(TaskID child_task_id);
  void FinishTask(TaskID task_id, std::vector<TaskID>& children_tasks);
  TaskID GetNewTaskID();
  int GetFinishedTaskCount() const ;
  int GetTotalTaskCount() const { return task_id_.load();}
  bool CheckTaskFinished(TaskID task_id);
  bool CheckAndGetRunnableTasks(std::vector<TaskID>& tasks);
  bool HasTaskToSchedule() const { return !dependent_parents_.empty();}
  void GetTaskArg(TaskID task_id, TaskArg& task_arg);

private:
  std::atomic<TaskID> task_id_;
  std::unordered_map<TaskID, std::unordered_set<TaskID>> dependent_parents_;
  std::unordered_map<TaskID, std::unordered_set<TaskID>> dependent_children_;
  std::unordered_set<TaskID> finished_task_ids_;
  std::unordered_set<TaskID> scheduled_task_ids_;
  std::unordered_map<TaskID, TaskArg> task_args_;
  std::atomic<int> finished_tasks_;

 
};
/*
 * TaskSystemParallelThreadPoolSleeping: This class is the student's
 * optimized implementation of a parallel task execution engine that uses
 * a thread pool. See definition of ITaskSystem in
 * itasksys.h for documentation of the ITaskSystem interface.
 */
class TaskSystemParallelThreadPoolSleeping: public ITaskSystem {
    public:
        TaskSystemParallelThreadPoolSleeping(int num_threads);
        ~TaskSystemParallelThreadPoolSleeping();
        const char* name();
        void run(IRunnable* runnable, int num_total_tasks);
        TaskID runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
                                const std::vector<TaskID>& deps);
        void sync();

private:
  void threadTask( );

  bool FinishOneSubTask(TaskID task_id, std::vector<TaskID>& children_task) ;
  void ScheduleTaskToReadyQueue(TaskID task_id);
  std::vector<std::thread> workers_;
  int num_threads_;
  bool done_;
  std::atomic<bool> wait_sync_;
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<threadArg> ready_queue_;
  std::unordered_map<TaskID, int> taskid_remain_work_count_map_;
  std::unordered_set<TaskID> scheduled_task_ids_;
  TaskDep task_dep_;


};


Test Output:

Run time is reduced a lot compared to the previous version with frequent wake-up calls.

But in some cases it still does not match the run time of the reference implementation.

Executing test: super_super_light_async...
Reference binary: ./runtasks_ref_linux
Results for: super_super_light_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                8.756     12.191      0.72  (OK)
[Parallel + Always Spawn]               8.429     55.629      0.15  (OK)
[Parallel + Thread Pool + Spin]         8.556     26.492      0.32  (OK)
[Parallel + Thread Pool + Sleep]        30.866    15.692      1.97  (NOT OK)
================================================================================
Executing test: super_light_async...
Reference binary: ./runtasks_ref_linux
Results for: super_light_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                81.03     95.452      0.85  (OK)
[Parallel + Always Spawn]               79.448    80.543      0.99  (OK)
[Parallel + Thread Pool + Spin]         80.317    30.917      2.60  (NOT OK)
[Parallel + Thread Pool + Sleep]        37.437    28.054      1.33  (NOT OK)
================================================================================

Executing test: ping_pong_equal_async...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_equal_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                1294.45   1544.952    0.84  (OK)
[Parallel + Always Spawn]               1296.49   662.509     1.96  (NOT OK)
[Parallel + Thread Pool + Spin]         1293.489  226.582     5.71  (NOT OK)
[Parallel + Thread Pool + Sleep]        193.945   224.466     0.86  (OK)
================================================================================

Executing test: ping_pong_unequal_async...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_unequal_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                2429.01   2399.749    1.01  (OK)
[Parallel + Always Spawn]               2407.208  1001.25     2.40  (NOT OK)
[Parallel + Thread Pool + Spin]         2417.762  334.029     7.24  (NOT OK)
[Parallel + Thread Pool + Sleep]        344.299   328.969     1.05  (OK)

================================================================================
Executing test: recursive_fibonacci_async...
Reference binary: ./runtasks_ref_linux
Results for: recursive_fibonacci_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                1581.684  1939.945    0.82  (OK)
[Parallel + Always Spawn]               1585.249  556.636     2.85  (NOT OK)
[Parallel + Thread Pool + Spin]         1584.694  258.648     6.13  (NOT OK)
[Parallel + Thread Pool + Sleep]        220.133   254.85      0.86  (OK)

================================================================================
Executing test: math_operations_in_tight_for_loop_async...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                808.41    856.84      0.94  (OK)
[Parallel + Always Spawn]               811.738   693.496     1.17  (OK)
[Parallel + Thread Pool + Spin]         808.998   145.345     5.57  (NOT OK)
[Parallel + Thread Pool + Sleep]        152.518   142.003     1.07  (OK)
================================================================================

Executing test: math_operations_in_tight_for_loop_fewer_tasks_async...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_fewer_tasks_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                812.784   858.997     0.95  (OK)
[Parallel + Always Spawn]               812.874   811.4       1.00  (OK)
[Parallel + Thread Pool + Spin]         810.69    126.379     6.41  (NOT OK)
[Parallel + Thread Pool + Sleep]        122.005   126.256     0.97  (OK)
================================================================================

Executing test: math_operations_in_tight_for_loop_fan_in_async...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_fan_in_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                417.681   435.005     0.96  (OK)
[Parallel + Always Spawn]               413.541   244.769     1.69  (NOT OK)
[Parallel + Thread Pool + Spin]         413.021   75.503      5.47  (NOT OK)
[Parallel + Thread Pool + Sleep]        73.391    65.41       1.12  (OK)
================================================================================

Executing test: math_operations_in_tight_for_loop_reduction_tree_async...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_reduction_tree_async
                                        STUDENT   REFERENCE   PERF?
[Serial]                                413.848   438.395     0.94  (OK)
[Parallel + Always Spawn]               414.925   195.032     2.13  (NOT OK)
[Parallel + Thread Pool + Spin]         414.513   72.565      5.71  (NOT OK)
[Parallel + Thread Pool + Sleep]        59.153    65.233      0.91  (OK)
================================================================================

So we can get rid of checking every task each time to see whether it can be scheduled and put onto the ready queue.

My code does not achieve results comparable to the reference for super_light_async and super_super_light_async.

I think this is because there is still room to reduce the wake-up frequency.

I have decided to move on to Assignment 3 of CS149 and leave the current code as it is.

I just don't want to spend more time on this; I'd rather spend it learning to write some CUDA.

Side knowledge: output buffering when piping a Python command

Prompt: why don't I see any output from this command, but when I remove | tee ./test.out there is output?

python3 ../tests/run_test_harness.py -n 8 -a 2>&1 | tee ./test.out

Response:

The issue you're encountering is likely due to buffering. When you use tee to redirect output to a file, the output may be buffered, causing it not to appear immediately on the screen.

Explanation:

  • Without tee: Python's stdout is a terminal, so it is line-buffered and you see output immediately.
  • With tee: Python's stdout is a pipe, so it becomes block-buffered; output sits in the buffer and only appears once the buffer is flushed, which delays what you see on the terminal and in the file.

Solution:

You can use the -u option with python3 to force the standard streams to be unbuffered. This will ensure that the output appears immediately.

Revised command:

python3 -u ../tests/run_test_harness.py -n 8 -a 2>&1 | tee ./test.out

This should resolve the issue and allow you to see the output in real-time while still writing it to test.out.



