Hybrid Query Processing Engine for Coprocessing in Database Systems
HyPE
|
00001 //default includes 00002 #include <limits> 00003 00004 #include <config/global_definitions.hpp> 00005 #include <config/configuration.hpp> 00006 00007 #include <core/operation.hpp> 00008 00009 #include <plugins/optimization_criterias/throughput.hpp> 00010 00011 00012 //#define TIMESTAMP_BASED_LOAD_ADAPTION 00013 //#define PROBABILITY_BASED_LOAD_ADAPTION 00014 //#define LOAD_MODIFICATOR_BASED_LOAD_ADAPTION 00015 00016 using namespace std; 00017 00018 namespace hype 00019 { 00020 namespace core 00021 { 00022 00023 00024 00025 Throughput::Throughput(std::string name_of_operation) : OptimizationCriterion_Internal(name_of_operation,std::string("Response Time")) 00026 { 00027 //OptimizationCriterionFactorySingleton::Instance().Register("Response Time",&Throughput::create); 00028 } 00029 00030 const SchedulingDecision Throughput::getOptimalAlgorithm_internal(const Tuple& input_values, Operation& op, DeviceTypeConstraint dev_constr) 00031 { 00032 assert(dev_constr==hype::ANY_DEVICE); 00033 //double maximal_time_where_algorithm_was_not_choosen=2; //5;//2; //Configuration::maximal_time_where_algorithm_was_not_choosen; 00034 00039 std::vector<AlgorithmPtr> alg_ptrs = op.getAlgorithms(); 00040 00041 std::map<double,std::string> map_execution_times_to_algorithm_name = op.getEstimatedExecutionTimesforAlgorithms(input_values); 00042 if(map_execution_times_to_algorithm_name.empty()) { 00043 std::cout << "FATAL ERROR! no algorithm to choose from!!!" << std::endl; 00044 std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl; 00045 exit(-1); 00046 } 00047 std::map<double,std::string>::iterator it; 00048 double min_time=std::numeric_limits<double>::max(); 00049 std::string fastest_algorithm_name; 00050 for(it=map_execution_times_to_algorithm_name.begin(); it!=map_execution_times_to_algorithm_name.end(); it++) { 00051 if(!quiet && verbose) cout << "Algorithm: '" << it->second << "' Estimated Execution Time: " << it->first << endl; 00052 if(it->first<min_time) { 00053 min_time=it->first; 00054 fastest_algorithm_name=it->second; 00055 } 00056 } 00057 00058 AlgorithmPtr optimal_algorithm_ptr; 00059 for(unsigned int i=0; i<alg_ptrs.size(); i++) { 00060 if(alg_ptrs[i]->getName()==fastest_algorithm_name) { 00061 optimal_algorithm_ptr=alg_ptrs[i]; 00062 } 00063 } 00064 assert(optimal_algorithm_ptr!=NULL); 00065 00066 for(unsigned int i=0; i<alg_ptrs.size(); i++) { 00067 if(alg_ptrs[i]->getName()==fastest_algorithm_name) continue; 00068 if(!quiet && verbose && debug) 00069 cout << "[DEBUG] stemod::core::Throughput::getOptimalAlgorithm_internal(): " << alg_ptrs[i]->getName() << ": " << 00070 alg_ptrs[i]->getTimeOfLastExecution() << "+" << Runtime_Configuration::instance().getAlgorithmMaximalIdleTime() 00071 << "<" << op.getCurrentTimestamp() << endl; 00072 00073 //FEATURE: Timestamp based load adaption (triggers retraining) %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00074 //if algorithm was not executed for a long time (Configuration::maximal_time_where_algorithm_was_not_choosen times), retrain algorithm 00075 //if(alg_ptrs[i]->getTimeOfLastExecution() +maximal_time_where_algorithm_was_not_choosen<op.getCurrentTimestamp()){ 00076 if(alg_ptrs[i]->getTimeOfLastExecution()+Runtime_Configuration::instance().getAlgorithmMaximalIdleTime()<op.getCurrentTimestamp()) { 00077 if(!quiet) cout << "Operation execution number: " << op.getCurrentTimestamp() << endl; 00078 double estimation=std::max(double(0),alg_ptrs[i]->getEstimatedExecutionTime(input_values).getTimeinNanoseconds()); 00079 double percenaged_slowdown=(estimation-min_time)/min_time; 00080 if(!quiet && verbose && debug) 00081 cout << "[DEBUG] stemod::core::Throughput::getOptimalAlgorithm_internal(): estimation: " << estimation << " minimal time: "<< min_time << " with slowdown: " << percenaged_slowdown << endl; 00082 assert(!alg_ptrs[i]->inTrainingPhase()); 00083 assert(estimation>=min_time); 00084 //assert(percenaged_slowdown>=0); 00085 if(!quiet && verbose && debug) 00086 cout << "[DEBUG] stemod::core::Throughput::getOptimalAlgorithm_internal(): max percentaged slowdown: " << Runtime_Configuration::instance().getMaximalSlowdownOfNonOptimalAlgorithm() << endl; 00087 //cout << "Condition met 1: " << bool(percenaged_slowdown<Runtime_Configuration::instance().getMaximalSlowdownOfNonOptimalAlgorithm()) << endl; 00088 //cout << "Condition met 2: " << bool(percenaged_slowdown>(-1*double(Runtime_Configuration::instance().getMaximalSlowdownOfNonOptimalAlgorithm()))) 00089 //<< " " << double(-1*double(Runtime_Configuration::instance().getMaximalSlowdownOfNonOptimalAlgorithm())) << endl; 00090 if(percenaged_slowdown<Runtime_Configuration::instance().getMaximalSlowdownOfNonOptimalAlgorithm() 00091 && percenaged_slowdown>-1*double(Runtime_Configuration::instance().getMaximalSlowdownOfNonOptimalAlgorithm())) { 00092 //if(!quiet) 00093 //if(!quiet) 00094 if(!quiet && verbose && debug) 00095 cout << "choose not optimal Algorithm: " << alg_ptrs[i]->getName() << " with slowdown: " << percenaged_slowdown << endl; 00096 //update timestamp of last execution, assign current operation timestamp to algorithm and increment execution counter of operation (ONLY in an optimization criterion!) 00097 alg_ptrs[i]->setTimeOfLastExecution(op.getNextTimestamp()); 00098 //return Scheduling Decision 00099 return SchedulingDecision(*alg_ptrs[i],EstimatedTime(alg_ptrs[i]->getEstimatedExecutionTime(input_values)),input_values); 00100 //alg_ptrs[i]->retrain(); 00101 } 00102 } 00103 00105 if(alg_ptrs[i]->inRetrainingPhase()) { 00106 return SchedulingDecision(*alg_ptrs[i],EstimatedTime(alg_ptrs[i]->getEstimatedExecutionTime(input_values)),input_values); 00107 }//*/ 00108 } 00109 00110 std::string name_of_algorithm = map_execution_times_to_algorithm_name[min_time]; 00111 if(!quiet && verbose) cout << "Choosing " << name_of_algorithm << " for operation " << op.getName() << endl; 00112 /*update timestamp of last execution, assign current operation timestamp to algorithm 00113 and increment execution counter of operation (ONLY in an optimization criterion!)*/ 00114 optimal_algorithm_ptr->setTimeOfLastExecution(op.getNextTimestamp()); 00115 return SchedulingDecision(*optimal_algorithm_ptr,EstimatedTime(min_time),input_values); 00116 } 00117 00118 }; //end namespace core 00119 }; //end namespace hype