Column-oriented GPU-accelerated Database Management System
CoGaDB
/home/sebastian/gpudbms/trunk/hype-library/include/query_processing/benchmark.hpp
Go to the documentation of this file.
00001 
00002 #pragma once
00003 
00004 #include <algorithm>
00005 #include <cstdlib>
00006 #include <iostream>
00007 #include <fstream>
00008 #include <cassert>
00009 #include <vector>
00010 
00011 #include <stdint.h>
00012 #include <sys/mman.h>
00013 
00014 #include <hype.hpp>
00015 #include <config/configuration.hpp>
00016 #include <util/architecture.hpp>
00017 #include <query_processing/operator.hpp>
00018 #include <query_processing/processing_device.hpp>
00019 #include <query_processing/node.hpp>
00020 #include <query_processing/logical_query_plan.hpp>
00021 
00022 #include <tbb/parallel_sort.h>
00023 #include <tbb/task_scheduler_init.h>
00024 
00025 #include <boost/smart_ptr.hpp> 
00026 #include <boost/thread.hpp> 
00027 #include <boost/program_options.hpp> 
00028 #include <boost/random.hpp>
00029 #include <boost/generator_iterator.hpp>
00030 #include <boost/chrono.hpp>
00031 
00032 namespace hype
00033 {
00034 
00035         namespace queryprocessing
00036         {
00037 
00038                 enum SchedulingConfiguration{CPU_ONLY,GPU_ONLY,HYBRID};
00039                 
00040 template <typename Type>
00041 class Operation_Benchmark{
00042         public:
00043         struct Random_Number_Generator{
00044 
00045         Random_Number_Generator(unsigned int max_value_size) : max_value_size_(max_value_size){}
00046 
00047         unsigned int operator() (){
00048                 return (unsigned int) rand()%max_value_size_;
00049         }
00050         private:
00051                 unsigned int max_value_size_;
00052         };
00053         
00054         typedef Type type;
00055         typedef typename OperatorMapper_Helper_Template<Type>::TypedOperatorPtr TypedOperatorPtr;       
00056         typedef typename OperatorMapper_Helper_Template<Type>::Physical_Operator_Map Physical_Operator_Map;     
00057         typedef typename OperatorMapper_Helper_Template<Type>::Physical_Operator_Map_Ptr Physical_Operator_Map_Ptr;             
00058         typedef typename OperatorMapper_Helper_Template<Type>::Create_Typed_Operator_Function Create_Typed_Operator_Function;   
00059         typedef typename OperatorMapper_Helper_Template<Type>::TypedNodePtr TypedNodePtr;
00060         
00061         /*
00062         typedef OperatorMapper_Helper_Template<Type>::TypedOperatorPtr TypedOperatorPtr;        
00063         typedef OperatorMapper_Helper_Template<Type>::Physical_Operator_Map Physical_Operator_Map;      
00064         typedef OperatorMapper_Helper_Template<Type>::Physical_Operator_Map_Ptr Physical_Operator_Map_Ptr;              
00065         typedef OperatorMapper_Helper_Template<Type>::Create_Typed_Operator_Function Create_Typed_Operator_Function;    
00066         typedef OperatorMapper_Helper_Template<Type>::TypedNodePtr TypedNodePtr;
00067         */
00068 
00069 //typedef int ElementType;
00070 //typedef vector<ElementType> Vec;
00071 //typedef boost::shared_ptr<Vec> VecPtr;
00072 
00073         
00074 
00075         Operation_Benchmark(const std::string& operation_name,
00076                                                           const std::string& cpu_algorithm_name, const std::string& gpu_algorithm_name) :       
00077                                                                 operation_name_(operation_name),
00078                                                                 cpu_algorithm_name_(cpu_algorithm_name),
00079                                                                 gpu_algorithm_name_(gpu_algorithm_name),
00080                                                                 MAX_DATASET_SIZE_IN_MB_(1), //(10), //MB  //(10*1000*1000)/sizeof(int), //1000000,
00081                                                                 NUMBER_OF_DATASETS_(10), //3, //100,
00082                                                                 NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_(30), //3, //1000,
00083                                                                 RANDOM_SEED_(0),
00084                                                                 sched_config_(CPU_ONLY), //(HYBRID), //CPU_ONLY,GPU_ONLY,HYBRID
00085                                                                 stemod_optimization_criterion_("Response Time"),
00086                                                                 stemod_statistical_method_("Least Squares 1D"),
00087                                                                 stemod_recomputation_heuristic_("Periodic Recomputation"),
00088                                                                 //cpu( DeviceSpecification (hype::PD0,hype::CPU,hype::PD_Memory_0) ),
00089                                                                 //gpu( DeviceSpecification (hype::PD1,hype::GPU,hype::PD_Memory_1)  ),                                                          
00090                                                                 cpu( hype::queryprocessing::getProcessingDevice(DeviceSpecification (hype::PD0,hype::CPU,hype::PD_Memory_0)) ),
00091                                                                 gpu( hype::queryprocessing::getProcessingDevice(DeviceSpecification (hype::PD1,hype::GPU,hype::PD_Memory_1)) ),
00092                                                                 datasets(),
00093                                                                 operator_queries_(),
00094                                                                 rng_()//,
00095                                                                 //operator_map_()
00096                                                                 {
00097                 /*
00098                 if(!setup(argc, argv)){
00099                         std::cout << "Benchmark Setup Failed!" << std::endl;
00100                         std::exit(-1);
00101                 }*/
00102         }
00103 
00104         //RandomNumberGenerator random_number_generator_;
00105         std::string operation_name_;
00106         std::string cpu_algorithm_name_;
00107         std::string gpu_algorithm_name_;
00108 
00109         unsigned int MAX_DATASET_SIZE_IN_MB_; //MB  //(10*1000*1000)/sizeof(int); //1000000;
00110         unsigned int NUMBER_OF_DATASETS_; //3; //100;
00111         unsigned int NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_; //3; //1000;
00112         unsigned int RANDOM_SEED_;
00113 
00114         SchedulingConfiguration sched_config_; //CPU_ONLY,GPU_ONLY,HYBRID
00115 
00116         std::string stemod_optimization_criterion_;
00117         std::string stemod_statistical_method_;
00118         std::string stemod_recomputation_heuristic_;
00119 
00120         hype::queryprocessing::ProcessingDevice& cpu;
00121         hype::queryprocessing::ProcessingDevice& gpu;
00122 
00123         std::vector<Type> datasets;
00124         std::vector<TypedNodePtr> operator_queries_;
00125         
00126         boost::mt19937 rng_;  // produces randomness out of thin air
00127                                                                  // see pseudo-random number generators
00128 
00129 
00130         boost::mt19937& getRandomNumberGenerator(){
00131                 return rng_;
00132         }
00133 
00134         //Physical_Operator_Map_Ptr operator_map_;
00135 
00136         uint64_t getTimestamp()
00137         {
00138                 using namespace boost::chrono;
00139 
00140                 high_resolution_clock::time_point tp = high_resolution_clock::now();
00141                 nanoseconds dur = tp.time_since_epoch();
00142 
00143                 return (uint64_t)dur.count();
00144         }
00145 
00146 
00147                   virtual TypedNodePtr generate_logical_operator(Type dataset) = 0;
00148 
00149                   //virtual vector<TypedNodePtr> createOperatorQueries() = 0;
00150 
00151                   virtual Type generate_dataset(unsigned int size_in_number_of_bytes) = 0;
00152 
00153                   //virtual destructor
00154                   virtual ~Operation_Benchmark(){}
00155 
00156 //Type generate_dataset(unsigned int size_in_number_of_elements){
00157 //      VecPtr data(new Vec());
00158 //      for(unsigned int i=0;i<size_in_number_of_elements;i++){
00159 //              ElementType e = (ElementType) rand();
00160 //              data->push_back(e);
00161 //      }
00162 //      assert(data!=NULL);
00163 //      //std::cout << "created new data set: " << data.get() << " of size: " << data->size() << std::endl;
00164 //      return data;
00165 //}
00166 
00167 std::vector<Type> generate_random_datasets(unsigned int max_size_of_dataset_in_byte, unsigned int number_of_datasets){
00168         std::vector<Type> datasets;
00169         //first, generate dataset of full possible size, then decrease it with each loop according to a value tic, until the last dataset size is only tic
00170         unsigned int tic=max_size_of_dataset_in_byte/number_of_datasets;
00171                 for(unsigned int i=0;i<number_of_datasets;i++){
00172                         Type vec_ptr = this->generate_dataset(max_size_of_dataset_in_byte-i*tic); //(unsigned int) (rand()%max_size_in_number_of_elements) );
00173                         assert(vec_ptr!=NULL);
00174                         datasets.push_back(vec_ptr);
00175                 }
00176         return datasets;
00177 }
00178 
00179 
00180 
00181 
00182         int setup(int argc, char* argv[]){
00183 
00184         //we don't want the OS to swap out our data to disc that's why we lock it
00185         mlockall(MCL_CURRENT|MCL_FUTURE);
00186 
00187 //      tbb::task_scheduler_init init(8);
00188         
00189 
00190         //tbb::task_scheduler_init (2);
00191 //      tbb::task_scheduler_init init(1);
00192 //      
00193 
00194 
00195 //      cout << "TBB use " << tbb::task_scheduler_init::default_num_threads() << " number of threads as a default" << std::endl;
00196 //      unsigned int MAX_DATASET_SIZE_IN_MB_=10; //MB  //(10*1000*1000)/sizeof(int); //1000000;
00197 //      unsigned int NUMBER_OF_DATASETS_=10; //3; //100;
00198 //      unsigned int NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_=100; //3; //1000;
00199 //      unsigned int RANDOM_SEED_=0;
00200 //      //unsigned int READY_QUEUE_LENGTH=100;
00201 
00202 //      SchedulingConfiguration sched_config_=HYBRID; //CPU_ONLY,GPU_ONLY,HYBRID
00203 //      //SchedulingConfiguration sched_config_=GPU_ONLY;
00204 //      //SchedulingConfiguration sched_config_=CPU_ONLY;
00205 
00206 //      std::string stemod_optimization_criterion_="Response Time";
00207 //      std::string stemod_statistical_method_="Least Squares 1D";
00208 //      std::string stemod_recomputation_heuristic_="Periodic Recomputation";
00209 
00210 // Declare the supported options.
00211 boost::program_options::options_description desc("Allowed options");
00212 desc.add_options()
00213     ("help", "produce help message")
00214     ("number_of_datasets", boost::program_options::value<unsigned int>(), "set the number of data sets for workload")
00215     ("number_of_operations", boost::program_options::value<unsigned int>(), "set the number of operations in workload")
00216     ("max_dataset_size_in_MB", boost::program_options::value<unsigned int>(), "set the maximal dataset size in MB")
00217          //("ready_queue_length", boost::program_options::value<unsigned int>(), "set the queue length of operators that may be concurrently scheduled (clients are blocked on a processing device)")
00218     ("scheduling_method", boost::program_options::value<std::string>(), "set the decision model (CPU_ONLY, GPU_ONLY, HYBRID)")
00219     ("random_seed", boost::program_options::value<unsigned int>(), "seed to use before for generating datasets and operation workload")
00220     ("optimization_criterion", boost::program_options::value<std::string>(), "set the decision models optimization_criterion for all algorithms")
00221     ("statistical_method", boost::program_options::value<std::string>(), "set the decision models statistical_method for all algorithms")
00222     ("recomputation_heuristic", boost::program_options::value<std::string>(), "set the decision models recomputation_heuristic for all algorithms")
00223 ;
00224 
00225 boost::program_options::variables_map vm;
00226 boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
00227 boost::program_options::notify(vm);    
00228 
00229 if (vm.count("help")) {
00230    std::cout << desc << "\n";
00231     return 1;
00232 }
00233 
00234 if (vm.count("number_of_datasets")) {
00235     std::cout << "Number of Datasets: " 
00236  << vm["number_of_datasets"].as<unsigned int>() << "\n";
00237         NUMBER_OF_DATASETS_=vm["number_of_datasets"].as<unsigned int>();
00238 } else {
00239     std::cout << "number_of_datasets was not specified, using default value...\n";
00240 }
00241 
00242 if (vm.count("number_of_operations")) {
00243     std::cout << "Number of Operations: " 
00244  << vm["number_of_operations"].as<unsigned int>() << "\n";
00245         NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_=vm["number_of_operations"].as<unsigned int>();
00246 } else {
00247     std::cout << "number_of_operations was not specified, using default value...\n";
00248 }
00249 
00250 if (vm.count("max_dataset_size_in_MB")) {
00251     std::cout << "max_dataset_size_in_MB: " 
00252  << vm["max_dataset_size_in_MB"].as<unsigned int>() << "MB \n";
00253         MAX_DATASET_SIZE_IN_MB_=vm["max_dataset_size_in_MB"].as<unsigned int>(); //*1024*1024)/sizeof(int); //convert value in MB to equivalent number of integer elements
00254 } else {
00255     std::cout << "max_dataset_size_in_MB was not specified, using default value...\n";
00256 }
00257 
00258 if (vm.count("random_seed")) {
00259     std::cout << "Random Seed: " 
00260  << vm["random_seed"].as<unsigned int>() << "\n";
00261         RANDOM_SEED_=vm["random_seed"].as<unsigned int>();
00262 } else {
00263     std::cout << "random_seed was not specified, using default value...\n";
00264 }
00265 
00266 
00267 if (vm.count("scheduling_method")) {
00268     std::cout << "scheduling_method: " 
00269  << vm["scheduling_method"].as<std::string>() << "\n";
00270         std::string scheduling_method=vm["scheduling_method"].as<std::string>();
00271         if(scheduling_method=="CPU_ONLY"){
00272                 sched_config_=CPU_ONLY;
00273         }else if(scheduling_method=="GPU_ONLY"){
00274                 sched_config_=GPU_ONLY;
00275         }else if(scheduling_method=="HYBRID"){
00276                 sched_config_=HYBRID;
00277         }
00278         
00279 } else {
00280     std::cout << "scheduling_method was not specified, using default value...\n";
00281 }
00282 
00283 if (vm.count("optimization_criterion")) {
00284     std::cout << "optimization_criterion: " 
00285  << vm["optimization_criterion"].as<std::string>() << "\n";
00286         stemod_optimization_criterion_=vm["optimization_criterion"].as<std::string>();
00287 
00288         if(sched_config_!=HYBRID){
00289                 std::cout << "Specification of STEMOD Parameter needs hybrid scheduling (scheduling_method=HYBRID)" << std::endl;
00290                 return -1;
00291         }
00292 
00293 } else {
00294     std::cout << "optimization_criterion was not specified, using default value...\n";
00295 }
00296 
00297 if (vm.count("statistical_method")) {
00298     std::cout << "statistical_method: " 
00299  << vm["statistical_method"].as<std::string>() << "\n";
00300         stemod_statistical_method_=vm["statistical_method"].as<std::string>();
00301         if(sched_config_!=HYBRID){
00302                 std::cout << "Specification of STEMOD Parameter needs hybrid scheduling (scheduling_method=HYBRID)" << std::endl;
00303                 return -1;
00304         }
00305 
00306 } else {
00307     std::cout << "statistical_method was not specified, using default value...\n";
00308 }
00309 
00310 if (vm.count("recomputation_heuristic")) {
00311     std::cout << "recomputation_heuristic: " 
00312  << vm["recomputation_heuristic"].as<std::string>() << "\n";
00313         stemod_recomputation_heuristic_=vm["recomputation_heuristic"].as<std::string>();
00314         if(sched_config_!=HYBRID){
00315                 std::cout << "Specification of STEMOD Parameter needs hybrid scheduling (scheduling_method=HYBRID)" << std::endl;
00316                 return -1;
00317         }
00318 
00319 } else {
00320     std::cout << "recomputation_heuristic was not specified, using default value...\n";
00321 }
00322 
00323 
00324 //"if (vm.count(\"$VAR\")) {
00325 //    cout << \"$VAR: \" 
00326 // << vm[\"$VAR\"].as<std::string>() << \"\n\";
00327 //      std::string s=vm[\"$VAR\"].as<std::string>();
00328 
00329 //      
00330 //} else {
00331 //    cout << \"$VAR was not specified, using default value...\n\";
00332 //}"
00333 
00334         rng_.seed(RANDOM_SEED_);
00335         srand(RANDOM_SEED_);
00336 
00337         //
00338         uint64_t estimated_ram_usage_in_byte=(MAX_DATASET_SIZE_IN_MB_*1024*1024*uint64_t(NUMBER_OF_DATASETS_+1))/2; //MAX_DATASET_SIZE_IN_MB_*NUMBER_OF_DATASETS_
00339 /*      unsigned int tic=MAX_DATASET_SIZE_IN_MB_*1024*1024/NUMBER_OF_DATASETS_;
00340         std::cout << "tic size: " << tic << std::endl;
00341                 for(unsigned int i=0;i<NUMBER_OF_DATASETS_;i++){
00342                         std::cout << "Size: " << MAX_DATASET_SIZE_IN_MB_*1024*1024-i*tic << std::endl;
00343                         estimated_ram_usage_in_byte+=MAX_DATASET_SIZE_IN_MB_*1024*1024-i*tic;
00344                 }*/
00345                 
00346         std::cout << "Generating Data sets..." << std::endl;
00347         std::cout << "Estimated RAM usage: " << estimated_ram_usage_in_byte/(1024*1024) << "MB" << std::endl;
00348         if( (estimated_ram_usage_in_byte/(1024*1024)) >1024*3.7 && util::getArchitecture()==Architecture_32Bit){
00349                 std::cout << "Memory for Datasets to generate exceeds 32 bit adress space! (" << estimated_ram_usage_in_byte/(1024*1024) << "MB)" 
00350                                          << std::endl << "Exiting..." << std::endl;
00351                 std::exit(-1); 
00352         }
00353         //generate_random_datasets expects data size in number of integer elements, while MAX_DATASET_SIZE_IN_MB_ specifies data size in Mega Bytes
00354         datasets=generate_random_datasets( (MAX_DATASET_SIZE_IN_MB_*1024*1024), NUMBER_OF_DATASETS_);
00355 
00356         std::vector<unsigned int> query_indeces(NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_);
00357 
00358         boost::uniform_int<> six(0,NUMBER_OF_DATASETS_-1); //choose data sets for sorting equally distributed
00359         //generate queries
00360         for(unsigned int i=0;i<NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_;++i){
00361                 query_indeces[i]=six(rng_);
00362         }       
00363         //std::generate(query_indeces.begin(), query_indeces.end(), Random_Number_Generator(NUMBER_OF_DATASETS_));
00364         //std::copy(query_indeces.begin(), query_indeces.end(), std::ostream_iterator<unsigned int>(std::cout, "\n"));
00365 
00366         for(unsigned int i=0;i<NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_;++i){
00367                 
00368                 unsigned int index = query_indeces[i];
00369                 Type dataset = datasets[index];
00370                 TypedNodePtr op = generate_logical_operator(dataset);
00371 //              if(i==0){
00372 //                      /*! \todo hack! init of operator map only possible long after constructor call! Implement the device constraints!*/
00373 //                      operator_map_ = getPhysical_Operator_Map();
00374 //              }
00375                 assert(op!=NULL);
00376                 operator_queries_.push_back(op);
00377         }
00378 
00379         //std::copy(query_indeces.begin(), query_indeces.end(), std::ostream_iterator<unsigned int>(std::cout, "\n"));
00380 
00381         //setup STEMOD
00382         //stemod::Scheduler::instance().addAlgorithm(operation_name_,"CPU_Algorithm_serial","Least Squares 1D","Periodic Recomputation");
00383         //stemod::Scheduler::instance().addAlgorithm(operation_name_,cpu_algorithm_name_, stemod::CPU, "Least Squares 1D", "Periodic Recomputation");
00384         //stemod::Scheduler::instance().addAlgorithm(operation_name_,gpu_algorithm_name_, stemod::GPU, "Least Squares 1D", "Periodic Recomputation");
00385 
00386         std::cout << "Setting Optimization Criterion '" << stemod_optimization_criterion_ << "'...";
00387         if(!hype::Scheduler::instance().setOptimizationCriterion(operation_name_,stemod_optimization_criterion_)){ 
00388                 std::cout << "Error: Could not set '" << stemod_optimization_criterion_ << "' as Optimization Criterion!" << std::endl;         return -1;}
00389         else std::cout << "Success..." << std::endl;
00390         //if(!scheduler.setOptimizationCriterion("MERGE","Throughput")) std::cout << "Error" << std::endl;
00391 
00392         if(!hype::Scheduler::instance().setStatisticalMethod(cpu_algorithm_name_,stemod_statistical_method_)){ 
00393                 std::cout << "Error" << std::endl; return -1;
00394         } else std::cout << "Success..." << std::endl;
00395         if(!hype::Scheduler::instance().setStatisticalMethod(gpu_algorithm_name_,stemod_statistical_method_)){ 
00396                 std::cout << "Error" << std::endl; return -1;
00397         } else std::cout << "Success..." << std::endl;
00398 
00399         if(!hype::Scheduler::instance().setRecomputationHeuristic(cpu_algorithm_name_,stemod_recomputation_heuristic_)){ 
00400                 std::cout << "Error" << std::endl; return -1;
00401         }       else std::cout << "Success..." << std::endl;
00402         if(!hype::Scheduler::instance().setRecomputationHeuristic(gpu_algorithm_name_,stemod_recomputation_heuristic_)){ 
00403                 std::cout << "Error" << std::endl; return -1;
00404         } else std::cout << "Success..." << std::endl;
00405 
00406         cpu.start();
00407         gpu.start();
00408         return 0;
00409         }
00410 
00411 
00412         int run(){
00413 
00414         //boost::this_thread::sleep( boost::posix_time::seconds(30) );
00415 
00416         std::cout << "Starting Benchmark..." << std::endl;
00417 
00418         uint64_t begin_benchmark_timestamp = getTimestamp();
00419         uint64_t end_training_timestamp=0;
00420 
00421         for(unsigned int i=0;i<NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_;i++){
00422                 /*
00423                 unsigned int index = query_indeces[i];
00424                 VecPtr dataset = datasets[index];
00425         
00426                 assert(NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_==query_indeces.size());
00427                 assert(index<NUMBER_OF_DATASETS_); //NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_);   
00428         
00429                 stemod::Tuple t;
00430                 t.push_back(dataset->size());
00431                 //stemod::SchedulingDecision sched_dec_local("",stemod::core::EstimatedTime(0),t);
00432                 */
00433                 
00434                 TypedNodePtr current_operator = operator_queries_[i];
00435                 TypedNodePtr scan=boost::static_pointer_cast<typename TypedNodePtr::element_type>(current_operator->getLeft());
00436         
00437                 //std::cout << "RUN: " << i << "/" << NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_ << std::endl;
00438                 
00439                 if(sched_config_==HYBRID){ //CPU_ONLY,GPU_ONLY,HYBRID)
00440                         //cout << "scheduling operator " << i << std::endl;
00441                         const unsigned int number_of_training_operations = (hype::core::Runtime_Configuration::instance().getTrainingLength()*2)+1; //*number of algortihms per operation (2)
00442                         if(number_of_training_operations==i){
00443                                 if(!hype::core::quiet)
00444                                         std::cout << "waiting for training to complete" << std::endl;
00445                                         //wait until training operations finished
00446                                         while(!cpu.isIdle() || !gpu.isIdle()){
00447                                                 //std::cout << "stat: cpu " << !cpu.isIdle() << " gpu " << !gpu.isIdle() << std::endl; 
00448                                                 boost::this_thread::sleep(boost::posix_time::microseconds(20));
00449                                         }       
00450                                         end_training_timestamp = getTimestamp();
00451                                         //cout << "stat: cpu " << !cpu.isIdle() << " gpu " << !gpu.isIdle() << std::endl; 
00452                                 if(!hype::core::quiet)
00453                                         std::cout << "training completed! Time: " << end_training_timestamp-begin_benchmark_timestamp << "ns (" 
00454                                                   << double(end_training_timestamp-begin_benchmark_timestamp)/(1000*1000*1000) <<"s)" << std::endl;
00455                         } 
00456                 
00457                         TypedOperatorPtr phy_scan = scan->getOptimalOperator(TypedOperatorPtr(),TypedOperatorPtr());
00458                         TypedOperatorPtr phy_op = current_operator->getOptimalOperator(phy_scan,TypedOperatorPtr());
00459 
00460                         if(phy_op->getDeviceSpecification().getDeviceType()==CPU){
00461                                 cpu.addOperator(phy_op);
00462                         }else if(phy_op->getDeviceSpecification().getDeviceType()==GPU){
00463                                 gpu.addOperator(phy_op);
00464                         }
00465 
00466                 }else if(sched_config_==CPU_ONLY){
00467                         //CPU_Sort_Parallel(dataset);
00468                         
00469                         TypedOperatorPtr phy_scan = scan->getOptimalOperator(TypedOperatorPtr(),TypedOperatorPtr());
00470                         TypedOperatorPtr phy_op = current_operator->getOptimalOperator(phy_scan,TypedOperatorPtr(),hype::CPU_ONLY);
00471                         assert(phy_op->getDeviceSpecification().getDeviceType()==CPU);
00472                         cpu.addOperator(phy_op);
00473                         /*
00474                         Physical_Operator_Map_Ptr operator_map_= current_operator->getPhysical_Operator_Map();
00475                         
00476                         Create_Typed_Operator_Function f=(*operator_map_)[cpu_algorithm_name_];
00477                         
00478                         f(current_operator,);
00479                         */
00480                         
00481                         //Physical_Operator_Map_Ptr map=current_operator->getPhysical_Operator_Map();
00482                         //TypedOperatorPtr phy_op=map[cpu_algorithm_name_];
00483                         //cpu.addOperator(phy_op);
00484                 
00485                         //std::cout << "Assigning Operator to CPU... " << std::endl;
00486                         //cpu.addOperator( boost::shared_ptr<CPU_Parallel_Sort_Operator>( new CPU_Parallel_Sort_Operator(sched_dec_local, dataset) ) );
00487 
00488                 }else if(sched_config_==GPU_ONLY){
00489                         //GPU_Sort(dataset);
00490                         
00491                         TypedOperatorPtr phy_scan = scan->getOptimalOperator(TypedOperatorPtr(),TypedOperatorPtr());
00492                         TypedOperatorPtr phy_op = current_operator->getOptimalOperator(phy_scan,TypedOperatorPtr(),hype::GPU_ONLY);
00493                         assert(phy_op->getDeviceSpecification().getDeviceType()==GPU);
00494                         gpu.addOperator(phy_op);
00495                         
00496                         //Physical_Operator_Map_Ptr map=current_operator->getPhysical_Operator_Map();
00497                         //TypedOperatorPtr phy_op=map[gpu_algorithm_name_];
00498                         //gpu.addOperator(phy_op);
00499                         
00500                         //std::cout << "Assigning Operator to GPU... " << std::endl;
00501                         //gpu.addOperator( boost::shared_ptr<GPU_Sort_Operator>( new GPU_Sort_Operator(sched_dec_local, dataset) ) );
00502                 }
00503 
00504 
00505                         /*
00506                         stemod::SchedulingDecision sched_dec = stemod::Scheduler::instance().getOptimalAlgorithmName(operation_name_,t);
00507                         
00508                 if(sched_dec.getNameofChoosenAlgorithm()=="CPU_Algorithm_serial"){
00509                         cpu.addOperator( boost::shared_ptr<CPU_Serial_Sort_Operator>( new CPU_Serial_Sort_Operator(sched_dec, dataset) ) );
00510 //                      stemod::AlgorithmMeasurement alg_measure(sched_dec);
00511 //                      CPU_Sort(dataset);
00512 //                      alg_measure.afterAlgorithmExecution(); 
00513                 }else if(sched_dec.getNameofChoosenAlgorithm()==cpu_algorithm_name_){
00514                         cpu.addOperator( boost::shared_ptr<CPU_Parallel_Sort_Operator>( new CPU_Parallel_Sort_Operator(sched_dec, dataset) ) );
00515 //                      stemod::AlgorithmMeasurement alg_measure(sched_dec);
00516 //                      CPU_Sort_Parallel(dataset);
00517 //                      alg_measure.afterAlgorithmExecution(); 
00518                 }else if(sched_dec.getNameofChoosenAlgorithm()==gpu_algorithm_name_){
00519                         gpu.addOperator( boost::shared_ptr<GPU_Sort_Operator>( new GPU_Sort_Operator(sched_dec, dataset) ) );
00520 //                      stemod::AlgorithmMeasurement alg_measure(sched_dec);
00521 //                      GPU_Sort(dataset);
00522 //                      alg_measure.afterAlgorithmExecution(); 
00523                 }
00524 
00525                 }else if(sched_config_==CPU_ONLY){
00526                         CPU_Sort_Parallel(dataset);
00527                         //std::cout << "Assigning Operator to CPU... " << std::endl;
00528                         //cpu.addOperator( boost::shared_ptr<CPU_Parallel_Sort_Operator>( new CPU_Parallel_Sort_Operator(sched_dec_local, dataset) ) );
00529                 }else if(sched_config_==GPU_ONLY){
00530                         GPU_Sort(dataset);
00531                         //std::cout << "Assigning Operator to GPU... " << std::endl;
00532                         //gpu.addOperator( boost::shared_ptr<GPU_Sort_Operator>( new GPU_Sort_Operator(sched_dec_local, dataset) ) );
00533                 }*/
00534 
00535         }
00536 
00537 //      boost::this_thread::sleep( boost::posix_time::seconds(3) );
00538 
00539 //      cpu.stop();
00540 //      gpu.stop();
00541 
00542         while(!cpu.isIdle() || !gpu.isIdle()){
00543                 boost::this_thread::sleep(boost::posix_time::microseconds(20));
00544         }
00545         uint64_t end_benchmark_timestamp = getTimestamp();
00546         std::cout << "stat: cpu " << !cpu.isIdle() << " gpu " << !gpu.isIdle() << std::endl; 
00547         std::cout << "[Main Thread] Processing Devices finished..." << std::endl;
00548 
00549         cpu.stop();
00550         gpu.stop();
00551 
00552 
00553         //if one of the following assertiosn are not fulfilled, then abort, because results are rubbish
00554         assert(end_benchmark_timestamp>=begin_benchmark_timestamp);
00555         double time_for_training_phase=0;
00556         double relative_error_cpu_parallel_algorithm = 0;
00557         double relative_error_gpu_algorithm     = 0;
00558 
00559         if(sched_config_==HYBRID){ //a training phase only exists when the decision model is used
00560                 assert(end_training_timestamp>=begin_benchmark_timestamp);
00561                 assert(end_benchmark_timestamp>=end_training_timestamp);
00562                 time_for_training_phase=end_training_timestamp-begin_benchmark_timestamp;
00563                 relative_error_cpu_parallel_algorithm = hype::Report::instance().getRelativeEstimationError(cpu_algorithm_name_);
00564                 relative_error_gpu_algorithm = hype::Report::instance().getRelativeEstimationError(gpu_algorithm_name_);
00565         }
00566 
00567         std::cout << "Time for Training: " << time_for_training_phase << "ns (" 
00568                                                   << double(time_for_training_phase)/(1000*1000*1000) <<"s)" << std::endl;
00569         
00570         std::cout << "Time for Workload: " <<  end_benchmark_timestamp-begin_benchmark_timestamp << "ns (" 
00571                   << double(end_benchmark_timestamp-begin_benchmark_timestamp)/(1000*1000*1000) << "s)" << std::endl;
00572 
00573 
00574 
00575         double total_time_cpu=cpu.getTotalProcessingTime();
00576         double total_time_gpu=gpu.getTotalProcessingTime();
00577         double total_processing_time_forall_devices=total_time_cpu + total_time_gpu;
00578 
00579         unsigned int total_dataset_size_in_bytes = 0;
00580         
00581         for(unsigned int i=0;i<datasets.size();i++){
00582                 total_dataset_size_in_bytes += datasets[i]->getSizeinBytes(); //*sizeof(ElementType);
00583                 //std::cout << "error: missing implementation for setting total_dataset_size_in_bytes" << std::endl;
00584                 //std::exit(0);
00585         }
00586         
00587         double percentaged_execution_time_on_cpu=0;
00588         double percentaged_execution_time_on_gpu=0;
00589 
00590         if(total_processing_time_forall_devices>0){
00591                 percentaged_execution_time_on_cpu = total_time_cpu/total_processing_time_forall_devices;
00592                 percentaged_execution_time_on_gpu = total_time_gpu/total_processing_time_forall_devices;
00593         }
00594 
00595 
00596 
00597         std::cout << "Time for CPU: " <<  total_time_cpu  << "ns \tTime for GPU: " << total_time_gpu << "ns" << std::endl 
00598                   << "CPU Utilization: " <<  percentaged_execution_time_on_cpu << std::endl
00599                   << "GPU Utilization: " <<  percentaged_execution_time_on_gpu << std::endl;
00600 
00601         std::cout << "Relative Error CPU_Algorithm_parallel: " << relative_error_cpu_parallel_algorithm << std::endl;
00602         std::cout << "Relative Error GPU_Algorithm: " << relative_error_gpu_algorithm    << std::endl;
00603         
00604         std::cout << "Total Size of Datasets: " << total_dataset_size_in_bytes << " Byte (" << total_dataset_size_in_bytes/(1024*1024) << "MB)" << std::endl;
00605 
00606 
00607 
00608  std::cout << MAX_DATASET_SIZE_IN_MB_ << "\t"
00609                 << NUMBER_OF_DATASETS_ << "\t"
00610                 << NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_ << "\t"
00611                 << sched_config_ << "\t"
00612                 << total_dataset_size_in_bytes << "\t"
00613                 << RANDOM_SEED_ << "\t"
00614                 << stemod_optimization_criterion_ << "\t"
00615                 << stemod_statistical_method_ << "\t"
00616                 << stemod_recomputation_heuristic_ << "\t"
00617                 << hype::core::Runtime_Configuration::instance().getMaximalReadyQueueLength() << "\t"
00618                 << hype::core::Runtime_Configuration::instance().getHistoryLength() << "\t"
00619                 << hype::core::Runtime_Configuration::instance().getRecomputationPeriod() << "\t"
00620                 << hype::core::Runtime_Configuration::instance().getTrainingLength() << "\t"
00621                 << hype::core::Runtime_Configuration::instance().getOutlinerThreshold()<< "\t"
00622                 << hype::core::Runtime_Configuration::instance().getMaximalSlowdownOfNonOptimalAlgorithm() << "\t"              
00623                 << end_benchmark_timestamp-begin_benchmark_timestamp << "\t"
00624                 << time_for_training_phase << "\t"
00625                 << total_time_cpu << "\t"
00626                 << total_time_gpu << "\t"
00627                 << percentaged_execution_time_on_cpu << "\t"
00628                 << percentaged_execution_time_on_gpu << "\t"
00629                 << relative_error_cpu_parallel_algorithm << "\t"
00630                 << relative_error_gpu_algorithm         
00631                 << std::endl;
00632 
00633  std::fstream file("benchmark_results.log",std::ios_base::out | std::ios_base::app);
00634 
00635         file.seekg(0, std::ios::end); // put the "cursor" at the end of the file
00636         unsigned int file_length = file.tellg(); // find the position of the cursor
00637 
00638  if(file_length==0){ //if file empty, write header
00639  file << "MAX_DATASET_SIZE_IN_MB_" << "\t"
00640                 << "NUMBER_OF_DATASETS_" << "\t"
00641                 << "NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_" << "\t"
00642                 << "sched_config_" << "\t"
00643                 << "total_size_of_datasets_in_bytes" << "\t"            
00644                 << "RANDOM_SEED_" << "\t"
00645                 << "stemod_optimization_criterion_" << "\t"
00646                 << "stemod_statistical_method_" << "\t"
00647                 << "stemod_recomputation_heuristic_" << "\t"
00648                 << "stemod_maximal_ready_queue_length" << "\t"
00649                 << "stemod_history_length" << "\t"
00650                 << "stemod_recomputation_period" << "\t"
00651                 << "stemod_length_of_training_phase" << "\t"
00652                 << "stemod_outliner_threshold_in_percent" << "\t"               
00653                 << "stemod_maximal_slowdown_of_non_optimal_algorithm" << "\t"   
00654                 << "workload_execution_time_in_ns" << "\t"
00655                 << "execution_time_training_only_in_ns" << "\t"         
00656                 << "total_time_cpu"  << "\t"
00657                 << "total_time_gpu"  << "\t"
00658                 << "spent_time_on_cpu_in_percent"  << "\t"
00659                 << "spent_time_on_gpu_in_percent"  << "\t"
00660                 << "average_estimation_error_CPU_Algorithm_parallel" << "\t"
00661                 << "average_estimation_error_GPU_Algorithm"             
00662                 << std::endl;
00663         }
00664 
00665  file << MAX_DATASET_SIZE_IN_MB_ << "\t"
00666                 << NUMBER_OF_DATASETS_ << "\t"
00667                 << NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_ << "\t"
00668                 << sched_config_ << "\t"
00669                 << total_dataset_size_in_bytes << "\t"          
00670                 << RANDOM_SEED_ << "\t"
00671                 << stemod_optimization_criterion_ << "\t"
00672                 << stemod_statistical_method_ << "\t"
00673                 << stemod_recomputation_heuristic_ << "\t"
00674                 << hype::core::Runtime_Configuration::instance().getMaximalReadyQueueLength() << "\t"
00675                 << hype::core::Runtime_Configuration::instance().getHistoryLength() << "\t"
00676                 << hype::core::Runtime_Configuration::instance().getRecomputationPeriod() << "\t"
00677                 << hype::core::Runtime_Configuration::instance().getTrainingLength() << "\t"
00678                 << hype::core::Runtime_Configuration::instance().getOutlinerThreshold()<< "\t"
00679                 << hype::core::Runtime_Configuration::instance().getMaximalSlowdownOfNonOptimalAlgorithm() << "\t"      
00680                 << end_benchmark_timestamp-begin_benchmark_timestamp << "\t"
00681                 << time_for_training_phase << "\t"              
00682                 << total_time_cpu << "\t"
00683                 << total_time_gpu << "\t"
00684                 << percentaged_execution_time_on_cpu << "\t"
00685                 << percentaged_execution_time_on_gpu  << "\t"
00686                 << relative_error_cpu_parallel_algorithm << "\t"
00687                 << relative_error_gpu_algorithm                         
00688                 << std::endl;
00689 
00690  file.close();
00691 
00692  return 0;
00693 }
00694 
00695 
00696 
00697 
00698 };
00699 
00700         }; //end namespace queryprocessing
00701 }; //end namespace hype
00702 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines