Hybrid Query Processing Engine for Coprocessing in Database Systems
HyPE
|
00001 #include <iostream> 00002 #include <string> 00003 #include <vector> 00004 #include <memory> 00005 #include <core/optimization_criterion.hpp> 00006 #include <core/time_measurement.hpp> 00007 #include <core/operation.hpp> 00008 #include <core/scheduler.hpp> 00009 00010 #include <query_processing/processing_device.hpp> 00011 #include <query_processing/virtual_processing_device.hpp> 00012 #include <config/configuration.hpp> 00013 00014 using namespace std; 00015 namespace hype 00016 { 00017 namespace core 00018 { 00019 const SchedulingDecision hype::core::OptimizationCriterion_Internal::getOptimalAlgorithm(const Tuple& input_values, Operation& op, DeviceTypeConstraint dev_constr) 00020 { 00021 //skip training mechanism! 00022 if(dev_constr!=hype::ANY_DEVICE){ 00023 SchedulingDecision sched_dec = this->getOptimalAlgorithm_internal(input_values, op, dev_constr); 00024 core::Scheduler::instance().getProcessingDevices().addSchedulingDecision(sched_dec); 00025 return sched_dec; 00026 } 00027 00028 std::vector<AlgorithmPtr> alg_ptrs = op.getAlgorithms(); 00029 00030 /* 00031 for(unsigned int i=0; i<alg_ptrs.size(); i++) { 00032 if(alg_ptrs[i]->getNumberOfDecisionsforThisAlgorithm()==Runtime_Configuration::instance().getTrainingLength()){ 00033 while(alg_ptrs[i]->getNumberOfTerminatedExecutions()<alg_ptrs[i]->getNumberOfDecisionsforThisAlgorithm()){ 00034 //wait until trainingphase is over?! 00035 } 00036 } 00037 00038 }*/ 00039 00040 00041 //executed once for round robin training 00042 if(map_algorithmname_to_number_of_executions_.empty()){ 00043 std::vector<AlgorithmPtr> algs = op.getAlgorithms(); 00044 std::vector<AlgorithmPtr>::const_iterator it; 00045 for(it=algs.begin(); it!=algs.end(); ++it) { 00046 map_algorithmname_to_number_of_executions_[(*it)->getName()]=0; //init map 00047 } 00048 } 00049 00050 bool all_algorithms_finished_training=true; 00051 for(unsigned int i=0; i<alg_ptrs.size(); i++) { 00052 //cout << "Algorithm " << alg_ptrs[i]->getName() << "\tTraining: " << alg_ptrs[i]->inTrainingPhase() << " \t Dec: " << alg_ptrs[i]->getNumberOfDecisionsforThisAlgorithm() 00053 //<< " \t Exec: " << alg_ptrs[i]->getNumberOfTerminatedExecutions() << endl; 00054 if(alg_ptrs[i]->inTrainingPhase()) { 00055 all_algorithms_finished_training=false; 00056 } 00057 } 00058 //cout << "all algorithms finsihed training: " << all_algorithms_finished_training << endl; 00059 if(!all_algorithms_finished_training) { 00060 std::map<std::string,unsigned int>::iterator it; 00061 std::string alg_with_min_executions; 00062 unsigned int min_execution=std::numeric_limits<unsigned int>::max(); 00063 for(it = map_algorithmname_to_number_of_executions_.begin(); it!= map_algorithmname_to_number_of_executions_.end(); ++it) { 00064 if(it->second<min_execution) { 00065 min_execution=it->second; 00066 alg_with_min_executions=it->first; 00067 } 00068 } 00069 AlgorithmPtr pointer_to_choosen_algorithm=op.getAlgorithm(alg_with_min_executions); 00070 assert(pointer_to_choosen_algorithm!=NULL); 00071 //cout << "Training: Choosing: " << alg_with_min_executions << endl; 00072 map_algorithmname_to_number_of_executions_[alg_with_min_executions]++; 00073 //return SchedulingDecision(*pointer_to_choosen_algorithm,EstimatedTime(-1),input_values); 00074 SchedulingDecision sched_dec(*pointer_to_choosen_algorithm,EstimatedTime(-1),input_values); 00075 core::Scheduler::instance().getProcessingDevices().addSchedulingDecision(sched_dec); 00076 return sched_dec; 00077 } 00078 00079 for(unsigned int i=0; i<alg_ptrs.size(); i++) { 00080 if(!quiet && verbose) 00081 cout << "Algorithm: " << alg_ptrs[i]->getName() << " In Training Phase: " << alg_ptrs[i]->inTrainingPhase() << endl; 00082 #ifdef TIMESTAMP_BASED_LOAD_ADAPTION 00083 //FEATURE: Timestamp based load adaption (triggers retraining) %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00084 //if algorithm was not executed for a long time (Configuration::maximal_time_where_algorithm_was_not_choosen times), retrain algorithm 00085 if(alg_ptrs[i]->getTimeOfLastExecution()+Configuration::maximal_time_where_algorithm_was_not_choosen<op.getCurrentTimestamp()) { 00086 cout << "Operation execution number: " << op.getCurrentTimestamp() << endl; 00087 alg_ptrs[i]->retrain(); 00088 } 00089 #endif 00090 #ifdef LOAD_MODIFICATOR_BASED_LOAD_ADAPTION 00091 //FEATURE: Load Modification factor based load adaption %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00092 if(alg_ptrs[i]->getLoadChangeEstimator().getLoadModificator()>2 00093 || alg_ptrs[i]->getLoadChangeEstimator().getLoadModificator()<0.5f) { //execution times increased by factor of 2 -> sifnificant load change, retrain all algorithms? 00094 cout << "Operation execution number: " << op.getCurrentTimestamp() << "\tAlgorithm: " << alg_ptrs[i]->getName() << "\t" 00095 << "Significant load change confirmed: " << alg_ptrs[i]->getLoadChangeEstimator().getLoadModificator() << endl; 00096 } 00097 #endif 00098 //train algorithms in round robin manner 00099 /*if(alg_ptrs[i]->inTrainingPhase()) { 00100 return SchedulingDecision(alg_ptrs[i]->getName(),EstimatedTime(-1),input_values); 00101 }*/ 00102 00103 if(alg_ptrs[i]->inRetrainingPhase()) { 00104 //return SchedulingDecision(*alg_ptrs[i],EstimatedTime(alg_ptrs[i]->getEstimatedExecutionTime(input_values)),input_values); 00105 SchedulingDecision sched_dec(*alg_ptrs[i],EstimatedTime(alg_ptrs[i]->getEstimatedExecutionTime(input_values)),input_values); 00106 core::Scheduler::instance().getProcessingDevices().addSchedulingDecision(sched_dec); 00107 return sched_dec; 00108 }//*/ 00109 } 00110 00111 SchedulingDecision sched_dec=this->getOptimalAlgorithm_internal(input_values, op, dev_constr); 00112 core::Scheduler::instance().getProcessingDevices().addSchedulingDecision(sched_dec); 00113 return sched_dec; 00114 00115 //return this->getOptimalAlgorithm_internal(input_values, op, dev_constr); 00116 00117 } 00118 const std::string& OptimizationCriterion_Internal::getName() const 00119 { 00120 return name_of_optimization_criterion_; 00121 } 00122 00123 OptimizationCriterion_Internal::OptimizationCriterion_Internal(const std::string& name_of_optimization_criterion, const std::string& name_of_operation) : map_algorithmname_to_number_of_executions_(), name_of_optimization_criterion_(name_of_optimization_criterion), name_of_operation_(name_of_operation) {} 00124 00125 00126 OptimizationCriterion_Internal::~OptimizationCriterion_Internal() {} 00127 00128 std::tr1::shared_ptr<OptimizationCriterion_Internal> getNewOptimizationCriterionbyName(const std::string& name_of_optimization_criterion) 00129 { 00130 OptimizationCriterion_Internal* ptr = OptimizationCriterionFactorySingleton::Instance().CreateObject(name_of_optimization_criterion); //.( 1, createProductNull ); 00131 return std::tr1::shared_ptr<OptimizationCriterion_Internal> (ptr); 00132 } 00133 00134 OptimizationCriterionFactory& OptimizationCriterionFactorySingleton::Instance() 00135 { 00136 static OptimizationCriterionFactory factory; 00137 return factory; 00138 } 00139 }; //end namespace core 00140 }; //end namespace hype