Hybrid Query Processing Engine for Coprocessing in Database Systems
HyPE
|
00001 //default includes 00002 #include <limits> 00003 #include <functional> 00004 #include <config/global_definitions.hpp> 00005 #include <core/operation.hpp> 00006 #include <plugins/optimization_criterias/probability_based_outsourcing.hpp> 00007 #include <query_processing/processing_device.hpp> 00008 00009 00010 #ifdef DUMP_ESTIMATIONS 00011 #include <fstream> 00012 #endif 00013 00014 //#define TIMESTAMP_BASED_LOAD_ADAPTION 00015 //#define PROBABILITY_BASED_LOAD_ADAPTION 00016 //#define LOAD_MODIFICATOR_BASED_LOAD_ADAPTION 00017 00018 using namespace std; 00019 00020 namespace hype{ 00021 namespace core{ 00022 00023 00024 00025 ProbabilityBasedOutsourcing::ProbabilityBasedOutsourcing(const std::string& name_of_operation) : OptimizationCriterion_Internal(name_of_operation,std::string("ProbabilityBasedOutsourcing")), random_number_generator_() { 00026 //OptimizationCriterionFactorySingleton::Instance().Register("Response Time",&ProbabilityBasedOutsourcing::create); 00027 } 00028 00029 const SchedulingDecision ProbabilityBasedOutsourcing::getOptimalAlgorithm_internal(const Tuple& input_values, Operation& op, DeviceTypeConstraint dev_constr){ 00030 //idea: compute a probability for each algorithm depending on its estimated execution time 00031 00032 assert(dev_constr==hype::ANY_DEVICE); 00033 00034 std::vector<AlgorithmPtr> alg_ptrs = op.getAlgorithms(); 00035 00036 std::vector<double> estimated_execution_times(alg_ptrs.size()); 00037 std::vector<double> probabilities(alg_ptrs.size()); 00038 00039 std::map<DeviceSpecification,double> map_compute_device_to_estimated_waiting_time; 00040 00041 for(unsigned int i=0;i<alg_ptrs.size();i++){ 00042 estimated_execution_times[i]=alg_ptrs[i]->getEstimatedExecutionTime(input_values).getTimeinNanoseconds(); 00043 } 00044 00045 double est_Execution_Time_SUM=std::accumulate(estimated_execution_times.begin(),estimated_execution_times.end(),double(0)); 00046 00047 for(unsigned int i=0;i<alg_ptrs.size();i++){ 00048 probabilities[i]=estimated_execution_times[i]/est_Execution_Time_SUM; 00049 } 00050 00051 //if(!quiet && verbose && debug) 00052 { 00053 cout << "Probabilities: " << endl; 00054 for(unsigned int i=0;i<alg_ptrs.size();i++){ 00055 cout << alg_ptrs[i]->getName() << ":\t" << probabilities[i] << endl; 00056 } 00057 } 00058 00059 boost::random::discrete_distribution<> dist(probabilities.begin(),probabilities.end()); 00060 00061 int selected_algorithm_index = dist(random_number_generator_); 00062 00063 AlgorithmPtr optimal_algorithm_ptr = alg_ptrs[selected_algorithm_index]; 00064 00065 assert(optimal_algorithm_ptr!=NULL); 00066 00067 if(!quiet && verbose) 00068 cout << "Choosing " << optimal_algorithm_ptr->getName() << " for operation " << op.getName() << endl; 00069 return SchedulingDecision(*optimal_algorithm_ptr,EstimatedTime(estimated_execution_times[selected_algorithm_index]),input_values); 00070 00071 /* 00072 map_compute_device_to_estimated_waiting_time.insert( 00073 std::make_pair<stemod::ComputeDevice,double>(stemod::CPU,stemod::queryprocessing::getProcessingDevice(stemod::CPU).getEstimatedTimeUntilIdle()) 00074 ); 00075 00076 map_compute_device_to_estimated_waiting_time.insert( 00077 std::make_pair<stemod::ComputeDevice,double>(stemod::GPU,stemod::queryprocessing::getProcessingDevice(stemod::GPU).getEstimatedTimeUntilIdle()) 00078 ); 00079 00080 for(unsigned int i=0;i<estimated_finishing_times_.size();++i){ 00081 //get estimated waiting time, until the operator can start execution 00082 estimated_finishing_times_[i]=map_compute_device_to_estimated_waiting_time[alg_ptrs[i]->getComputeDevice()]; 00083 //debug output 00084 if(!quiet && verbose) 00085 cout << "estimated_finishing_time of Algorithm " << alg_ptrs[i]->getName() << ": " << estimated_finishing_times_[i] << "ns" << endl; 00086 //add the estiamted time, the operator itself needs for execution 00087 estimated_finishing_times_[i]+=alg_ptrs[i]->getEstimatedExecutionTime(input_values).getTimeinNanoseconds(); 00088 //debug output 00089 if(!quiet && verbose) 00090 cout << "estimated_finishing_time of Algorithm " <<alg_ptrs[i]->getName() << " (including Algorithm execution time): " << estimated_finishing_times_[i] << "ns" << endl; 00091 } 00092 00093 AlgorithmPtr optimal_algorithm_ptr; 00094 double min_time=std::numeric_limits<double>::max(); 00095 for(unsigned int i=0;i<estimated_finishing_times_.size();++i){ 00096 if(min_time>estimated_finishing_times_[i]){ 00097 min_time=estimated_finishing_times_[i]; 00098 optimal_algorithm_ptr=alg_ptrs[i]; 00099 } 00100 } 00101 assert(optimal_algorithm_ptr!=NULL); 00102 00103 if(!quiet && verbose) 00104 cout << "Choosing " << optimal_algorithm_ptr->getName() << " for operation " << op.getName() << endl; 00105 return SchedulingDecision(*optimal_algorithm_ptr,optimal_algorithm_ptr->getEstimatedExecutionTime(input_values),input_values); 00106 */ 00107 00108 } 00109 00110 }; //end namespace core 00111 }; //end namespace hype 00112 00113