Hybrid Query Processing Engine for Coprocessing in Database Systems
HyPE
|
00001 00002 #include <config/configuration.hpp> 00003 00004 #include <query_processing/processing_device.hpp> 00005 00006 #include <boost/bind.hpp> 00007 00008 using namespace std; 00009 00010 namespace hype{ 00011 namespace queryprocessing{ 00012 00013 void Executor(hype::queryprocessing::ProcessingDevice& pd) 00014 { 00015 pd.run(); 00016 } 00017 00018 ProcessingDevice::ProcessingDevice() : operators_(), new_operator_available_(), operator_queue_full_(), operator_mutex_(), thread_(), 00019 start_timestamp_of_currently_executed_operator_(0), 00020 estimated_execution_time_of_currently_executed_operator_(0), 00021 operator_in_execution_(false), 00022 estimated_execution_time_for_operators_in_queue_to_complete_(0), 00023 total_measured_execution_time_for_all_executed_operators_(0) 00024 { 00025 00026 } 00027 00028 void ProcessingDevice::start() 00029 { 00030 //thread_=boost::thread( boost::bind( &ProcessingDevice::start, boost::ref(*this) ) ); 00031 thread_=boost::thread( boost::bind( Executor, boost::ref(*this) ) ); 00032 00033 } 00034 00035 void ProcessingDevice::stop() 00036 { 00037 thread_.interrupt(); 00038 thread_.join(); 00039 } 00040 00041 bool ProcessingDevice::addOperator(OperatorPtr op) 00042 { /* \todo this method has to wait, since it is not meaningful to store the whole workload (since we don't have meaningful estiamtions at the beginning of the application)*/ 00043 assert(op!=NULL); 00044 //boost::mutex::scoped_lock lock(operator_mutex_); 00045 //boost::lock_guard<boost::mutex> lock(operator_mutex_); 00046 boost::mutex::scoped_lock lock(operator_mutex_); 00047 while(operators_.size()>hype::core::Runtime_Configuration::instance().getMaximalReadyQueueLength()){//10 //100) { 00048 operator_queue_full_.wait(lock); 00049 } 00050 operators_.push_back(op); 00051 estimated_execution_time_for_operators_in_queue_to_complete_+=std::max(op->getSchedulingDecision().getEstimatedExecutionTimeforAlgorithm().getTimeinNanoseconds(),double(0)); 00052 if(!hype::core::quiet){ 00053 cout << "new waiting time for Algorithm " << op->getSchedulingDecision().getNameofChoosenAlgorithm() << ": " << estimated_execution_time_for_operators_in_queue_to_complete_ << "ns" << endl; 00054 cout << "number of queued operators: " << this->operators_.size() << " for Algorithm " << op->getSchedulingDecision().getNameofChoosenAlgorithm() << endl; 00055 } 00056 new_operator_available_.notify_all(); 00057 return true; 00058 00059 00060 } 00061 00062 void ProcessingDevice::run() 00063 { 00064 while(true) { 00065 boost::mutex::scoped_lock lock(operator_mutex_); 00066 00067 while(operators_.empty()) { 00068 new_operator_available_.wait(lock); 00069 } 00070 if(!hype::core::quiet && hype::core::verbose && hype::core::debug) 00071 cout << "[Workerthread] Found work" << endl; 00072 //cout << "Lock: " << lock.owns_lock() << endl; 00073 OperatorPtr op = operators_.front(); 00074 //remove estimated count from the estimation sum 00075 estimated_execution_time_for_operators_in_queue_to_complete_-=std::max(op->getSchedulingDecision().getEstimatedExecutionTimeforAlgorithm().getTimeinNanoseconds(),double(0)); 00076 //store the estiamted execution time for current operator 00077 this->estimated_execution_time_of_currently_executed_operator_=std::max(op->getSchedulingDecision().getEstimatedExecutionTimeforAlgorithm().getTimeinNanoseconds(),double(0)); 00078 //store timestamp of starting time for operator 00079 this->start_timestamp_of_currently_executed_operator_=hype::core::getTimestamp(); 00080 //set flag that processing device is now busy processing an operator 00081 this->operator_in_execution_=true; 00082 lock.unlock(); 00083 uint64_t timestamp_begin = core::getTimestamp(); 00084 //execute Operator 00085 (*op)(); 00086 uint64_t timestamp_end = core::getTimestamp(); 00087 lock.lock(); 00088 //update total processing time on this processing device 00089 assert(timestamp_end>timestamp_begin); 00090 total_measured_execution_time_for_all_executed_operators_+=double(timestamp_end-timestamp_begin); 00091 //set flag that processing device is (at the moment) not busy processing an operator 00092 this->operator_in_execution_=false; 00093 //remove first element (that was the operator, which is still queued!) 00094 operators_.pop_front(); 00095 //notify that operator has finished and that a new one can be added to the operator queue 00096 operator_queue_full_.notify_all(); 00097 //lock.unlock(); 00098 try { 00099 if(!hype::core::quiet && hype::core::verbose && hype::core::debug) 00100 cout << "[Workerthread] Reached interuption point" << endl; 00101 boost::this_thread::interruption_point(); 00102 00103 } catch (boost::thread_interrupted& e) { 00104 if(!hype::core::quiet) cout << "Received Termination Signal" << endl; 00105 return; 00106 } 00107 00108 } 00109 } 00110 00111 bool ProcessingDevice::isIdle() 00112 { 00113 //boost::mutex::scoped_lock lock(operator_mutex_); 00114 boost::lock_guard<boost::mutex> lock(operator_mutex_); 00115 //cout << "#operators: " << operators_.size() << endl; 00116 return operators_.empty() && !this->operator_in_execution_; 00117 } 00118 00119 double hype::queryprocessing::ProcessingDevice::getEstimatedTimeUntilIdle() 00120 { 00121 boost::lock_guard<boost::mutex> lock(operator_mutex_); 00122 return estimated_execution_time_for_operators_in_queue_to_complete_+this->getEstimatedTimeUntilOperatorCompletion(); 00123 } 00124 00125 double hype::queryprocessing::ProcessingDevice::getEstimatedTimeUntilOperatorCompletion() 00126 { 00127 if(operator_in_execution_) { 00128 uint64_t current_timestamp=hype::core::getTimestamp(); 00129 uint64_t current_processing_time = current_timestamp-this->start_timestamp_of_currently_executed_operator_; 00130 return std::max( this->estimated_execution_time_of_currently_executed_operator_ - double(current_processing_time), double(0)); //return expected remaining processing time 00131 } else { 00132 return 0; 00133 } 00134 00135 } 00136 00137 double hype::queryprocessing::ProcessingDevice::getTotalProcessingTime() 00138 { 00139 boost::lock_guard<boost::mutex> lock(operator_mutex_); 00140 return total_measured_execution_time_for_all_executed_operators_; 00141 } 00142 00143 ProcessingDevice& getProcessingDevice(const hype::core::DeviceSpecification& dev) 00144 { 00145 static ProcessingDevice cpu; 00146 static ProcessingDevice gpu; 00147 00148 if(dev.getDeviceType()==hype::CPU){ 00149 return cpu; 00150 } else if(dev.getDeviceType()==hype::GPU) { 00151 return gpu; 00152 } else { 00153 //maybe throw illegal argument exception?! 00154 return cpu; //if unkwon, just return cpu 00155 } 00156 00157 } 00158 00159 }; //end namespace queryprocessing 00160 }; //end namespace hype