Hybrid Query Processing Engine for Coprocessing in Database Systems
HyPE
scheduler.cpp
Go to the documentation of this file.
00001 
00002 #include <core/scheduler.hpp>
00003 #include <plugins/pluginloader.hpp>
00004 #include <stdlib.h>
00005 
00006 #include <boost/thread.hpp>
00007 
00008 using namespace std;
00009 
00010 
00011 namespace hype{
00012    namespace core{
00013       
00014       using namespace queryprocessing;
00015 
00016       boost::mutex global_mutex;
00017       
00018       Scheduler::Scheduler() : map_operationname_to_operation_(), map_statisticalmethodname_to_statisticalmethod_(),proc_devs_(){
00019          //is called once, because Scheduler is a Singelton
00020          PluginLoader::loadPlugins();
00021       }
00022 
00023 
00024       bool Scheduler::addAlgorithm(const AlgorithmSpecification& alg_spec, const DeviceSpecification& dev_spec){  
00025          boost::lock_guard<boost::mutex> lock(global_mutex);
00026          
00027          if(!this->proc_devs_.exists(dev_spec)){
00028             if(!this->proc_devs_.addDevice(dev_spec)){
00029                std::cout << "FATAL ERROR! Failed to add Processing Device while creatign Algorithm '" << alg_spec.getAlgorithmName() << "'" << std::endl;
00030             }
00031          }
00032          
00033          std::string name_of_operation=alg_spec.getOperationName();
00034          
00035          std::map<std::string,std::tr1::shared_ptr<Operation> >::iterator it;
00036 
00037          it=map_operationname_to_operation_.find(name_of_operation);
00038 
00039          std::tr1::shared_ptr<Operation> op;
00040 
00041          if(it==map_operationname_to_operation_.end()){ //operation does not exist
00042             op = std::tr1::shared_ptr<Operation>(new Operation(name_of_operation));
00043             map_operationname_to_operation_[name_of_operation]=op;
00044          }else{
00045             op=it->second; //std::tr1::shared_ptr<Operation> (it->second);
00046          }
00047 
00048          return op->addAlgorithm(alg_spec.getAlgorithmName(), 
00049                                  dev_spec,
00050                                  alg_spec.getStatisticalMethodName(), 
00051                                  alg_spec.getRecomputationHeuristicName());
00052       }
00053 
00054       Scheduler& Scheduler::instance(){
00055          static Scheduler scheduler;
00056          return scheduler;
00057       }
00058    /*
00059       bool Scheduler::addAlgorithm(const std::string& name_of_operation, 
00060                               const std::string& name_of_algorithm, 
00061                               DeviceSpecification comp_dev,
00062                               const std::string& name_of_statistical_method, 
00063                               const std::string& name_of_recomputation_strategy){
00064                                  
00065          boost::lock_guard<boost::mutex> lock(global_mutex);
00066          
00067          std::map<std::string,std::tr1::shared_ptr<Operation> >::iterator it;
00068 
00069          it=map_operationname_to_operation_.find(name_of_operation);
00070 
00071          std::tr1::shared_ptr<Operation> op;
00072 
00073          if(it==map_operationname_to_operation_.end()){ //operation does not exist
00074             op = std::tr1::shared_ptr<Operation>(new Operation(name_of_operation));
00075             map_operationname_to_operation_[name_of_operation]=op;
00076          }else{
00077             op=it->second; //std::tr1::shared_ptr<Operation> (it->second);
00078          }
00079 
00080          return op->addAlgorithm(name_of_algorithm, 
00081                                  comp_dev,
00082                                  name_of_statistical_method, 
00083                                  name_of_recomputation_strategy);
00084 
00085 
00086        //return true;
00087       }*/
00088 
00089       bool Scheduler::setOptimizationCriterion(const std::string& name_of_operation,
00090                                           const std::string& name_of_optimization_criterion){
00091                                              
00092          boost::lock_guard<boost::mutex> lock(global_mutex);
00093          
00094 //       map_operationname_to_operation_.find
00095 //       StatisticalMethodMap::iterator it = map_statisticalmethodname_to_statisticalmethod_.find(name_of_operation);
00096          
00097          std::tr1::shared_ptr<OptimizationCriterion_Internal> opt_crit = getNewOptimizationCriterionbyName(name_of_optimization_criterion);
00098          if(opt_crit){
00099             //StatisticalMethodMap::iterator it = map_statisticalmethodname_to_statisticalmethod_.find(name_of_operation);
00100             
00101             MapNameToOperation::iterator it = map_operationname_to_operation_.find(name_of_operation);
00102             if(it==map_operationname_to_operation_.end()){
00103                cout << "Operation not found! " << name_of_operation << endl;
00104                return false;
00105             }
00106             return (*it).second->setNewOptimizationCriterion(name_of_optimization_criterion);
00107             //map_operationname_to_operation_[name_of_operation]=opt_crit;
00108             //return true;
00109          }
00110          return false;
00111       }
00112 
00113       /* not part of intrface!!! -> no thread safety implemented!!!*/
00114       const AlgorithmPtr Scheduler::getAlgorithm(const std::string& name_of_algorithm){
00115          
00116          //boost::lock_guard<boost::mutex> lock(global_mutex);
00117          
00118          MapNameToOperation::iterator it;
00119          for(it= map_operationname_to_operation_.begin(); it !=map_operationname_to_operation_.end(); it++){
00120             if(it->second->hasAlgorithm(name_of_algorithm))
00121                 return it->second->getAlgorithm(name_of_algorithm);  
00122          }
00123          return AlgorithmPtr(); //NULL Pointer if algorithm is not found
00124       }
00125 
00126 
00127       bool Scheduler::setStatisticalMethod(const std::string& name_of_algorithm,
00128                                       const std::string& name_of_statistical_method){
00129                                          
00130          boost::lock_guard<boost::mutex> lock(global_mutex);
00131          
00132          AlgorithmPtr alg_ptr = this->getAlgorithm(name_of_algorithm);
00133          if(!alg_ptr){
00134             cout << "Error in Scheduler::setStatisticalMethod(): Algorithm " << name_of_algorithm << " not found!" << endl;
00135             return false;
00136          }
00137          cout << "Found Algorithm: " << alg_ptr->getName() << endl;
00138          StatisticalMethodPtr stat_meth_ptr = getNewStatisticalMethodbyName(name_of_statistical_method);
00139          if(!stat_meth_ptr) return false;
00140          cout << "created statistical method: " << name_of_statistical_method << endl;    
00141          return alg_ptr->setStatisticalMethod(stat_meth_ptr);
00142       }
00143       
00144       bool Scheduler::setRecomputationHeuristic(const std::string& name_of_algorithm,
00145                                            const std::string& name_of_recomputation_strategy){
00146    
00147          boost::lock_guard<boost::mutex> lock(global_mutex);
00148          
00149          AlgorithmPtr alg_ptr = this->getAlgorithm(name_of_algorithm);
00150          if(!alg_ptr){
00151             cout << "Error in Scheduler::setRecomputationHeuristic(): Algorithm " << name_of_algorithm << " not found!" << endl;
00152             return false;
00153          }
00154          std::tr1::shared_ptr<RecomputationHeuristic_Internal> recomp_heuristic = getNewRecomputationHeuristicbyName(name_of_recomputation_strategy);
00155          if(!recomp_heuristic) return false;
00156          return alg_ptr->setRecomputationHeuristic(recomp_heuristic);
00157       }
00158 
00159       const SchedulingDecision Scheduler::getOptimalAlgorithm(const OperatorSpecification& op_spec, const DeviceConstraint& dev_constr){
00160          
00161          boost::lock_guard<boost::mutex> lock(global_mutex);
00162          
00163          std::map<std::string,std::tr1::shared_ptr<Operation> >::iterator it;
00164 
00165          std::string name_of_operation=op_spec.getOperatorName();
00166 
00167          it=map_operationname_to_operation_.find(name_of_operation);
00168 
00169          std::tr1::shared_ptr<Operation> op;
00170 
00171          if(it==map_operationname_to_operation_.end()){ //operation does not exist
00172                             
00173             std::cout << "[HyPE:] FATAL ERROR: In hype::core::Scheduler::getOptimalAlgorithm(): Operation " << name_of_operation  <<  " does not exist!!!" << std::endl;
00174                               cout << "File: " << __FILE__ << " at Line: " << __LINE__ << endl;
00175             exit(-1);
00176          }//else{
00177             
00178             op=it->second; //std::tr1::shared_ptr<Operation> (it->second);
00179             if(!op){
00180                std::cout << "FATAL ERROR: Operation " << name_of_operation  <<  ": NULL Pointer!" << std::endl;
00181                exit(-1);
00182             }
00183             return op->getOptimalAlgorithm(op_spec.getFeatureVector(), dev_constr); 
00184       
00185       }  
00186 
00187 //    const SchedulingDecision Scheduler::getOptimalAlgorithmName(const std::string& name_of_operation, const Tuple& input_values, DeviceTypeConstraint dev_constr){
00188 //
00189 //       boost::lock_guard<boost::mutex> lock(global_mutex);
00190 //       
00191 //       std::map<std::string,std::tr1::shared_ptr<Operation> >::iterator it;
00192 //
00193 //       it=map_operationname_to_operation_.find(name_of_operation);
00194 //
00195 //       std::tr1::shared_ptr<Operation> op;
00196 //
00197 //       if(it==map_operationname_to_operation_.end()){ //operation does not exist
00198 //          std::cout << "FATAL ERROR: Operation " << name_of_operation  <<  " does not exist!!!" << std::endl;
00199 //          exit(-1);
00200 //       }//else{
00201 //          
00202 //          op=it->second; //std::tr1::shared_ptr<Operation> (it->second);
00203 //          if(!op){
00204 //             std::cout << "FATAL ERROR: Operation " << name_of_operation  <<  ": NULL Pointer!" << std::endl;
00205 //             exit(-1);
00206 //          }
00207 //          return op->getOptimalAlgorithm(input_values, dev_constr);
00208 //       //}
00209 //
00210 //       //return SchedulingDecision(std::string(),EstimatedTime(-1.0),Tuple());
00211 //    }
00212       
00213       bool Scheduler::addObservation(const SchedulingDecision& sched_dec, const double& measured_execution_time){
00214          boost::lock_guard<boost::mutex> lock(global_mutex);
00215          
00216          this->proc_devs_.removeSchedulingDecision(sched_dec);
00217          
00218          const std::string name_of_algorithm(sched_dec.getNameofChoosenAlgorithm()); 
00219          const MeasurementPair mp(sched_dec.getFeatureValues(),
00220                                   MeasuredTime(measured_execution_time),
00221                                   sched_dec.getEstimatedExecutionTimeforAlgorithm());
00222          
00223          MapNameToOperation::iterator it;
00224          for(it= map_operationname_to_operation_.begin(); it !=map_operationname_to_operation_.end(); it++){
00225             if(it->second->hasAlgorithm(name_of_algorithm))
00226                 return it->second->addObservation(name_of_algorithm,mp);         
00227          }
00228          return false;        
00229          
00230       }
00231       
00232       EstimatedTime Scheduler::getEstimatedExecutionTime(const OperatorSpecification& op_spec, const std::string& alg_name){
00233          boost::lock_guard<boost::mutex> lock(global_mutex);
00234          AlgorithmPtr alg_ptr = getAlgorithm(alg_name);
00235          if(!alg_ptr){
00236             std::cout << "FATAL ERROR! Scheduler::getEstimatedExecutionTime(): Algorithm '" << alg_name << "' not found for operation '" 
00237                       << op_spec.getOperatorName() << "'!" << std::endl;
00238             std::exit(-1);
00239          }
00240          return alg_ptr->getEstimatedExecutionTime(op_spec.getFeatureVector());
00241       }
00242 
00243 
00244       void Scheduler::print(){
00245                     cout << "HyPE Status:" << endl;
00246                     MapNameToOperation::iterator it;
00247                     for(it=map_operationname_to_operation_.begin();it!=map_operationname_to_operation_.end();++it){
00248                         std::cout << "Operation: '" << it->second->getName() << "'" << std::endl;
00249                         std::vector<AlgorithmPtr> algs = it->second->getAlgorithms();
00250                         for (unsigned int i=0;i<algs.size();++i){
00251                             std::cout << "\t" << algs[i]->getName() << endl;
00252                         }
00253                     }
00254                 }                
00255                 
00256       /*
00257       bool Scheduler::addObservation(const std::string& name_of_algorithm, const MeasurementPair& mp){
00258          
00259          boost::lock_guard<boost::mutex> lock(global_mutex);
00260          
00261          this->proc_devs_.removeSchedulingDecision(sched_dec);
00262          
00263          MapNameToOperation::iterator it;
00264          for(it= map_operationname_to_operation_.begin(); it !=map_operationname_to_operation_.end(); it++){
00265             if(it->second->hasAlgorithm(name_of_algorithm))
00266                 return it->second->addObservation(name_of_algorithm,mp);         
00267          }
00268          return false;
00269       }*/
00270 
00271       Scheduler::ProcessingDevices& Scheduler::getProcessingDevices(){
00272          return this->proc_devs_;
00273       } 
00274             
00275 Scheduler::ProcessingDevices::ProcessingDevices() : virt_comp_devs_() {}
00276 
00277 bool Scheduler::ProcessingDevices::addDevice(const DeviceSpecification& dev_spec)
00278 {
00279    assert(!this->exists(dev_spec));
00280    this->virt_comp_devs_.insert(std::make_pair(dev_spec.getProcessingDeviceID(),
00281                                                VirtualProcessingDevicePtr( new VirtualProcessingDevice(dev_spec))
00282                                               )
00283                                );
00284    return true;
00285 }
00286 
00287 bool Scheduler::ProcessingDevices::exists(const DeviceSpecification& dev_spec) const throw()
00288 {
00289    Devices::const_iterator cit;
00290    for(cit=this->virt_comp_devs_.begin();cit!=this->virt_comp_devs_.end();++cit){
00291 //       VirtualProcessingDevicePtr virtual_proc_dev_ptr=cit->second;
00292 //       DeviceSpecification dev_spec_current = virtual_proc_dev_ptr->getDeviceSpecification();
00293 //       if(dev_spec_current==dev_spec){
00294 //             return true;
00295 //       }
00296          if(cit->second->getDeviceSpecification()==dev_spec){
00297                return true;
00298          }
00299    }
00300    return false;
00301 }
00302 
00303 const Scheduler::ProcessingDevices::Devices& Scheduler::ProcessingDevices::getDevices() const throw()
00304 {
00305    return this->virt_comp_devs_;
00306 }
00307 
00308 VirtualProcessingDevicePtr Scheduler::ProcessingDevices::getProcessingDevice(ProcessingDeviceID dev_id)
00309 {
00310          Devices::iterator it;
00311 
00312          it=virt_comp_devs_.find(dev_id);
00313 
00314          VirtualProcessingDevicePtr virt_dev_ptr;
00315 
00316          if(it==virt_comp_devs_.end()){ //operation does not exist
00317             std::cout << "FATAL ERROR: Processing Device with ID " << dev_id <<  " does not exist!!!" << std::endl;
00318             exit(-1);
00319          }
00320          
00321          virt_dev_ptr=it->second;
00322          
00323          return virt_dev_ptr;
00324 }
00325 
00326 bool Scheduler::ProcessingDevices::addSchedulingDecision(const SchedulingDecision& sched_dec)
00327 {
00328              ProcessingDeviceID dev_id = sched_dec.getDeviceSpecification().getProcessingDeviceID();
00329              VirtualProcessingDevicePtr vir_proc_dev = this->getProcessingDevice(dev_id);
00330              if(!vir_proc_dev){
00331                 cout << "Error! Could not find Processing Device for Device ID '" << dev_id <<  "'" << endl;
00332                 return false;
00333              }
00334              return vir_proc_dev->addRunningOperation(sched_dec);
00335 }
00336 
00337 bool Scheduler::ProcessingDevices::removeSchedulingDecision(const SchedulingDecision& sched_dec)
00338 {
00339              ProcessingDeviceID dev_id = sched_dec.getDeviceSpecification().getProcessingDeviceID();
00340              VirtualProcessingDevicePtr vir_proc_dev = this->getProcessingDevice(dev_id);
00341              if(!vir_proc_dev){
00342                 cout << "Error! Could not find Processing Device for Device ID '" << dev_id <<  "'" << endl;
00343                 return false;
00344              }
00345              return vir_proc_dev->removeFinishedOperation(sched_dec);
00346 }
00347 
00348                void Scheduler::ProcessingDevices::print() const throw(){
00349                   Devices::const_iterator cit;
00350                   for(cit= virt_comp_devs_.begin();cit!= virt_comp_devs_.end();++cit){
00351                      cit->second->print();   
00352                   }
00353                }     
00354 
00355 
00356 
00357    };//end namespace core
00358 }; //end namespace hype
00359 
00360 
00361 
00362 
00363 
00364 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines