Hybrid Query Processing Engine for Coprocessing in Database Systems
HyPE
processing_device.cpp
Go to the documentation of this file.
00001 
00002 #include <config/configuration.hpp>
00003 
00004 #include <query_processing/processing_device.hpp>
00005 
00006 #include <boost/bind.hpp>
00007 
00008 using namespace std;
00009 
00010 namespace hype{
00011    namespace queryprocessing{
00012 
00013       void Executor(hype::queryprocessing::ProcessingDevice& pd)
00014       {
00015          pd.run();
00016       }
00017 
00018       ProcessingDevice::ProcessingDevice() : operators_(), new_operator_available_(), operator_queue_full_(), operator_mutex_(), thread_(),
00019          start_timestamp_of_currently_executed_operator_(0),
00020          estimated_execution_time_of_currently_executed_operator_(0),
00021          operator_in_execution_(false),
00022          estimated_execution_time_for_operators_in_queue_to_complete_(0),
00023          total_measured_execution_time_for_all_executed_operators_(0)
00024       {
00025 
00026       }
00027 
00028       void ProcessingDevice::start()
00029       {
00030          //thread_=boost::thread(  boost::bind( &ProcessingDevice::start, boost::ref(*this) )  );
00031          thread_=boost::thread(  boost::bind( Executor, boost::ref(*this)  )  );
00032 
00033       }
00034 
00035       void ProcessingDevice::stop()
00036       {
00037          thread_.interrupt();
00038          thread_.join();
00039       }
00040 
00041       bool ProcessingDevice::addOperator(OperatorPtr op)
00042       {  /* \todo this method has to wait, since it is not meaningful to store the whole workload (since we don't have meaningful estiamtions at the beginning of the application)*/
00043          assert(op!=NULL);
00044          //boost::mutex::scoped_lock lock(operator_mutex_);
00045          //boost::lock_guard<boost::mutex> lock(operator_mutex_);
00046          boost::mutex::scoped_lock lock(operator_mutex_);
00047          while(operators_.size()>hype::core::Runtime_Configuration::instance().getMaximalReadyQueueLength()){//10 //100) {
00048             operator_queue_full_.wait(lock);
00049          }
00050          operators_.push_back(op);
00051          estimated_execution_time_for_operators_in_queue_to_complete_+=std::max(op->getSchedulingDecision().getEstimatedExecutionTimeforAlgorithm().getTimeinNanoseconds(),double(0));
00052          if(!hype::core::quiet){
00053          cout << "new waiting time for Algorithm " << op->getSchedulingDecision().getNameofChoosenAlgorithm() << ": " << estimated_execution_time_for_operators_in_queue_to_complete_ << "ns" << endl;
00054          cout << "number of queued operators: " << this->operators_.size() << " for Algorithm " << op->getSchedulingDecision().getNameofChoosenAlgorithm() << endl;
00055          }
00056          new_operator_available_.notify_all();
00057          return true;
00058 
00059 
00060       }
00061 
00062       void ProcessingDevice::run()
00063       {
00064          while(true) {
00065             boost::mutex::scoped_lock lock(operator_mutex_);
00066 
00067             while(operators_.empty()) {
00068                new_operator_available_.wait(lock);
00069             }
00070             if(!hype::core::quiet && hype::core::verbose && hype::core::debug)
00071                cout << "[Workerthread] Found work" << endl;
00072             //cout << "Lock: " << lock.owns_lock() << endl;  
00073             OperatorPtr op = operators_.front();
00074             //remove estimated count from the estimation sum
00075             estimated_execution_time_for_operators_in_queue_to_complete_-=std::max(op->getSchedulingDecision().getEstimatedExecutionTimeforAlgorithm().getTimeinNanoseconds(),double(0));
00076             //store the estiamted execution time for current operator
00077             this->estimated_execution_time_of_currently_executed_operator_=std::max(op->getSchedulingDecision().getEstimatedExecutionTimeforAlgorithm().getTimeinNanoseconds(),double(0));
00078             //store timestamp of starting time for operator
00079             this->start_timestamp_of_currently_executed_operator_=hype::core::getTimestamp();
00080             //set flag that processing device is now busy processing an operator
00081             this->operator_in_execution_=true;
00082             lock.unlock();
00083             uint64_t timestamp_begin = core::getTimestamp();
00084             //execute Operator
00085             (*op)();
00086             uint64_t timestamp_end = core::getTimestamp();
00087             lock.lock();
00088             //update total processing time on this processing device
00089             assert(timestamp_end>timestamp_begin);
00090             total_measured_execution_time_for_all_executed_operators_+=double(timestamp_end-timestamp_begin);
00091             //set flag that processing device is (at the moment) not busy processing an operator
00092             this->operator_in_execution_=false;
00093             //remove first element (that was the operator, which is still queued!)
00094             operators_.pop_front();
00095             //notify that operator has finished and that a new one can be added to the operator queue
00096             operator_queue_full_.notify_all();
00097             //lock.unlock();
00098             try {
00099                if(!hype::core::quiet && hype::core::verbose && hype::core::debug)
00100                   cout << "[Workerthread] Reached interuption point" << endl;
00101                boost::this_thread::interruption_point();
00102 
00103             } catch (boost::thread_interrupted& e) {
00104                if(!hype::core::quiet) cout << "Received Termination Signal" << endl;
00105                return;
00106             }
00107 
00108          }
00109       }
00110 
00111       bool ProcessingDevice::isIdle()
00112       {
00113          //boost::mutex::scoped_lock lock(operator_mutex_);
00114          boost::lock_guard<boost::mutex> lock(operator_mutex_);
00115          //cout << "#operators: " << operators_.size() << endl;
00116          return operators_.empty() && !this->operator_in_execution_;
00117       }
00118 
00119       double hype::queryprocessing::ProcessingDevice::getEstimatedTimeUntilIdle()
00120       {
00121          boost::lock_guard<boost::mutex> lock(operator_mutex_);
00122          return estimated_execution_time_for_operators_in_queue_to_complete_+this->getEstimatedTimeUntilOperatorCompletion();
00123       }
00124 
00125       double hype::queryprocessing::ProcessingDevice::getEstimatedTimeUntilOperatorCompletion()
00126       {
00127          if(operator_in_execution_) {
00128             uint64_t current_timestamp=hype::core::getTimestamp();
00129             uint64_t current_processing_time = current_timestamp-this->start_timestamp_of_currently_executed_operator_;
00130             return std::max( this->estimated_execution_time_of_currently_executed_operator_ - double(current_processing_time), double(0)); //return expected remaining processing time
00131          } else {
00132             return 0;
00133          }
00134 
00135       }
00136 
00137       double hype::queryprocessing::ProcessingDevice::getTotalProcessingTime()
00138       {
00139          boost::lock_guard<boost::mutex> lock(operator_mutex_);
00140          return total_measured_execution_time_for_all_executed_operators_;
00141       }
00142 
00143       ProcessingDevice& getProcessingDevice(const hype::core::DeviceSpecification& dev)
00144       {
00145          static ProcessingDevice cpu;
00146          static ProcessingDevice gpu;
00147 
00148          if(dev.getDeviceType()==hype::CPU){
00149             return cpu;
00150          } else if(dev.getDeviceType()==hype::GPU) {
00151             return gpu;
00152          } else {
00153             //maybe throw illegal argument exception?!
00154             return cpu; //if unkwon, just return cpu
00155          }
00156 
00157       }
00158 
00159    }; //end namespace queryprocessing
00160 }; //end namespace hype
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines