Hybrid Query Processing Engine for Coprocessing in Database Systems
HyPE
optimization_criterion.cpp
Go to the documentation of this file.
00001 #include <iostream>
00002 #include <string>
00003 #include <vector>
00004 #include <memory>
00005 #include <core/optimization_criterion.hpp>
00006 #include <core/time_measurement.hpp>
00007 #include <core/operation.hpp>
00008 #include <core/scheduler.hpp>
00009 
00010 #include <query_processing/processing_device.hpp>
00011 #include <query_processing/virtual_processing_device.hpp>
00012 #include <config/configuration.hpp>
00013 
00014 using namespace std;
00015 namespace hype
00016 {
00017    namespace core
00018    {
00019       const SchedulingDecision hype::core::OptimizationCriterion_Internal::getOptimalAlgorithm(const Tuple& input_values, Operation& op, DeviceTypeConstraint dev_constr)
00020       {
00021          //skip training mechanism!
00022          if(dev_constr!=hype::ANY_DEVICE){
00023              SchedulingDecision sched_dec = this->getOptimalAlgorithm_internal(input_values, op, dev_constr);
00024              core::Scheduler::instance().getProcessingDevices().addSchedulingDecision(sched_dec);
00025              return sched_dec;
00026          }
00027          
00028          std::vector<AlgorithmPtr> alg_ptrs = op.getAlgorithms();
00029 
00030          /*
00031          for(unsigned int i=0; i<alg_ptrs.size(); i++) {
00032                if(alg_ptrs[i]->getNumberOfDecisionsforThisAlgorithm()==Runtime_Configuration::instance().getTrainingLength()){
00033                   while(alg_ptrs[i]->getNumberOfTerminatedExecutions()<alg_ptrs[i]->getNumberOfDecisionsforThisAlgorithm()){
00034                      //wait until trainingphase is over?!
00035                   }
00036                }
00037                   
00038          }*/
00039 
00040 
00041          //executed once for round robin training
00042          if(map_algorithmname_to_number_of_executions_.empty()){
00043             std::vector<AlgorithmPtr> algs = op.getAlgorithms();
00044             std::vector<AlgorithmPtr>::const_iterator it;
00045             for(it=algs.begin(); it!=algs.end(); ++it) {
00046                map_algorithmname_to_number_of_executions_[(*it)->getName()]=0; //init map
00047             }
00048          }
00049 
00050          bool all_algorithms_finished_training=true;
00051          for(unsigned int i=0; i<alg_ptrs.size(); i++) {
00052             //cout << "Algorithm " << alg_ptrs[i]->getName() << "\tTraining: " << alg_ptrs[i]->inTrainingPhase()  << " \t Dec: " << alg_ptrs[i]->getNumberOfDecisionsforThisAlgorithm()
00053                //<< " \t Exec: " << alg_ptrs[i]->getNumberOfTerminatedExecutions() << endl;        
00054             if(alg_ptrs[i]->inTrainingPhase()) {
00055                all_algorithms_finished_training=false;
00056             }
00057          }
00058          //cout << "all algorithms finsihed training: "  << all_algorithms_finished_training << endl;
00059          if(!all_algorithms_finished_training) {
00060             std::map<std::string,unsigned int>::iterator it;
00061             std::string alg_with_min_executions;
00062             unsigned int min_execution=std::numeric_limits<unsigned int>::max();
00063             for(it = map_algorithmname_to_number_of_executions_.begin(); it!= map_algorithmname_to_number_of_executions_.end(); ++it) {
00064                if(it->second<min_execution) {
00065                   min_execution=it->second;
00066                   alg_with_min_executions=it->first;
00067                }
00068             }
00069             AlgorithmPtr pointer_to_choosen_algorithm=op.getAlgorithm(alg_with_min_executions);
00070             assert(pointer_to_choosen_algorithm!=NULL);
00071             //cout << "Training: Choosing: " << alg_with_min_executions << endl;
00072             map_algorithmname_to_number_of_executions_[alg_with_min_executions]++;
00073             //return SchedulingDecision(*pointer_to_choosen_algorithm,EstimatedTime(-1),input_values);
00074              SchedulingDecision sched_dec(*pointer_to_choosen_algorithm,EstimatedTime(-1),input_values);
00075              core::Scheduler::instance().getProcessingDevices().addSchedulingDecision(sched_dec);
00076              return sched_dec;
00077          }
00078 
00079          for(unsigned int i=0; i<alg_ptrs.size(); i++) {
00080             if(!quiet && verbose)
00081                cout << "Algorithm: " << alg_ptrs[i]->getName() << "   In Training Phase: " << alg_ptrs[i]->inTrainingPhase() << endl;
00082 #ifdef TIMESTAMP_BASED_LOAD_ADAPTION
00083             //FEATURE: Timestamp based load adaption (triggers retraining) %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00084             //if algorithm was not executed for a long time (Configuration::maximal_time_where_algorithm_was_not_choosen times), retrain algorithm
00085             if(alg_ptrs[i]->getTimeOfLastExecution()+Configuration::maximal_time_where_algorithm_was_not_choosen<op.getCurrentTimestamp()) {
00086                cout << "Operation execution number: " << op.getCurrentTimestamp() << endl;
00087                alg_ptrs[i]->retrain();
00088             }
00089 #endif
00090 #ifdef LOAD_MODIFICATOR_BASED_LOAD_ADAPTION
00091             //FEATURE: Load Modification factor based load adaption %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00092             if(alg_ptrs[i]->getLoadChangeEstimator().getLoadModificator()>2
00093                || alg_ptrs[i]->getLoadChangeEstimator().getLoadModificator()<0.5f) { //execution times increased by factor of 2 -> sifnificant load change, retrain all algorithms?
00094                cout << "Operation execution number: " << op.getCurrentTimestamp() << "\tAlgorithm: " << alg_ptrs[i]->getName() << "\t"
00095                     << "Significant load change confirmed: " << alg_ptrs[i]->getLoadChangeEstimator().getLoadModificator() << endl;
00096             }
00097 #endif
00098             //train algorithms in round robin manner
00099             /*if(alg_ptrs[i]->inTrainingPhase()) {
00100                return SchedulingDecision(alg_ptrs[i]->getName(),EstimatedTime(-1),input_values);
00101             }*/
00102 
00103             if(alg_ptrs[i]->inRetrainingPhase()) {
00104                //return SchedulingDecision(*alg_ptrs[i],EstimatedTime(alg_ptrs[i]->getEstimatedExecutionTime(input_values)),input_values);
00105                SchedulingDecision sched_dec(*alg_ptrs[i],EstimatedTime(alg_ptrs[i]->getEstimatedExecutionTime(input_values)),input_values);
00106                core::Scheduler::instance().getProcessingDevices().addSchedulingDecision(sched_dec);
00107                return sched_dec;             
00108             }//*/
00109          }
00110 
00111           SchedulingDecision sched_dec=this->getOptimalAlgorithm_internal(input_values, op, dev_constr);
00112           core::Scheduler::instance().getProcessingDevices().addSchedulingDecision(sched_dec);
00113           return sched_dec;
00114              
00115          //return this->getOptimalAlgorithm_internal(input_values, op, dev_constr);
00116 
00117       }
00118       const std::string& OptimizationCriterion_Internal::getName() const
00119       {
00120          return name_of_optimization_criterion_;
00121       }
00122 
00123       OptimizationCriterion_Internal::OptimizationCriterion_Internal(const std::string& name_of_optimization_criterion, const std::string& name_of_operation) : map_algorithmname_to_number_of_executions_(), name_of_optimization_criterion_(name_of_optimization_criterion), name_of_operation_(name_of_operation) {}
00124 
00125 
00126       OptimizationCriterion_Internal::~OptimizationCriterion_Internal() {}
00127 
00128       std::tr1::shared_ptr<OptimizationCriterion_Internal> getNewOptimizationCriterionbyName(const std::string& name_of_optimization_criterion)
00129       {
00130          OptimizationCriterion_Internal* ptr = OptimizationCriterionFactorySingleton::Instance().CreateObject(name_of_optimization_criterion); //.( 1, createProductNull );
00131          return std::tr1::shared_ptr<OptimizationCriterion_Internal> (ptr);
00132       }
00133 
00134       OptimizationCriterionFactory& OptimizationCriterionFactorySingleton::Instance()
00135       {
00136          static OptimizationCriterionFactory factory;
00137          return factory;
00138       }
00139    }; //end namespace core
00140 }; //end namespace hype
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines