// HyPE - Hybrid Query Processing Engine for Coprocessing in Database Systems
00001 00002 #include <config/configuration.hpp> 00003 00004 #include <core/algorithm.hpp> 00005 #include <core/operation.hpp> 00006 00007 using namespace std; 00008 00009 namespace hype{ 00010 namespace core{ 00011 00012 Algorithm::Algorithm(const std::string& name_of_algorithm, 00013 const std::string& name_of_statistical_method, 00014 const std::string& name_of_recomputation_strategy, 00015 Operation& operation, 00016 DeviceSpecification comp_dev) : name_(name_of_algorithm), 00017 ptr_statistical_method_(getNewStatisticalMethodbyName(name_of_statistical_method)), 00018 ptr_recomputation_heristic_( 00019 getNewRecomputationHeuristicbyName(name_of_recomputation_strategy) 00020 ), 00021 statistics_(), 00022 operation_(operation), 00023 logical_timestamp_of_last_execution_(0), 00024 is_in_retraining_phase_(false), 00025 retraining_length_(0), 00026 load_change_estimator_(10), 00027 comp_dev_(comp_dev) 00028 { 00029 00030 if(ptr_statistical_method_==NULL || ptr_recomputation_heristic_==NULL) 00031 std::cout << "Fatal Error!" << std::endl; 00032 00033 //getNewStatisticalMethodbyName(name_of_statistical_method); 00034 00035 } 00036 00037 Algorithm::~Algorithm(){ 00038 if(Runtime_Configuration::instance().printAlgorithmStatistics()){ 00039 if(!statistics_.writeToDisc(operation_.getName(),name_)) 00040 cout << "Error! Could not write statistics for algorithm" << name_ << " to disc!" << endl; 00041 } 00042 } 00043 00044 bool Algorithm::setStatisticalMethod(std::tr1::shared_ptr<StatisticalMethod_Internal> ptr_statistical_method){ 00045 if(ptr_statistical_method){ 00046 ptr_statistical_method_.reset(); 00047 ptr_statistical_method_=ptr_statistical_method; 00048 //ptr_statistical_method_.reset(ptr_statistical_method); 00049 return true; 00050 }else{ 00051 cout << "Error! in setStatisticalMethod(): ptr_statistical_method==NULL !" 
<< endl; 00052 return false; 00053 } 00054 } 00055 00056 bool Algorithm::inTrainingPhase() const throw(){ 00057 return ptr_statistical_method_->inTrainingPhase(); 00058 } 00059 00060 bool Algorithm::inRetrainingPhase() const throw(){ 00061 return is_in_retraining_phase_; 00062 } 00063 00064 bool Algorithm::setRecomputationHeuristic(std::tr1::shared_ptr<RecomputationHeuristic_Internal> ptr_recomputation_heristic){ 00065 if(ptr_recomputation_heristic){ 00066 ptr_recomputation_heristic_.reset(); 00067 ptr_recomputation_heristic_=ptr_recomputation_heristic; 00068 //ptr_recomputation_heristic_.reset(ptr_recomputation_heristic); 00069 return true; 00070 }else{ 00071 cout << "Error! in setRecomputationHeuristic(): ptr_recomputation_heristic==NULL !" << endl; 00072 return false; 00073 } 00074 return true; 00075 } 00076 00077 const std::string Algorithm::getName() const{ 00078 return name_; 00079 } 00080 00081 bool Algorithm::addMeasurementPair(const MeasurementPair& mp){ 00082 //logical_timestamp_of_last_execution_=operation_.getNextTimestamp(); //after each execution of algorithm, add new timestamp (logical time form logical clock that is increased with every operation execution! 
-> Means, that each algorithms increments the timestamp by calling getNextTimestamp()) 00083 if(!this->inTrainingPhase()) 00084 load_change_estimator_.add(mp);//first we have to learn the current load situation properly, afterwards, we can compute meaningful Load modifications 00085 00086 if(ptr_recomputation_heristic_->recompute(*this)){ 00087 if(!ptr_statistical_method_->recomuteApproximationFunction(*this)){ 00088 if(!quiet && verbose) cout << "Error while recomputing approximation function of algorithm '" << name_ << "'" << endl; 00089 }else{ 00090 //success 00091 if(!quiet && verbose) 00092 cout << "Successfully recomputed approximation function of algorithm '" << name_ << "'" << endl; 00093 statistics_.number_of_recomputations_++; //update statistics 00094 } 00095 } 00096 //if an observation was added, the model obviously decided to execute this algorithm 00097 //this->statistics_.number_of_decisions_for_this_algorithm_++; 00098 00099 this->statistics_.number_of_terminated_executions_of_this_algorithm_++; 00100 00101 this->statistics_.total_execution_time_+=mp.getMeasuredTime().getTimeinNanoseconds(); 00102 00103 this->statistics_.relative_errors_.push_back( (mp.getMeasuredTime().getTimeinNanoseconds()-mp.getEstimatedTime().getTimeinNanoseconds()) 00104 /mp.getEstimatedTime().getTimeinNanoseconds() 00105 ); 00106 00107 // (measured_time.getTimeinNanoseconds()-scheduling_decision_.getEstimatedExecutionTimeforAlgorithm().getTimeinNanoseconds())/scheduling_decision_.getEstimatedExecutionTimeforAlgorithm().getTimeinNanoseconds() << "%" << endl; 00108 00109 if(is_in_retraining_phase_){ 00110 //if(Configuration::maximal_retraining_length>=retraining_length_++){ 00111 if(Runtime_Configuration::instance().getRetrainingLength()>=retraining_length_++){ 00112 is_in_retraining_phase_=false; 00113 if(!quiet) cout << "Finished retraining" << endl; 00114 //ptr_statistical_method_->recomuteApproximationFunction(*this); 00115 } 00116 } 00117 00118 return 
statistics_.executionHistory_.addMeasurementPair(mp); 00119 } 00120 00121 const EstimatedTime Algorithm::getEstimatedExecutionTime(const Tuple& input_values){ 00122 return ptr_statistical_method_->computeEstimation(input_values); 00123 } 00124 00125 unsigned int Algorithm::getNumberOfDecisionsforThisAlgorithm() const throw(){ 00126 return this->statistics_.number_of_decisions_for_this_algorithm_; 00127 } 00128 00129 unsigned int Algorithm::getNumberOfTerminatedExecutions() const throw(){ 00130 return this->statistics_.number_of_terminated_executions_of_this_algorithm_; 00131 } 00132 00134 double Algorithm::getTotalExecutionTime() const throw(){ 00135 return statistics_.total_execution_time_; 00136 } 00137 00138 uint64_t Algorithm::getTimeOfLastExecution() const throw(){ 00139 return logical_timestamp_of_last_execution_; 00140 } 00141 00142 void Algorithm::setTimeOfLastExecution(uint64_t new_timestamp) throw(){ 00143 this->logical_timestamp_of_last_execution_=new_timestamp; 00144 } 00145 00146 void Algorithm::incrementNumberofDecisionsforThisAlgorithm() throw(){ 00147 this->statistics_.number_of_decisions_for_this_algorithm_++; 00148 } 00149 00150 void Algorithm::retrain(){ 00151 if(!quiet) std::cout << "Retrain algorithm " << name_ << std::endl; 00152 is_in_retraining_phase_=true; 00153 retraining_length_=0; 00154 //ptr_statistical_method_->retrain(); 00155 00156 //this->statistics_.executionHistory_.clear(); 00157 } 00158 00159 const LoadChangeEstimator& Algorithm::getLoadChangeEstimator() const throw(){ 00160 return load_change_estimator_; 00161 } 00162 00163 const DeviceSpecification Algorithm::getDeviceSpecification() const throw(){ 00164 return comp_dev_; 00165 } 00166 00167 }; //end namespace core 00168 }; //end namespace hype 00169 00170