Hybrid Query Processing Engine for Coprocessing in Database Systems
HyPE
|
00001 //default includes 00002 #include <limits> 00003 00004 #include <config/global_definitions.hpp> 00005 00006 #include <core/operation.hpp> 00007 #include <core/scheduler.hpp> 00008 00009 #include <plugins/optimization_criterias/response_time_advanced.hpp> 00010 00011 #include <query_processing/processing_device.hpp> 00012 00013 #ifdef DUMP_ESTIMATIONS 00014 #include <fstream> 00015 #endif 00016 00017 //#define TIMESTAMP_BASED_LOAD_ADAPTION 00018 //#define PROBABILITY_BASED_LOAD_ADAPTION 00019 //#define LOAD_MODIFICATOR_BASED_LOAD_ADAPTION 00020 00021 using namespace std; 00022 00023 namespace hype{ 00024 namespace core{ 00025 00026 WaitingTimeAwareResponseTime::WaitingTimeAwareResponseTime(const std::string& name_of_operation) : OptimizationCriterion_Internal(name_of_operation,std::string("Response Time")) { 00027 //OptimizationCriterionFactorySingleton::Instance().Register("Response Time",&WaitingTimeAwareResponseTime::create); 00028 } 00029 00030 00031 const SchedulingDecision WaitingTimeAwareResponseTime::getOptimalAlgorithm_internal(const Tuple& input_values, Operation& op, DeviceTypeConstraint dev_constr){ 00032 //idea: choose algorithm with minimal finishing time (including expected waiting time on devices!) 00033 00034 assert(dev_constr==hype::ANY_DEVICE); 00035 00036 std::vector<AlgorithmPtr> alg_ptrs = op.getAlgorithms(); 00037 00038 core::Scheduler& scheduler = core::Scheduler::instance(); 00039 00040 const core::Scheduler::ProcessingDevices::Devices& processing_devices = scheduler.getProcessingDevices().getDevices(); 00041 00042 //compute, how long all processing devices need to finish their jobs 00043 std::vector<double> estimated_finishing_times_pro_dev(processing_devices.size()); //finishing times for processing devices 00044 core::Scheduler::ProcessingDevices::Devices::const_iterator cit; 00045 00046 scheduler.getProcessingDevices().print(); 00047 00048 for(cit=processing_devices.begin();cit!=processing_devices.end();++cit){ 00049 //check wheter the ProcessingDevice ID fits in the array (we assume the device ids have to be defined consecutively) 00050 assert(cit->first<estimated_finishing_times_pro_dev.size()); 00051 estimated_finishing_times_pro_dev[cit->first]=cit->second->getEstimatedFinishingTime(); 00052 } 00053 cout << "Estimated Times per Processing Device: " << endl; 00054 for(unsigned int i=0;i<estimated_finishing_times_pro_dev.size();++i){ 00055 cout << "PD ID: " << i << " Estimated Finishing Time: " << estimated_finishing_times_pro_dev[i] << endl; 00056 } 00057 00058 //compute how long algorithms would take to terminate 00059 std::vector<double> estimated_finishing_times_of_algorithms(alg_ptrs.size()); 00060 00061 for(unsigned int i=0;i<estimated_finishing_times_of_algorithms.size();++i){ 00062 estimated_finishing_times_of_algorithms[i] = alg_ptrs[i]->getEstimatedExecutionTime(input_values).getTimeinNanoseconds(); 00063 unsigned int array_index = alg_ptrs[i]->getDeviceSpecification().getProcessingDeviceID(); 00064 estimated_finishing_times_of_algorithms[i] += estimated_finishing_times_pro_dev[array_index]; 00065 } 00066 00067 cout << "Estimated Times per Algorithm: " << endl; 00068 for(unsigned int i=0;i<estimated_finishing_times_of_algorithms.size();++i){ 00069 cout << "PD ID: " << i << " Estimated Finishing Time: " << estimated_finishing_times_of_algorithms[0] << endl; 00070 } 00071 00072 AlgorithmPtr optimal_algorithm_ptr; 00073 double min_time=std::numeric_limits<double>::max(); 00074 for(unsigned int i=0;i<estimated_finishing_times_of_algorithms.size();++i){ 00075 if(min_time>estimated_finishing_times_of_algorithms[i]){ 00076 min_time=estimated_finishing_times_of_algorithms[i]; 00077 optimal_algorithm_ptr=alg_ptrs[i]; 00078 } 00079 } 00080 assert(optimal_algorithm_ptr!=NULL); 00081 return SchedulingDecision(*optimal_algorithm_ptr,optimal_algorithm_ptr->getEstimatedExecutionTime(input_values),input_values); 00082 00083 } 00084 00085 /* 00086 const SchedulingDecision WaitingTimeAwareResponseTime::getOptimalAlgorithm_internal(const Tuple& input_values, Operation& op, DeviceTypeConstraint dev_constr){ 00087 //idea: choose algorithm with minimal finishing time (including expected waiting time on device!) 00088 00089 assert(dev_constr==hype::ANY_DEVICE); 00090 00091 std::vector<AlgorithmPtr> alg_ptrs = op.getAlgorithms(); 00092 00093 std::vector<double> estimated_finishing_times_(alg_ptrs.size()); 00094 00095 std::map<hype::DeviceSpecification,double> map_compute_device_to_estimated_waiting_time; 00096 00097 map_compute_device_to_estimated_waiting_time.insert( 00098 std::make_pair<hype::DeviceSpecification,double>(hype::CPU,hype::queryprocessing::getProcessingDevice(hype::CPU).getEstimatedTimeUntilIdle()) 00099 ); 00100 00101 map_compute_device_to_estimated_waiting_time.insert( 00102 std::make_pair<hype::DeviceSpecification,double>(hype::GPU,hype::queryprocessing::getProcessingDevice(hype::GPU).getEstimatedTimeUntilIdle()) 00103 ); 00104 00105 for(unsigned int i=0;i<estimated_finishing_times_.size();++i){ 00106 //get estimated waiting time, until the operator can start execution 00107 estimated_finishing_times_[i]=map_compute_device_to_estimated_waiting_time[alg_ptrs[i]->getComputeDevice()]; 00108 //debug output 00109 if(!quiet && verbose) 00110 cout << "estimated_finishing_time of Algorithm " << alg_ptrs[i]->getName() << ": " << estimated_finishing_times_[i] << "ns" << endl; 00111 //add the estiamted time, the operator itself needs for execution 00112 estimated_finishing_times_[i]+=alg_ptrs[i]->getEstimatedExecutionTime(input_values).getTimeinNanoseconds(); 00113 //debug output 00114 if(!quiet && verbose) 00115 cout << "estimated_finishing_time of Algorithm " <<alg_ptrs[i]->getName() << " (including Algorithm execution time): " << estimated_finishing_times_[i] << "ns" << endl; 00116 } 00117 00118 AlgorithmPtr optimal_algorithm_ptr; 00119 double min_time=std::numeric_limits<double>::max(); 00120 for(unsigned int i=0;i<estimated_finishing_times_.size();++i){ 00121 if(min_time>estimated_finishing_times_[i]){ 00122 min_time=estimated_finishing_times_[i]; 00123 optimal_algorithm_ptr=alg_ptrs[i]; 00124 } 00125 } 00126 assert(optimal_algorithm_ptr!=NULL); 00127 00128 if(!quiet && verbose) 00129 cout << "Choosing " << optimal_algorithm_ptr->getName() << " for operation " << op.getName() << endl; 00130 return SchedulingDecision(*optimal_algorithm_ptr,optimal_algorithm_ptr->getEstimatedExecutionTime(input_values),input_values); 00131 00132 } 00133 */ 00134 00135 }; //end namespace core 00136 }; //end namespace hype 00137 00138