Hybrid Query Processing Engine for Coprocessing in Database Systems
HyPE
|
00001 00002 #include <core/scheduler.hpp> 00003 #include <plugins/pluginloader.hpp> 00004 #include <stdlib.h> 00005 00006 #include <boost/thread.hpp> 00007 00008 using namespace std; 00009 00010 00011 namespace hype{ 00012 namespace core{ 00013 00014 using namespace queryprocessing; 00015 00016 boost::mutex global_mutex; 00017 00018 Scheduler::Scheduler() : map_operationname_to_operation_(), map_statisticalmethodname_to_statisticalmethod_(),proc_devs_(){ 00019 //is called once, because Scheduler is a Singelton 00020 PluginLoader::loadPlugins(); 00021 } 00022 00023 00024 bool Scheduler::addAlgorithm(const AlgorithmSpecification& alg_spec, const DeviceSpecification& dev_spec){ 00025 boost::lock_guard<boost::mutex> lock(global_mutex); 00026 00027 if(!this->proc_devs_.exists(dev_spec)){ 00028 if(!this->proc_devs_.addDevice(dev_spec)){ 00029 std::cout << "FATAL ERROR! Failed to add Processing Device while creatign Algorithm '" << alg_spec.getAlgorithmName() << "'" << std::endl; 00030 } 00031 } 00032 00033 std::string name_of_operation=alg_spec.getOperationName(); 00034 00035 std::map<std::string,std::tr1::shared_ptr<Operation> >::iterator it; 00036 00037 it=map_operationname_to_operation_.find(name_of_operation); 00038 00039 std::tr1::shared_ptr<Operation> op; 00040 00041 if(it==map_operationname_to_operation_.end()){ //operation does not exist 00042 op = std::tr1::shared_ptr<Operation>(new Operation(name_of_operation)); 00043 map_operationname_to_operation_[name_of_operation]=op; 00044 }else{ 00045 op=it->second; //std::tr1::shared_ptr<Operation> (it->second); 00046 } 00047 00048 return op->addAlgorithm(alg_spec.getAlgorithmName(), 00049 dev_spec, 00050 alg_spec.getStatisticalMethodName(), 00051 alg_spec.getRecomputationHeuristicName()); 00052 } 00053 00054 Scheduler& Scheduler::instance(){ 00055 static Scheduler scheduler; 00056 return scheduler; 00057 } 00058 /* 00059 bool Scheduler::addAlgorithm(const std::string& name_of_operation, 00060 const std::string& name_of_algorithm, 00061 DeviceSpecification comp_dev, 00062 const std::string& name_of_statistical_method, 00063 const std::string& name_of_recomputation_strategy){ 00064 00065 boost::lock_guard<boost::mutex> lock(global_mutex); 00066 00067 std::map<std::string,std::tr1::shared_ptr<Operation> >::iterator it; 00068 00069 it=map_operationname_to_operation_.find(name_of_operation); 00070 00071 std::tr1::shared_ptr<Operation> op; 00072 00073 if(it==map_operationname_to_operation_.end()){ //operation does not exist 00074 op = std::tr1::shared_ptr<Operation>(new Operation(name_of_operation)); 00075 map_operationname_to_operation_[name_of_operation]=op; 00076 }else{ 00077 op=it->second; //std::tr1::shared_ptr<Operation> (it->second); 00078 } 00079 00080 return op->addAlgorithm(name_of_algorithm, 00081 comp_dev, 00082 name_of_statistical_method, 00083 name_of_recomputation_strategy); 00084 00085 00086 //return true; 00087 }*/ 00088 00089 bool Scheduler::setOptimizationCriterion(const std::string& name_of_operation, 00090 const std::string& name_of_optimization_criterion){ 00091 00092 boost::lock_guard<boost::mutex> lock(global_mutex); 00093 00094 // map_operationname_to_operation_.find 00095 // StatisticalMethodMap::iterator it = map_statisticalmethodname_to_statisticalmethod_.find(name_of_operation); 00096 00097 std::tr1::shared_ptr<OptimizationCriterion_Internal> opt_crit = getNewOptimizationCriterionbyName(name_of_optimization_criterion); 00098 if(opt_crit){ 00099 //StatisticalMethodMap::iterator it = map_statisticalmethodname_to_statisticalmethod_.find(name_of_operation); 00100 00101 MapNameToOperation::iterator it = map_operationname_to_operation_.find(name_of_operation); 00102 if(it==map_operationname_to_operation_.end()){ 00103 cout << "Operation not found! " << name_of_operation << endl; 00104 return false; 00105 } 00106 return (*it).second->setNewOptimizationCriterion(name_of_optimization_criterion); 00107 //map_operationname_to_operation_[name_of_operation]=opt_crit; 00108 //return true; 00109 } 00110 return false; 00111 } 00112 00113 /* not part of intrface!!! -> no thread safety implemented!!!*/ 00114 const AlgorithmPtr Scheduler::getAlgorithm(const std::string& name_of_algorithm){ 00115 00116 //boost::lock_guard<boost::mutex> lock(global_mutex); 00117 00118 MapNameToOperation::iterator it; 00119 for(it= map_operationname_to_operation_.begin(); it !=map_operationname_to_operation_.end(); it++){ 00120 if(it->second->hasAlgorithm(name_of_algorithm)) 00121 return it->second->getAlgorithm(name_of_algorithm); 00122 } 00123 return AlgorithmPtr(); //NULL Pointer if algorithm is not found 00124 } 00125 00126 00127 bool Scheduler::setStatisticalMethod(const std::string& name_of_algorithm, 00128 const std::string& name_of_statistical_method){ 00129 00130 boost::lock_guard<boost::mutex> lock(global_mutex); 00131 00132 AlgorithmPtr alg_ptr = this->getAlgorithm(name_of_algorithm); 00133 if(!alg_ptr){ 00134 cout << "Error in Scheduler::setStatisticalMethod(): Algorithm " << name_of_algorithm << " not found!" << endl; 00135 return false; 00136 } 00137 cout << "Found Algorithm: " << alg_ptr->getName() << endl; 00138 StatisticalMethodPtr stat_meth_ptr = getNewStatisticalMethodbyName(name_of_statistical_method); 00139 if(!stat_meth_ptr) return false; 00140 cout << "created statistical method: " << name_of_statistical_method << endl; 00141 return alg_ptr->setStatisticalMethod(stat_meth_ptr); 00142 } 00143 00144 bool Scheduler::setRecomputationHeuristic(const std::string& name_of_algorithm, 00145 const std::string& name_of_recomputation_strategy){ 00146 00147 boost::lock_guard<boost::mutex> lock(global_mutex); 00148 00149 AlgorithmPtr alg_ptr = this->getAlgorithm(name_of_algorithm); 00150 if(!alg_ptr){ 00151 cout << "Error in Scheduler::setRecomputationHeuristic(): Algorithm " << name_of_algorithm << " not found!" << endl; 00152 return false; 00153 } 00154 std::tr1::shared_ptr<RecomputationHeuristic_Internal> recomp_heuristic = getNewRecomputationHeuristicbyName(name_of_recomputation_strategy); 00155 if(!recomp_heuristic) return false; 00156 return alg_ptr->setRecomputationHeuristic(recomp_heuristic); 00157 } 00158 00159 const SchedulingDecision Scheduler::getOptimalAlgorithm(const OperatorSpecification& op_spec, const DeviceConstraint& dev_constr){ 00160 00161 boost::lock_guard<boost::mutex> lock(global_mutex); 00162 00163 std::map<std::string,std::tr1::shared_ptr<Operation> >::iterator it; 00164 00165 std::string name_of_operation=op_spec.getOperatorName(); 00166 00167 it=map_operationname_to_operation_.find(name_of_operation); 00168 00169 std::tr1::shared_ptr<Operation> op; 00170 00171 if(it==map_operationname_to_operation_.end()){ //operation does not exist 00172 00173 std::cout << "[HyPE:] FATAL ERROR: In hype::core::Scheduler::getOptimalAlgorithm(): Operation " << name_of_operation << " does not exist!!!" << std::endl; 00174 cout << "File: " << __FILE__ << " at Line: " << __LINE__ << endl; 00175 exit(-1); 00176 }//else{ 00177 00178 op=it->second; //std::tr1::shared_ptr<Operation> (it->second); 00179 if(!op){ 00180 std::cout << "FATAL ERROR: Operation " << name_of_operation << ": NULL Pointer!" << std::endl; 00181 exit(-1); 00182 } 00183 return op->getOptimalAlgorithm(op_spec.getFeatureVector(), dev_constr); 00184 00185 } 00186 00187 // const SchedulingDecision Scheduler::getOptimalAlgorithmName(const std::string& name_of_operation, const Tuple& input_values, DeviceTypeConstraint dev_constr){ 00188 // 00189 // boost::lock_guard<boost::mutex> lock(global_mutex); 00190 // 00191 // std::map<std::string,std::tr1::shared_ptr<Operation> >::iterator it; 00192 // 00193 // it=map_operationname_to_operation_.find(name_of_operation); 00194 // 00195 // std::tr1::shared_ptr<Operation> op; 00196 // 00197 // if(it==map_operationname_to_operation_.end()){ //operation does not exist 00198 // std::cout << "FATAL ERROR: Operation " << name_of_operation << " does not exist!!!" << std::endl; 00199 // exit(-1); 00200 // }//else{ 00201 // 00202 // op=it->second; //std::tr1::shared_ptr<Operation> (it->second); 00203 // if(!op){ 00204 // std::cout << "FATAL ERROR: Operation " << name_of_operation << ": NULL Pointer!" << std::endl; 00205 // exit(-1); 00206 // } 00207 // return op->getOptimalAlgorithm(input_values, dev_constr); 00208 // //} 00209 // 00210 // //return SchedulingDecision(std::string(),EstimatedTime(-1.0),Tuple()); 00211 // } 00212 00213 bool Scheduler::addObservation(const SchedulingDecision& sched_dec, const double& measured_execution_time){ 00214 boost::lock_guard<boost::mutex> lock(global_mutex); 00215 00216 this->proc_devs_.removeSchedulingDecision(sched_dec); 00217 00218 const std::string name_of_algorithm(sched_dec.getNameofChoosenAlgorithm()); 00219 const MeasurementPair mp(sched_dec.getFeatureValues(), 00220 MeasuredTime(measured_execution_time), 00221 sched_dec.getEstimatedExecutionTimeforAlgorithm()); 00222 00223 MapNameToOperation::iterator it; 00224 for(it= map_operationname_to_operation_.begin(); it !=map_operationname_to_operation_.end(); it++){ 00225 if(it->second->hasAlgorithm(name_of_algorithm)) 00226 return it->second->addObservation(name_of_algorithm,mp); 00227 } 00228 return false; 00229 00230 } 00231 00232 EstimatedTime Scheduler::getEstimatedExecutionTime(const OperatorSpecification& op_spec, const std::string& alg_name){ 00233 boost::lock_guard<boost::mutex> lock(global_mutex); 00234 AlgorithmPtr alg_ptr = getAlgorithm(alg_name); 00235 if(!alg_ptr){ 00236 std::cout << "FATAL ERROR! Scheduler::getEstimatedExecutionTime(): Algorithm '" << alg_name << "' not found for operation '" 00237 << op_spec.getOperatorName() << "'!" << std::endl; 00238 std::exit(-1); 00239 } 00240 return alg_ptr->getEstimatedExecutionTime(op_spec.getFeatureVector()); 00241 } 00242 00243 00244 void Scheduler::print(){ 00245 cout << "HyPE Status:" << endl; 00246 MapNameToOperation::iterator it; 00247 for(it=map_operationname_to_operation_.begin();it!=map_operationname_to_operation_.end();++it){ 00248 std::cout << "Operation: '" << it->second->getName() << "'" << std::endl; 00249 std::vector<AlgorithmPtr> algs = it->second->getAlgorithms(); 00250 for (unsigned int i=0;i<algs.size();++i){ 00251 std::cout << "\t" << algs[i]->getName() << endl; 00252 } 00253 } 00254 } 00255 00256 /* 00257 bool Scheduler::addObservation(const std::string& name_of_algorithm, const MeasurementPair& mp){ 00258 00259 boost::lock_guard<boost::mutex> lock(global_mutex); 00260 00261 this->proc_devs_.removeSchedulingDecision(sched_dec); 00262 00263 MapNameToOperation::iterator it; 00264 for(it= map_operationname_to_operation_.begin(); it !=map_operationname_to_operation_.end(); it++){ 00265 if(it->second->hasAlgorithm(name_of_algorithm)) 00266 return it->second->addObservation(name_of_algorithm,mp); 00267 } 00268 return false; 00269 }*/ 00270 00271 Scheduler::ProcessingDevices& Scheduler::getProcessingDevices(){ 00272 return this->proc_devs_; 00273 } 00274 00275 Scheduler::ProcessingDevices::ProcessingDevices() : virt_comp_devs_() {} 00276 00277 bool Scheduler::ProcessingDevices::addDevice(const DeviceSpecification& dev_spec) 00278 { 00279 assert(!this->exists(dev_spec)); 00280 this->virt_comp_devs_.insert(std::make_pair(dev_spec.getProcessingDeviceID(), 00281 VirtualProcessingDevicePtr( new VirtualProcessingDevice(dev_spec)) 00282 ) 00283 ); 00284 return true; 00285 } 00286 00287 bool Scheduler::ProcessingDevices::exists(const DeviceSpecification& dev_spec) const throw() 00288 { 00289 Devices::const_iterator cit; 00290 for(cit=this->virt_comp_devs_.begin();cit!=this->virt_comp_devs_.end();++cit){ 00291 // VirtualProcessingDevicePtr virtual_proc_dev_ptr=cit->second; 00292 // DeviceSpecification dev_spec_current = virtual_proc_dev_ptr->getDeviceSpecification(); 00293 // if(dev_spec_current==dev_spec){ 00294 // return true; 00295 // } 00296 if(cit->second->getDeviceSpecification()==dev_spec){ 00297 return true; 00298 } 00299 } 00300 return false; 00301 } 00302 00303 const Scheduler::ProcessingDevices::Devices& Scheduler::ProcessingDevices::getDevices() const throw() 00304 { 00305 return this->virt_comp_devs_; 00306 } 00307 00308 VirtualProcessingDevicePtr Scheduler::ProcessingDevices::getProcessingDevice(ProcessingDeviceID dev_id) 00309 { 00310 Devices::iterator it; 00311 00312 it=virt_comp_devs_.find(dev_id); 00313 00314 VirtualProcessingDevicePtr virt_dev_ptr; 00315 00316 if(it==virt_comp_devs_.end()){ //operation does not exist 00317 std::cout << "FATAL ERROR: Processing Device with ID " << dev_id << " does not exist!!!" << std::endl; 00318 exit(-1); 00319 } 00320 00321 virt_dev_ptr=it->second; 00322 00323 return virt_dev_ptr; 00324 } 00325 00326 bool Scheduler::ProcessingDevices::addSchedulingDecision(const SchedulingDecision& sched_dec) 00327 { 00328 ProcessingDeviceID dev_id = sched_dec.getDeviceSpecification().getProcessingDeviceID(); 00329 VirtualProcessingDevicePtr vir_proc_dev = this->getProcessingDevice(dev_id); 00330 if(!vir_proc_dev){ 00331 cout << "Error! Could not find Processing Device for Device ID '" << dev_id << "'" << endl; 00332 return false; 00333 } 00334 return vir_proc_dev->addRunningOperation(sched_dec); 00335 } 00336 00337 bool Scheduler::ProcessingDevices::removeSchedulingDecision(const SchedulingDecision& sched_dec) 00338 { 00339 ProcessingDeviceID dev_id = sched_dec.getDeviceSpecification().getProcessingDeviceID(); 00340 VirtualProcessingDevicePtr vir_proc_dev = this->getProcessingDevice(dev_id); 00341 if(!vir_proc_dev){ 00342 cout << "Error! Could not find Processing Device for Device ID '" << dev_id << "'" << endl; 00343 return false; 00344 } 00345 return vir_proc_dev->removeFinishedOperation(sched_dec); 00346 } 00347 00348 void Scheduler::ProcessingDevices::print() const throw(){ 00349 Devices::const_iterator cit; 00350 for(cit= virt_comp_devs_.begin();cit!= virt_comp_devs_.end();++cit){ 00351 cit->second->print(); 00352 } 00353 } 00354 00355 00356 00357 };//end namespace core 00358 }; //end namespace hype 00359 00360 00361 00362 00363 00364