Stanford CS149 Parallel Programming - Assignment 2
Thread pool for task scheduling
Step 1: parallel thread spawn
First implementation, which spawns a new thread for each task:
void TaskSystemParallelSpawn::run(IRunnable* runnable, int num_total_tasks) {
//
// TODO: CS149 students will modify the implementation of this
// method in Part A. The implementation provided below runs all
// tasks sequentially on the calling thread.
//
// for (int i = 0; i < num_total_tasks; i++) {
// runnable->runTask(i, num_total_tasks);
// }
std::vector<std::thread> workers(num_threads_);
for(int i=0; i< num_total_tasks; i+= num_threads_) {
int cur_end = (i+num_threads_ < num_total_tasks) ? i+num_threads_ : num_total_tasks;
int cur_begin = i;
int t_idx = 0;
for(int j=cur_begin; j < cur_end; j++, t_idx++) {
workers[t_idx] = std::thread(&IRunnable::runTask, runnable, j, num_total_tasks);
}
t_idx =0;
for(int j=cur_begin; j < cur_end; j++, t_idx++) {
workers[t_idx].join();
}
}
}
Result: not good, very slow. I think this is because of the overhead of spawning threads so frequently.
(base) ➜ part_a git:(master) ✗ python3 ../tests/run_test_harness.py -n 8 -t super_light super_super_light
runtasks_ref
Linux x86_64
================================================================================
Running task system grading harness... (2 total tests)
- Detected CPU with 40 execution contexts
- Task system configured to use at most 8 threads
================================================================================
================================================================================
Executing test: super_super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_super_light
STUDENT REFERENCE PERF?
[Serial] 12.239 12.216 1.00 (OK)
[Parallel + Always Spawn] 444.19 53.469 8.31 (NOT OK)
[Parallel + Thread Pool + Spin] 12.121 25.723 0.47 (OK)
[Parallel + Thread Pool + Sleep] 12.106 24.976 0.48 (OK)
================================================================================
Solution 2: create num_threads_ threads at the beginning of the run call and reuse all of them. Use an atomic counter to assign a unique task id to each runTask call.
Code:
Notice that we have to set my_counter_ = 0 at the beginning of each run call.
void TaskSystemParallelSpawn::run(IRunnable* runnable, int num_total_tasks) {
my_counter_ = 0;
std::vector<std::thread> workers;
for(int i=0; i< num_threads_; i++) {
workers.emplace_back([this, runnable, num_total_tasks] {
this->threadTask(runnable, num_total_tasks);
});
}
for(int i=0; i < num_threads_; i++) {
workers[i].join();
}
}
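The threadTask helper called above is not shown in this post; a minimal sketch of what it could look like, assuming my_counter_ is a std::atomic<int> member, is:
void TaskSystemParallelSpawn::threadTask(IRunnable* runnable, int num_total_tasks) {
    // Each worker repeatedly claims the next unassigned task index until
    // every index in [0, num_total_tasks) has been handed out.
    while (true) {
        int task_id = my_counter_.fetch_add(1);
        if (task_id >= num_total_tasks) return;
        runnable->runTask(task_id, num_total_tasks);
    }
}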
===================================================================================
(base) ➜ part_a git:(master) ✗ python3 ../tests/run_test_harness.py -n 8 -t super_light super_super_light
runtasks_ref
Linux x86_64
================================================================================
Running task system grading harness... (2 total tests)
- Detected CPU with 40 execution contexts
- Task system configured to use at most 8 threads
================================================================================
================================================================================
Executing test: super_super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_super_light
STUDENT REFERENCE PERF?
[Serial] 8.562 12.451 0.69 (OK)
[Parallel + Always Spawn] 56.804 54.287 1.05 (OK)
[Parallel + Thread Pool + Spin] 8.532 25.417 0.34 (OK)
[Parallel + Thread Pool + Sleep] 8.456 24.996 0.34 (OK)
================================================================================
Executing test: super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_light
STUDENT REFERENCE PERF?
[Serial] 81.044 95.194 0.85 (OK)
[Parallel + Always Spawn] 55.138 83.604 0.66 (OK)
[Parallel + Thread Pool + Spin] 80.753 27.388 2.95 (NOT OK)
[Parallel + Thread Pool + Sleep] 80.646 33.46 2.41 (NOT OK)
================================================================================
Overall performance results
[Serial] : All passed Perf
[Parallel + Always Spawn] : All passed Perf
[Parallel + Thread Pool + Spin] : Perf did not pass all tests
[Parallel + Thread Pool + Sleep] : Perf did not pass all tests
Step 2: thread pool + spin waiting
void TaskSystemParallelThreadPoolSpinning::threadTask() {
// bool done = done_.load();
while(!done_.load()) {
// printf("id: %d\n", id);
mutex_.lock();
threadArg arg;
if(!task_queue_.empty()) {
arg = task_queue_.front();
task_queue_.pop_front();
}
mutex_.unlock();
if(arg.runnable) {
arg.runnable->runTask( arg.task_id, arg.num_total_tasks);
finished_tasks_.fetch_add(1);
}
}
}
TaskSystemParallelThreadPoolSpinning::TaskSystemParallelThreadPoolSpinning(int num_threads): ITaskSystem(num_threads) {
//
// TODO: CS149 student implementations may decide to perform setup
// operations (such as thread pool construction) here.
// Implementations are free to add new class member variables
// (requiring changes to tasksys.h).
//
done_ = false;
num_threads_ = num_threads;
for(int i=0; i < num_threads_; i++) {
workers_.emplace_back([this ]{
this->threadTask();
});
}
}
TaskSystemParallelThreadPoolSpinning::~TaskSystemParallelThreadPoolSpinning() {
done_.store(true);
for(int i=0; i< num_threads_; i++) {
workers_[i].join();
}
}
void TaskSystemParallelThreadPoolSpinning::run(IRunnable* runnable, int num_total_tasks) {
//
// TODO: CS149 students will modify the implementation of this
// method in Part A. The implementation provided below runs all
// tasks sequentially on the calling thread.
//
finished_tasks_ = 0;
{
const std::lock_guard<std::mutex> lock(mutex_);
for(int i=0; i < num_total_tasks; i++) {
threadArg arg(i, num_total_tasks, runnable);
task_queue_.emplace_back(arg);
}
}
while(finished_tasks_.load() != num_total_tasks) {
}
}
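The threadArg struct used by the task queue above is not shown; a minimal sketch, assuming it just bundles one sub-task index with its runnable (a null runnable meaning nothing was dequeued), could be:
struct threadArg {
    int task_id = 0;               // index of the sub-task within the bulk launch
    int num_total_tasks = 0;
    IRunnable* runnable = nullptr; // nullptr signals that no task was dequeued
    threadArg() = default;
    threadArg(int task_id, int num_total_tasks, IRunnable* runnable)
        : task_id(task_id), num_total_tasks(num_total_tasks), runnable(runnable) {}
};
In part B the struct is extended with the bulk-launch TaskID as a fourth field, and the per-launch index is renamed task_thread_id.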
(base) ➜ part_a git:(master) ✗ python3 ../tests/run_test_harness.py -n 8
runtasks_ref
Linux x86_64
================================================================================ Running task system grading harness... (11 total tests)
- Detected CPU with 40 execution contexts
- Task system configured to use at most 8 threads
================================================================================
================================================================================ Executing test: super_super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_super_light
STUDENT REFERENCE PERF?
[Serial] 8.574 12.343 0.69 (OK)
[Parallel + Always Spawn] 56.938 54.949 1.04 (OK)
[Parallel + Thread Pool + Spin] 21.404 25.716 0.83 (OK)
[Parallel + Thread Pool + Sleep] 8.704 17.781 0.49 (OK)
================================================================================
Executing test: super_light...
Reference binary: ./runtasks_ref_linux
Results for: super_light
STUDENT REFERENCE PERF?
[Serial] 80.252 94.107 0.85 (OK)
[Parallel + Always Spawn] 73.338 83.35 0.88 (OK)
[Parallel + Thread Pool + Spin] 27.698 36.119 0.77 (OK)
[Parallel + Thread Pool + Sleep] 80.795 34.28 2.36 (NOT OK)
================================================================================
Executing test: ping_pong_equal...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_equal
STUDENT REFERENCE PERF?
[Serial] 1297.763 1553.482 0.84 (OK)
[Parallel + Always Spawn] 574.729 649.785 0.88 (OK)
[Parallel + Thread Pool + Spin] 187.65 232.88 0.81 (OK)
[Parallel + Thread Pool + Sleep] 1294.887 226.363 5.72 (NOT OK)
================================================================================
Executing test: ping_pong_unequal...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_unequal
STUDENT REFERENCE PERF?
[Serial] 2405.604 2401.768 1.00 (OK)
[Parallel + Always Spawn] 969.55 1021.323 0.95 (OK)
[Parallel + Thread Pool + Spin] 336.518 332.326 1.01 (OK)
[Parallel + Thread Pool + Sleep] 2396.002 338.008 7.09 (NOT OK)
================================================================================
Executing test: recursive_fibonacci...
Reference binary: ./runtasks_ref_linux
Results for: recursive_fibonacci
STUDENT REFERENCE PERF?
[Serial] 1578.621 1938.967 0.81 (OK)
[Parallel + Always Spawn] 482.274 548.912 0.88 (OK)
[Parallel + Thread Pool + Spin] 222.252 266.043 0.84 (OK)
[Parallel + Thread Pool + Sleep] 1583.164 266.454 5.94 (NOT OK)
================================================================================
Executing test: math_operations_in_tight_for_loop...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop
STUDENT REFERENCE PERF?
[Serial] 808.052 833.969 0.97 (OK)
[Parallel + Always Spawn] 651.691 705.552 0.92 (OK)
[Parallel + Thread Pool + Spin] 139.645 154.966 0.90 (OK)
[Parallel + Thread Pool + Sleep] 802.654 157.355 5.10 (NOT OK)
================================================================================
================================================================================
Executing test: math_operations_in_tight_for_loop_fewer_tasks...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_fewer_tasks
STUDENT REFERENCE PERF?
[Serial] 806.914 839.226 0.96 (OK)
[Parallel + Always Spawn] 809.429 810.212 1.00 (OK)
[Parallel + Thread Pool + Spin] 215.631 238.903 0.90 (OK)
[Parallel + Thread Pool + Sleep] 805.115 244.607 3.29 (NOT OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_fan_in...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_fan_in
STUDENT REFERENCE PERF?
[Serial] 416.313 428.492 0.97 (OK)
[Parallel + Always Spawn] 133.057 160.116 0.83 (OK)
[Parallel + Thread Pool + Spin] 71.286 74.03 0.96 (OK)
[Parallel + Thread Pool + Sleep] 412.149 81.239 5.07 (NOT OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_reduction_tree...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_reduction_tree
STUDENT REFERENCE PERF?
[Serial] 415.842 428.485 0.97 (OK)
[Parallel + Always Spawn] 142.34 191.563 0.74 (OK)
[Parallel + Thread Pool + Spin] 70.203 65.066 1.08 (OK)
[Parallel + Thread Pool + Sleep] 411.849 67.231 6.13 (NOT OK)
================================================================================
Executing test: spin_between_run_calls...
Reference binary: ./runtasks_ref_linux
Results for: spin_between_run_calls
STUDENT REFERENCE PERF?
[Serial] 540.34 687.538 0.79 (OK)
[Parallel + Always Spawn] 290.111 364.247 0.80 (OK)
[Parallel + Thread Pool + Spin] 295.734 373.543 0.79 (OK)
[Parallel + Thread Pool + Sleep] 538.037 364.643 1.48 (NOT OK)
================================================================================
Executing test: mandelbrot_chunked...
Reference binary: ./runtasks_ref_linux
Results for: mandelbrot_chunked
STUDENT REFERENCE PERF?
[Serial] 526.695 537.581 0.98 (OK)
[Parallel + Always Spawn] 79.414 78.909 1.01 (OK)
[Parallel + Thread Pool + Spin] 83.16 77.74 1.07 (OK)
[Parallel + Thread Pool + Sleep] 530.464 85.143 6.23 (NOT OK)
================================================================================
Overall performance results
[Serial] : All passed Perf
[Parallel + Always Spawn] : All passed Perf
[Parallel + Thread Pool + Spin] : All passed Perf
[Parallel + Thread Pool + Sleep] : Perf did not pass all tests
Step 3: thread pool + put threads to sleep when there is nothing to do
Implementation:
Notice that in the destructor we have to check that each thread is joinable before calling join(), because a thread might have already exited by the time done_ is set to true.
void TaskSystemParallelThreadPoolSleeping::threadTask( ) {
int num_task = 0;
while(!done_.load()) {
std::unique_lock<std::mutex> lock(mutex_);
threadArg arg ;
bool get_task = false;
if(!task_queue_.empty()) {
arg = task_queue_.front();
task_queue_.pop_front();
get_task = true;
// printf("get task\n");
} else {
cv_.wait(lock);
}
lock.unlock();
if(get_task) {
arg.runnable->runTask(arg.task_id, arg.num_total_tasks);
num_task = arg.num_total_tasks;
finished_tasks_.fetch_add(1);
if(finished_tasks_.load() == num_task) {
// printf("send notify\n");
cv_.notify_all();
}
}
}
}
TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
//
// TODO: CS149 student implementations may decide to perform setup
// operations (such as thread pool construction) here.
// Implementations are free to add new class member variables
// (requiring changes to tasksys.h).
//
num_threads_ = num_threads;
done_ = false;
for(int i=0; i< num_threads_; i++){
workers_.emplace_back([this]{
this->threadTask();
});
}
}
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
//
// TODO: CS149 student implementations may decide to perform cleanup
// operations (such as thread pool shutdown construction) here.
// Implementations are free to add new class member variables
// (requiring changes to tasksys.h).
//
done_.store(true);
cv_.notify_all();
for(int i=0; i < num_threads_; i++) {
if(workers_[i].joinable()) {
workers_[i].join();
}
}
}
void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
//
// TODO: CS149 students will modify the implementation of this
// method in Parts A and B. The implementation provided below runs all
// tasks sequentially on the calling thread.
//
std::unique_lock<std::mutex> lock(mutex_);
finished_tasks_.store(0) ;
for(int i=0; i < num_total_tasks; i++) {
threadArg arg(i, num_total_tasks, runnable);
task_queue_.emplace_back(arg);
}
// printf("before notify all\n");
lock.unlock();
cv_.notify_all();
lock.lock();
while(finished_tasks_.load() != num_total_tasks) {
cv_.wait(lock);
}
// printf("finished_tasks_:%d\n", finished_tasks_.load());
}
Part B: thread pool + async launch with dependencies
This line of code is important. There are two conditions to check in cv_.wait(): the predicate form of cv_.wait() re-checks the condition each time the thread wakes up and only stops waiting once the condition is true. This saves further unnecessary condition checking in the outer while loop.
cv_.wait(lock, [this]{ return !ready_queue_.empty() || done_; });
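For reference, this predicate overload of wait() behaves like a loop around the plain wait(), which is why the condition is re-checked on every wakeup; a minimal equivalent sketch:
while (!(!ready_queue_.empty() || done_)) {
    // Predicate is false: the queue is empty and we are not shutting down,
    // so go back to sleep until another thread calls notify.
    cv_.wait(lock);
}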
Another change I made compared to the previous version is to use a plain bool instead of std::atomic<bool> for the done_ variable, which indicates whether the class is being destructed. With std::atomic<bool>, done_ was set to true in the destructor without holding the lock, which means a worker thread could miss the notify_all() call from the destructor and never be woken up to exit. After changing done_ to bool (and only modifying it while holding the mutex), we are guaranteed that all threads will observe the notify_all() call, because the destructor acquires the lock first, then sets done_ to true, and then calls notify_all(). This guarantees the ordering of these operations.
Issue: the test does not pass with 4 threads. Investigation is ongoing.
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
//
// TODO: CS149 student implementations may decide to perform cleanup
// operations (such as thread pool shutdown construction) here.
// Implementations are free to add new class member variables
// (requiring changes to tasksys.h).
//
std::unique_lock<std::mutex> lock(mutex_);
done_=true;
lock.unlock();
cv_.notify_all();
printf("destructor\n");
for(int i=0; i < workers_.size(); i++) {
// cv_.notify_all();
if(workers_[i].joinable()) {
workers_[i].join();
}
}
printf("exit\n");
}
Worker code:
TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
const std::vector<TaskID>& deps) {
//
// TODO: CS149 students will implement this method in Part B.
//
TaskID new_task = task_dep_.GetNewTaskID();
mutex_.lock();
taskid_remain_work_count_map_[new_task] = num_total_tasks;
for(TaskID parent_task_id: deps) {
dependent_children_[parent_task_id].insert(new_task);
}
TaskArg task_arg(new_task, runnable, num_total_tasks, deps.size());
printf("task: %d dep count: %ld\n", new_task, deps.size());
task_id_remaining_dependent_parents_count_[new_task] = task_arg;
CheckAndNotify(new_task);
mutex_.unlock();
return new_task;
}
void TaskSystemParallelThreadPoolSleeping::threadTask( ) {
std::unique_lock<std::mutex> lock(mutex_);
while(!done_) {
threadArg arg ;
bool get_task = false;
if(!ready_queue_.empty()) {
arg = ready_queue_.front();
ready_queue_.pop_front();
get_task = true;
printf("get task\n");
} else {
printf("wait task\n");
cv_.wait(lock);
// cv_.wait(lock, [this]{return !ready_queue_.empty() || done_ ;});
continue;
}
lock.unlock();
if(get_task) {
int task_thread_id = arg.task_thread_id;
int task_id = arg.task_id;
arg.runnable->runTask(task_thread_id, arg.num_total_tasks);
bool cur_task_finished = false;
{
std::unique_lock<std::mutex> inner_lock(mutex_);
int task_remain_work_count = --taskid_remain_work_count_map_[task_id];
if(task_remain_work_count == 0) {
printf("task:%d finished\n", task_id);
cur_task_finished = true;
const auto& children_task_ids = dependent_children_[task_id];
for(TaskID child_task_id: children_task_ids ) {
task_id_remaining_dependent_parents_count_[child_task_id].remain_dep_parent_task_count--;
CheckAndMoveTaskToReadyQueue(child_task_id);
}
}
}
if(cur_task_finished) {
finished_tasks_.fetch_add(1);
// extra notify all here for sync()
cv_.notify_all();
}
}
lock.lock();
}
}
Add another condition for when to notify_all() the waiting threads:
if(cur_task_finished || moved_to_ready_queue) {
finished_tasks_.fetch_add(1);
// extra notify all here for sync()
cv_.notify_all();
}
Task 1 is never moved to the ready_queue, as the debug output shows:
destructor
exit
wait task
wait task
task:0 moved to ready_queue
get task
get task
get task
wait task
task:0 finished
wait task
Another issue: a parent task listed in deps might have already finished by the time the new child task is pushed. We need to handle this situation.
So we should not rely only on this structure for task scheduling:
std::unordered_map<TaskID, std::unordered_set<TaskID>> dependent_children_;
This class structure is not right.
class TaskSystemParallelThreadPoolSleeping: public ITaskSystem {
public:
TaskSystemParallelThreadPoolSleeping(int num_threads);
~TaskSystemParallelThreadPoolSleeping();
const char* name();
void run(IRunnable* runnable, int num_total_tasks);
TaskID runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
const std::vector<TaskID>& deps);
void sync();
private:
void threadTask( );
bool CheckAndMoveTaskToReadyQueue(TaskID child_task_id);
void CheckAndNotify(TaskID child_task_id);
std::vector<std::thread> workers_;
int num_threads_;
bool done_;
std::mutex mutex_;
std::condition_variable cv_;
std::atomic<TaskID> task_id_;
std::unordered_map<TaskID, std::unordered_set<TaskID>> dependent_children_;
std::unordered_map<TaskID, TaskArg> task_id_remaining_dependent_parents_count_;
std::deque<threadArg> ready_queue_;
std::unordered_map<TaskID, int> taskid_remain_work_count_map_;
std::atomic<int> finished_tasks_;
};
Correct implementation
Code:
void TaskSystemParallelThreadPoolSleeping::threadTask( ) {
std::unique_lock<std::mutex> lock(mutex_);
std::vector<TaskID> tasks;
bool get_new_tasks_to_run = false;
// get_new_tasks_to_run = task_dep_.CheckAndGetRunnableTasks(tasks);
// for(TaskID task_id : tasks){
// ScheduleTaskToReadyQueue(task_id);
// }
// tasks.clear();
while(!done_ || !ready_queue_.empty() || task_dep_.HasTaskToSchedule()) {
get_new_tasks_to_run = task_dep_.CheckAndGetRunnableTasks(tasks);
for(TaskID task_id: tasks) {
ScheduleTaskToReadyQueue(task_id);
}
tasks.clear();
threadArg arg ;
bool get_task = false;
if(!ready_queue_.empty()) {
arg = ready_queue_.front();
ready_queue_.pop_front();
get_task = true;
// printf("get task\n");
} else {
// printf("wait task\n");
cv_.wait(lock);
// cv_.wait(lock, [this]{return !ready_queue_.empty() || done_ ;});
}
lock.unlock();
if(get_task) {
int task_thread_id = arg.task_thread_id;
int task_id = arg.task_id;
arg.runnable->runTask(task_thread_id, arg.num_total_tasks);
{
std::unique_lock<std::mutex> inner_lock(mutex_);
// printf("taskid: %d, thread_id: %d finished\n", task_id, task_thread_id);
FinishOneSubTask(task_id);
}
// Move notify_all out of mutex scope so that
// other threads can wake up and do some work
// if(cur_task_finished || has_moved_to_ready_queue) {
// cv_.notify_all();
// }
cv_.notify_all();
}
lock.lock();
}
}
// Prerequisite: mutex_ is held
void TaskSystemParallelThreadPoolSleeping::ScheduleTaskToReadyQueue(TaskID task_id) {
TaskArg task_arg;
task_dep_.GetTaskArg(task_id, task_arg);
for(int i=0; i < task_arg.num_total_tasks; i++) {
threadArg ta(i, task_arg.num_total_tasks, task_arg.runnable, task_id);
taskid_remain_work_count_map_[task_id] = task_arg.num_total_tasks;
ready_queue_.emplace_back(ta);
}
}
const char* TaskSystemParallelThreadPoolSleeping::name() {
return "Parallel + Thread Pool + Sleep";
}
TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
//
// TODO: CS149 student implementations may decide to perform setup
// operations (such as thread pool construction) here.
// Implementations are free to add new class member variables
// (requiring changes to tasksys.h).
//
done_ = false;
// task_id_ = 0;
num_threads_ = num_threads;
// finished_tasks_ = 0;
for(int i=0; i < num_threads_; i++) {
workers_.emplace_back([this]{
this->threadTask();
});
}
}
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
//
// TODO: CS149 student implementations may decide to perform cleanup
// operations (such as thread pool shutdown construction) here.
// Implementations are free to add new class member variables
// (requiring changes to tasksys.h).
//
std::unique_lock<std::mutex> lock(mutex_);
done_=true;
lock.unlock();
cv_.notify_all();
// printf("destructor\n");
for(int i=0; i < workers_.size(); i++) {
// cv_.notify_all();
if(workers_[i].joinable()) {
workers_[i].join();
}
}
// printf("exit\n");
}
void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
//
// TODO: CS149 students will modify the implementation of this
// method in Parts A and B. The implementation provided below runs all
// tasks sequentially on the calling thread.
//
for (int i = 0; i < num_total_tasks; i++) {
runnable->runTask(i, num_total_tasks);
}
}
// Two places to schedule tasks (i.e., put them on ready_queue_):
// at the start of a runAsyncWithDeps call,
// or in threadTask, when a whole TaskID finishes.
TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
const std::vector<TaskID>& deps) {
//
// TODO: CS149 students will implement this method in Part B.
//
mutex_.lock();
TaskID new_task = task_dep_.GetNewTaskID();
task_dep_.AddTaskDep(new_task, deps, runnable, num_total_tasks);
// printf("task: %d dep count: %ld\n", new_task, deps.size());
mutex_.unlock();
cv_.notify_all();
return new_task;
}
void TaskSystemParallelThreadPoolSleeping::sync() {
//
// TODO: CS149 students will modify the implementation of this method in Part B.
//
std::unique_lock<std::mutex> lock(mutex_);
// In case all threads are put to sleep
while(!ready_queue_.empty() || task_dep_.GetFinishedTaskCount() != task_dep_.GetTotalTaskCount() ) {
// printf("finished task count:%d, task_id:%d\n", finished_tasks_.load(), task_id_.load());
cv_.wait(lock);
// printf("finished task count:%d, task_id:%d\n", finished_tasks_.load(), task_id_.load());
}
return;
}
TaskDep::TaskDep() {
task_id_ = 0;
finished_tasks_ = 0;
}
TaskID TaskDep::GetNewTaskID() {
return task_id_.fetch_add(1);
}
int TaskDep::GetFinishedTaskCount() const {
return finished_tasks_.load();
}
void TaskDep::FinishTask(TaskID task_id) {
finished_task_ids_.insert(task_id);
finished_tasks_.fetch_add(1);
}
void TaskSystemParallelThreadPoolSleeping::FinishOneSubTask(TaskID task_id) {
if(--taskid_remain_work_count_map_[task_id] ==0 ) {
// printf("task:%d finished\n", task_id);
task_dep_.FinishTask(task_id);
}
}
bool TaskDep::CheckAndGetRunnableTasks(std::vector<TaskID>& tasks) {
std::vector<TaskID> to_erase;
for(auto &iter: dependent_parents_) {
TaskID child_task_id = iter.first;
// printf("child task id:%d\n", child_task_id );
for(auto parent_it=iter.second.begin(); parent_it!=iter.second.end();){
// printf(" parent: %d, finished:%d\n", *parent_it, CheckTaskFinished(*parent_it));
if( CheckTaskFinished(*parent_it)){
parent_it = iter.second.erase(parent_it);
} else {
parent_it++;
}
}
bool not_scheduled = scheduled_task_ids_.find(child_task_id) == scheduled_task_ids_.end() ;
bool parents_empty =dependent_parents_[child_task_id].empty();
if(not_scheduled && parents_empty){
scheduled_task_ids_.insert(child_task_id);
// printf("new task schedule:%d\n", child_task_id);
tasks.emplace_back(child_task_id);
to_erase.emplace_back(child_task_id);
}
}
for(TaskID erase_id: to_erase) {
dependent_parents_.erase(erase_id);
}
return !tasks.empty();
}
bool TaskDep::CheckTaskFinished(TaskID task_id) {
return finished_task_ids_.find(task_id) != finished_task_ids_.end();
}
void TaskDep::GetTaskArg(TaskID task_id, TaskArg& task_arg) {
const auto& ta = task_args_[task_id];
task_arg.task_id = ta.task_id;
task_arg.num_total_tasks = ta.num_total_tasks;
task_arg.runnable = ta.runnable;
task_arg.remain_dep_parent_task_count = ta.remain_dep_parent_task_count;
}
void TaskDep::AddTaskDep(TaskID child, std::vector<TaskID> deps, IRunnable* runnable, int num_total_tasks) {
TaskArg ta;
ta.task_id = child;
ta.remain_dep_parent_task_count = deps.size();
ta.runnable = runnable;
ta.num_total_tasks = num_total_tasks;
if(!deps.empty()) {
for(TaskID task_id: deps) {
dependent_parents_[child].insert(task_id);
}
} else {
dependent_parents_[child] = {};
}
task_args_[child] = ta;
}
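The TaskArg struct is not shown in the listings; a minimal sketch matching how GetTaskArg and AddTaskDep use its fields could be:
struct TaskArg {
    TaskID task_id = 0;
    int num_total_tasks = 0;
    IRunnable* runnable = nullptr;
    int remain_dep_parent_task_count = 0; // parents of this launch that have not finished yet
};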
Notice the difference between the two pieces of code below. The first one does not work because auto iter creates a copy of each element of dependent_parents_, so erasing finished parents only modifies the copy and dependent_parents_[child_task_id].empty() always returns false.
for(auto iter: dependent_parents_) { // iterates over copies of the map elements
TaskID child_task_id = iter.first;
// ... the loop that erases finished parents operates on iter.second, which is a copy ...
if(scheduled_task_ids_.find(child_task_id) == scheduled_task_ids_.end() && dependent_parents_[child_task_id].empty()){
scheduled_task_ids_.insert(child_task_id);
printf("new task schedule:%d\n", child_task_id);
tasks.emplace_back(child_task_id);
to_erase.emplace_back(child_task_id);
}
}
for(auto &iter: dependent_parents_) { // iterates over references to the map elements
TaskID child_task_id = iter.first;
// ... erasing finished parents now modifies the set stored in the map ...
if(scheduled_task_ids_.find(child_task_id) == scheduled_task_ids_.end() && dependent_parents_[child_task_id].empty()){
scheduled_task_ids_.insert(child_task_id);
printf("new task schedule:%d\n", child_task_id);
tasks.emplace_back(child_task_id);
to_erase.emplace_back(child_task_id);
}
}
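A tiny standalone illustration of that difference (the values are made up for the demo):
#include <cstdio>
#include <unordered_map>
#include <unordered_set>

int main() {
    std::unordered_map<int, std::unordered_set<int>> deps = {{1, {10}}};
    for (auto kv : deps) kv.second.erase(10);   // erases from a copy of the set
    std::printf("%zu\n", deps[1].size());       // prints 1: the map is unchanged
    for (auto& kv : deps) kv.second.erase(10);  // erases from the set stored in the map
    std::printf("%zu\n", deps[1].size());       // prints 0
    return 0;
}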
Result:
Not good. I think it's because I wake the threads up too frequently.
Next, try to reduce the number of wake-ups.
Executing test: super_super_light_async...
Reference binary: ./runtasks_ref_linux
Results for: super_super_light_async
STUDENT REFERENCE PERF?
[Serial] 8.706 12.433 0.70 (OK)
[Parallel + Always Spawn] 8.406 56.047 0.15 (OK)
[Parallel + Thread Pool + Spin] 8.27 25.412 0.33 (OK)
[Parallel + Thread Pool + Sleep] 224.619 24.681 9.10 (NOT OK)
================================================================================
Executing test: super_light_async...
Reference binary: ./runtasks_ref_linux
Results for: super_light_async
STUDENT REFERENCE PERF?
[Serial] 82.485 95.578 0.86 (OK)
[Parallel + Always Spawn] 81.039 81.133 1.00 (OK)
[Parallel + Thread Pool + Spin] 81.268 34.526 2.35 (NOT OK)
[Parallel + Thread Pool + Sleep] 408.665 31.755 12.87 (NOT OK)
================================================================================
Executing test: ping_pong_equal_async...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_equal_async
STUDENT REFERENCE PERF?
[Serial] 1313.035 1542.283 0.85 (OK)
[Parallel + Always Spawn] 1309.708 701.653 1.87 (NOT OK)
[Parallel + Thread Pool + Spin] 1313.704 227.62 5.77 (NOT OK)
[Parallel + Thread Pool + Sleep] 778.308 224.776 3.46 (NOT OK)
================================================================================
Executing test: ping_pong_unequal_async...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_unequal_async
STUDENT REFERENCE PERF?
[Serial] 2446.194 2402.307 1.02 (OK)
[Parallel + Always Spawn] 2456.503 976.121 2.52 (NOT OK)
[Parallel + Thread Pool + Spin] 2454.091 332.344 7.38 (NOT OK)
[Parallel + Thread Pool + Sleep] 506.64 330.938 1.53 (NOT OK)
================================================================================
Executing test: recursive_fibonacci_async...
Reference binary: ./runtasks_ref_linux
Results for: recursive_fibonacci_async
STUDENT REFERENCE PERF?
[Serial] 1310.101 1946.319 0.67 (OK)
[Parallel + Always Spawn] 1303.736 505.253 2.58 (NOT OK)
[Parallel + Thread Pool + Spin] 1306.294 261.702 4.99 (NOT OK)
[Parallel + Thread Pool + Sleep] 186.681 264.498 0.71 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_async...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_async
STUDENT REFERENCE PERF?
[Serial] 821.415 854.457 0.96 (OK)
[Parallel + Always Spawn] 822.081 697.755 1.18 (OK)
[Parallel + Thread Pool + Spin] 817.92 149.114 5.49 (NOT OK)
[Parallel + Thread Pool + Sleep] 7622.159 148.934 51.18 (NOT OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_reduction_tree_async...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_reduction_tree_async
STUDENT REFERENCE PERF?
[Serial] 419.026 439.384 0.95 (OK)
[Parallel + Always Spawn] 418.167 141.737 2.95 (NOT OK)
[Parallel + Thread Pool + Spin] 418.431 72.123 5.80 (NOT OK)
[Parallel + Thread Pool + Sleep] 70.724 71.191 0.99 (OK)
================================================================================
Reduce run time
Another change I want to make is to schedule tasks only at the beginning of a runAsyncWithDeps call or when a task finishes. This means fewer checks to find runnable tasks, because there are only two places where a task can become schedulable:
- The task is first inserted into the system.
- Another task finishes.
There are two possible states for a parent task:
- The parent task is already finished. In this case, we can erase that dependency from the child task when the child task is submitted.
- The parent task is not yet finished. In this case, when the parent task finishes, it tells the task manager to erase the parent from the child's task dependency structure.
In both situations, we then check whether the child task can be scheduled.
Code:
TaskDep::TaskDep() {
task_id_ = 0;
finished_tasks_ = 0;
}
TaskID TaskDep::GetNewTaskID() {
return task_id_.fetch_add(1);
}
int TaskDep::GetFinishedTaskCount() const {
return finished_tasks_.load();
}
void TaskDep::FinishTask(TaskID task_id, std::vector<TaskID>& children_tasks) {
finished_task_ids_.insert(task_id);
finished_tasks_.fetch_add(1);
// remove this parent from all of its children.
for(TaskID child_task_id: dependent_children_[task_id] ) {
dependent_parents_[child_task_id].erase(task_id);
// If the child has no parent dependencies left, we can schedule it
if(dependent_parents_[child_task_id].empty()) {
children_tasks.emplace_back(child_task_id);
}
// if(dependent_children_[child_task_id].find(task_id) != dependent_children_[child_task_id].end()) {
// dependent_children_[child_task_id].erase(task_id);
// }
// if(dependent_parents_[child_task_id].empty()) {
// children_tasks.emplace_back(child_task_id);
// }
}
}
bool TaskSystemParallelThreadPoolSleeping::FinishOneSubTask(TaskID task_id, std::vector<TaskID>& children_tasks) {
if(--taskid_remain_work_count_map_[task_id] ==0 ) {
// printf("task:%d finished\n", task_id);
task_dep_.FinishTask(task_id, children_tasks);
return true;
}
return false;
}
bool TaskDep::CheckAndGetRunnableTasks(std::vector<TaskID>& tasks) {
std::vector<TaskID> to_erase;
for(auto &iter: dependent_parents_) {
TaskID child_task_id = iter.first;
// printf("child task id:%d\n", child_task_id );
for(auto parent_it=iter.second.begin(); parent_it!=iter.second.end();){
// printf(" parent: %d, finished:%d\n", *parent_it, CheckTaskFinished(*parent_it));
if( CheckTaskFinished(*parent_it)){
parent_it = iter.second.erase(parent_it);
} else {
parent_it++;
}
}
bool not_scheduled = scheduled_task_ids_.find(child_task_id) == scheduled_task_ids_.end() ;
bool parents_empty =dependent_parents_[child_task_id].empty();
if(not_scheduled && parents_empty){
scheduled_task_ids_.insert(child_task_id);
// printf("new task schedule:%d\n", child_task_id);
tasks.emplace_back(child_task_id);
to_erase.emplace_back(child_task_id);
}
}
for(TaskID erase_id: to_erase) {
dependent_parents_.erase(erase_id);
}
return !tasks.empty();
}
bool TaskDep::CheckTaskFinished(TaskID task_id) {
return finished_task_ids_.find(task_id) != finished_task_ids_.end();
}
void TaskDep::GetTaskArg(TaskID task_id, TaskArg& task_arg) {
const auto& ta = task_args_[task_id];
task_arg.task_id = ta.task_id;
task_arg.num_total_tasks = ta.num_total_tasks;
task_arg.runnable = ta.runnable;
task_arg.remain_dep_parent_task_count = ta.remain_dep_parent_task_count;
}
void TaskDep::AddTaskDep(TaskID child, std::vector<TaskID> deps, IRunnable* runnable, int num_total_tasks) {
TaskArg ta;
ta.task_id = child;
ta.remain_dep_parent_task_count = deps.size();
ta.runnable = runnable;
ta.num_total_tasks = num_total_tasks;
if(!deps.empty()) {
for(TaskID task_id: deps) {
dependent_parents_[child].insert(task_id);
dependent_children_[task_id].insert(child);
}
} else {
dependent_parents_[child] = {};
}
task_args_[child] = ta;
}
void TaskSystemParallelThreadPoolSleeping::threadTask( ) {
std::unique_lock<std::mutex> lock(mutex_);
std::vector<TaskID> tasks;
bool get_new_tasks_to_run = false;
while(!done_ || !ready_queue_.empty() ) {
threadArg arg ;
bool get_task = false;
if(!ready_queue_.empty()) {
arg = ready_queue_.front();
ready_queue_.pop_front();
get_task = true;
// printf("get task\n");
} else {
// printf("wait task\n");
cv_.wait(lock);
// cv_.wait(lock, [this]{return !ready_queue_.empty() || done_ ;});
}
lock.unlock();
if(get_task) {
int task_thread_id = arg.task_thread_id;
int task_id = arg.task_id;
bool cur_task_finished = false;
std::vector<TaskID> children_task_to_schedule;
arg.runnable->runTask(task_thread_id, arg.num_total_tasks);
{
std::unique_lock<std::mutex> inner_lock(mutex_);
// printf("taskid: %d, thread_id: %d finished\n", task_id, task_thread_id);
cur_task_finished = FinishOneSubTask(task_id, children_task_to_schedule);
if(!children_task_to_schedule.empty()) {
for(TaskID child_task_id: children_task_to_schedule){
ScheduleTaskToReadyQueue(child_task_id);
// Is this ok?
// cv_.notify_all();
}
}
}
// Move notify_all out of mutex scope so that
// other threads can wake up and do some work
// if(!ready_queue_.empty() ) {
// }
if(!children_task_to_schedule.empty() || wait_sync_.load()) {
cv_.notify_all();
}
}
lock.lock();
}
}
// bool TaskSystemParallelThreadPoolSleeping::CheckTaskAndMoveToReadyQueue() {
// std::vector<TaskID> tasks;
// bool get_new_tasks_to_run = task_dep_.CheckAndGetRunnableTasks(tasks);
// for(TaskID task_id: tasks) {
// ScheduleTaskToReadyQueue(task_id);
// }
// // tasks.clear();
// return get_new_tasks_to_run;
// }
// Prerequisite: mutex_ is held
void TaskSystemParallelThreadPoolSleeping::ScheduleTaskToReadyQueue(TaskID task_id) {
TaskArg task_arg;
task_dep_.GetTaskArg(task_id, task_arg);
bool not_scheduled = scheduled_task_ids_.find(task_id) == scheduled_task_ids_.end() ;
if(not_scheduled ){
scheduled_task_ids_.insert(task_id);
// printf("new task schedule:%d\n", child_task_id);
// to_erase.emplace_back(child_task_id);
}
for(int i=0; i < task_arg.num_total_tasks; i++) {
threadArg ta(i, task_arg.num_total_tasks, task_arg.runnable, task_id);
taskid_remain_work_count_map_[task_id] = task_arg.num_total_tasks;
ready_queue_.emplace_back(ta);
}
}
const char* TaskSystemParallelThreadPoolSleeping::name() {
return "Parallel + Thread Pool + Sleep";
}
TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
//
// TODO: CS149 student implementations may decide to perform setup
// operations (such as thread pool construction) here.
// Implementations are free to add new class member variables
// (requiring changes to tasksys.h).
//
done_ = false;
// task_id_ = 0;
num_threads_ = num_threads;
// finished_tasks_ = 0;
for(int i=0; i < num_threads_; i++) {
workers_.emplace_back([this]{
this->threadTask();
});
}
}
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
//
// TODO: CS149 student implementations may decide to perform cleanup
// operations (such as thread pool shutdown construction) here.
// Implementations are free to add new class member variables
// (requiring changes to tasksys.h).
//
std::unique_lock<std::mutex> lock(mutex_);
done_=true;
lock.unlock();
cv_.notify_all();
// printf("destructor\n");
for(int i=0; i < workers_.size(); i++) {
// cv_.notify_all();
if(workers_[i].joinable()) {
workers_[i].join();
}
}
// printf("exit\n");
}
void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
//
// TODO: CS149 students will modify the implementation of this
// method in Parts A and B. The implementation provided below runs all
// tasks sequentially on the calling thread.
//
for (int i = 0; i < num_total_tasks; i++) {
runnable->runTask(i, num_total_tasks);
}
}
// Two places to schedule tasks (i.e., put them on ready_queue_):
// at the start of a runAsyncWithDeps call,
// or in threadTask, when a whole TaskID finishes.
TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
const std::vector<TaskID>& deps) {
//
// TODO: CS149 students will implement this method in Part B.
//
mutex_.lock();
TaskID new_task = task_dep_.GetNewTaskID();
printf("task: %d dep count: %ld, dep id:%d\n", new_task, deps.size(), deps.empty() ? 0 : deps[0]);
// CheckTaskAndMoveToReadyQueue();
std::vector<TaskID> new_deps;
for(TaskID parent_task_id: deps) {
if(!task_dep_.CheckTaskFinished(parent_task_id)) {
new_deps.emplace_back(parent_task_id);
}
}
task_dep_.AddTaskDep(new_task, new_deps, runnable, num_total_tasks);
bool scheduled = false;
if(new_deps.empty()) {
ScheduleTaskToReadyQueue(new_task);
scheduled = true;
}
mutex_.unlock();
if(scheduled) {
cv_.notify_all();
}
return new_task;
}
void TaskSystemParallelThreadPoolSleeping::sync() {
//
// TODO: CS149 students will modify the implementation of this method in Part B.
//
std::unique_lock<std::mutex> lock(mutex_);
// In case all threads are put to sleep
wait_sync_.store(true);
while(!ready_queue_.empty() || task_dep_.GetFinishedTaskCount() != task_dep_.GetTotalTaskCount() ) {
// printf("finished task count:%d, task_id:%d\n", finished_tasks_.load(), task_id_.load());
cv_.wait(lock);
// printf("finished task count:%d, task_id:%d\n", task_dep_.GetFinishedTaskCount(), task_dep_.GetTotalTaskCount());
}
wait_sync_.store(false);
return;
}
class TaskDep{
public:
TaskDep();
void AddTaskDep(TaskID child, std::vector<TaskID> deps, IRunnable* runnable, int num_total_tasks);
// Who is responsible for call this function?
// I guess we should just iterate all task_id to see
// if we can schedule them.
// void CheckAndRemoveDep(TaskID child_task_id);
void FinishTask(TaskID task_id, std::vector<TaskID>& children_tasks);
TaskID GetNewTaskID();
int GetFinishedTaskCount() const ;
int GetTotalTaskCount() const { return task_id_.load();}
bool CheckTaskFinished(TaskID task_id);
bool CheckAndGetRunnableTasks(std::vector<TaskID>& tasks);
bool HasTaskToSchedule() const { return !dependent_parents_.empty();}
void GetTaskArg(TaskID task_id, TaskArg& task_arg);
private:
std::atomic<TaskID> task_id_;
std::unordered_map<TaskID, std::unordered_set<TaskID>> dependent_parents_;
std::unordered_map<TaskID, std::unordered_set<TaskID>> dependent_children_;
std::unordered_set<TaskID> finished_task_ids_;
std::unordered_set<TaskID> scheduled_task_ids_;
std::unordered_map<TaskID, TaskArg> task_args_;
std::atomic<int> finished_tasks_;
};
/*
* TaskSystemParallelThreadPoolSleeping: This class is the student's
* optimized implementation of a parallel task execution engine that uses
* a thread pool. See definition of ITaskSystem in
* itasksys.h for documentation of the ITaskSystem interface.
*/
class TaskSystemParallelThreadPoolSleeping: public ITaskSystem {
public:
TaskSystemParallelThreadPoolSleeping(int num_threads);
~TaskSystemParallelThreadPoolSleeping();
const char* name();
void run(IRunnable* runnable, int num_total_tasks);
TaskID runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
const std::vector<TaskID>& deps);
void sync();
private:
void threadTask( );
bool FinishOneSubTask(TaskID task_id, std::vector<TaskID>& children_task) ;
void ScheduleTaskToReadyQueue(TaskID task_id);
std::vector<std::thread> workers_;
int num_threads_;
bool done_;
std::atomic<bool> wait_sync_;
std::mutex mutex_;
std::condition_variable cv_;
std::deque<threadArg> ready_queue_;
std::unordered_map<TaskID, int> taskid_remain_work_count_map_;
std::unordered_set<TaskID> scheduled_task_ids_;
TaskDep task_dep_;
};
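For reference, a client drives this part B interface roughly as follows; this is a hypothetical usage sketch (NoOpRunnable and the launch sizes are placeholders, not part of the assignment code):
#include <vector>

// Trivial IRunnable used only for illustration.
class NoOpRunnable : public IRunnable {
public:
    void runTask(int, int) override {}
};

void example() {
    NoOpRunnable work;
    TaskSystemParallelThreadPoolSleeping ts(8);          // 8 worker threads
    std::vector<TaskID> no_deps;
    TaskID a = ts.runAsyncWithDeps(&work, 64, no_deps);  // bulk launch A: 64 sub-tasks
    TaskID b = ts.runAsyncWithDeps(&work, 32, {a});      // B starts only after A finishes
    ts.sync();                                           // block until A and B are done
    (void)b;
}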
Test output:
Run time is reduced a lot compared to the previous version with its frequent wake-up calls, but in some cases it still does not match the run time of the reference implementation.
Executing test: super_super_light_async...
Reference binary: ./runtasks_ref_linux
Results for: super_super_light_async
STUDENT REFERENCE PERF?
[Serial] 8.756 12.191 0.72 (OK)
[Parallel + Always Spawn] 8.429 55.629 0.15 (OK)
[Parallel + Thread Pool + Spin] 8.556 26.492 0.32 (OK)
[Parallel + Thread Pool + Sleep] 30.866 15.692 1.97 (NOT OK)
================================================================================
Executing test: super_light_async...
Reference binary: ./runtasks_ref_linux
Results for: super_light_async
STUDENT REFERENCE PERF?
[Serial] 81.03 95.452 0.85 (OK)
[Parallel + Always Spawn] 79.448 80.543 0.99 (OK)
[Parallel + Thread Pool + Spin] 80.317 30.917 2.60 (NOT OK)
[Parallel + Thread Pool + Sleep] 37.437 28.054 1.33 (NOT OK)
================================================================================
Executing test: ping_pong_equal_async...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_equal_async
STUDENT REFERENCE PERF?
[Serial] 1294.45 1544.952 0.84 (OK)
[Parallel + Always Spawn] 1296.49 662.509 1.96 (NOT OK)
[Parallel + Thread Pool + Spin] 1293.489 226.582 5.71 (NOT OK)
[Parallel + Thread Pool + Sleep] 193.945 224.466 0.86 (OK)
================================================================================
Executing test: ping_pong_unequal_async...
Reference binary: ./runtasks_ref_linux
Results for: ping_pong_unequal_async
STUDENT REFERENCE PERF?
[Serial] 2429.01 2399.749 1.01 (OK)
[Parallel + Always Spawn] 2407.208 1001.25 2.40 (NOT OK)
[Parallel + Thread Pool + Spin] 2417.762 334.029 7.24 (NOT OK)
[Parallel + Thread Pool + Sleep] 344.299 328.969 1.05 (OK)
================================================================================
Executing test: recursive_fibonacci_async...
Reference binary: ./runtasks_ref_linux
Results for: recursive_fibonacci_async
STUDENT REFERENCE PERF?
[Serial] 1581.684 1939.945 0.82 (OK)
[Parallel + Always Spawn] 1585.249 556.636 2.85 (NOT OK)
[Parallel + Thread Pool + Spin] 1584.694 258.648 6.13 (NOT OK)
[Parallel + Thread Pool + Sleep] 220.133 254.85 0.86 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_async...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_async
STUDENT REFERENCE PERF?
[Serial] 808.41 856.84 0.94 (OK)
[Parallel + Always Spawn] 811.738 693.496 1.17 (OK)
[Parallel + Thread Pool + Spin] 808.998 145.345 5.57 (NOT OK)
[Parallel + Thread Pool + Sleep] 152.518 142.003 1.07 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_fewer_tasks_async...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_fewer_tasks_async
STUDENT REFERENCE PERF?
[Serial] 812.784 858.997 0.95 (OK)
[Parallel + Always Spawn] 812.874 811.4 1.00 (OK)
[Parallel + Thread Pool + Spin] 810.69 126.379 6.41 (NOT OK)
[Parallel + Thread Pool + Sleep] 122.005 126.256 0.97 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_fan_in_async...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_fan_in_async
STUDENT REFERENCE PERF?
[Serial] 417.681 435.005 0.96 (OK)
[Parallel + Always Spawn] 413.541 244.769 1.69 (NOT OK)
[Parallel + Thread Pool + Spin] 413.021 75.503 5.47 (NOT OK)
[Parallel + Thread Pool + Sleep] 73.391 65.41 1.12 (OK)
================================================================================
Executing test: math_operations_in_tight_for_loop_reduction_tree_async...
Reference binary: ./runtasks_ref_linux
Results for: math_operations_in_tight_for_loop_reduction_tree_async
STUDENT REFERENCE PERF?
[Serial] 413.848 438.395 0.94 (OK)
[Parallel + Always Spawn] 414.925 195.032 2.13 (NOT OK)
[Parallel + Thread Pool + Spin] 414.513 72.565 5.71 (NOT OK)
[Parallel + Thread Pool + Sleep] 59.153 65.233 0.91 (OK)
================================================================================
So we can get rid of checking every task each time to see whether it can be scheduled and put on the ready queue.
My code does not achieve results comparable to the reference for super_light_async and super_super_light_async. I think this is because there is still room to reduce the wake-up frequency.
I have decided to move on to assignment 3 of CS149 and leave the current code as it is; I would rather spend the time learning to write some CUDA.
Side knowledge: output buffering when piping a Python command through tee
Prompt: why don't I see any output from this command, but when I remove | tee ./test.out there is output?
python3 ../tests/run_test_harness.py -n 8 -a 2>&1 | tee ./test.out
Response: The issue you're encountering is likely due to buffering. When you pipe the output through tee into a file, the output may be buffered, causing it not to appear immediately on the screen.
Explanation:
- Without tee: the output is sent directly to the terminal, and you see it immediately.
- With tee: the output is first written to a buffer, which is then written to the file and the terminal. This buffering can delay the appearance of the output.
Solution:
Use the -u option with python3 to force the standard streams to be unbuffered. This ensures that the output appears immediately.
Revised command:
python3 -u ../tests/run_test_harness.py -n 8 -a 2>&1 | tee ./test.out
This should resolve the issue and let you see the output in real time while still writing it to test.out.