CoGaDB — Column-oriented GPU-accelerated Database Management System
00001 00002 #pragma once 00003 00004 #include <algorithm> 00005 #include <cstdlib> 00006 #include <iostream> 00007 #include <fstream> 00008 #include <cassert> 00009 #include <vector> 00010 00011 #include <stdint.h> 00012 #include <sys/mman.h> 00013 00014 #include <hype.hpp> 00015 #include <config/configuration.hpp> 00016 #include <util/architecture.hpp> 00017 #include <query_processing/operator.hpp> 00018 #include <query_processing/processing_device.hpp> 00019 #include <query_processing/node.hpp> 00020 #include <query_processing/logical_query_plan.hpp> 00021 00022 #include <tbb/parallel_sort.h> 00023 #include <tbb/task_scheduler_init.h> 00024 00025 #include <boost/smart_ptr.hpp> 00026 #include <boost/thread.hpp> 00027 #include <boost/program_options.hpp> 00028 #include <boost/random.hpp> 00029 #include <boost/generator_iterator.hpp> 00030 #include <boost/chrono.hpp> 00031 00032 namespace hype 00033 { 00034 00035 namespace queryprocessing 00036 { 00037 00038 enum SchedulingConfiguration{CPU_ONLY,GPU_ONLY,HYBRID}; 00039 00040 template <typename Type> 00041 class Operation_Benchmark{ 00042 public: 00043 struct Random_Number_Generator{ 00044 00045 Random_Number_Generator(unsigned int max_value_size) : max_value_size_(max_value_size){} 00046 00047 unsigned int operator() (){ 00048 return (unsigned int) rand()%max_value_size_; 00049 } 00050 private: 00051 unsigned int max_value_size_; 00052 }; 00053 00054 typedef Type type; 00055 typedef typename OperatorMapper_Helper_Template<Type>::TypedOperatorPtr TypedOperatorPtr; 00056 typedef typename OperatorMapper_Helper_Template<Type>::Physical_Operator_Map Physical_Operator_Map; 00057 typedef typename OperatorMapper_Helper_Template<Type>::Physical_Operator_Map_Ptr Physical_Operator_Map_Ptr; 00058 typedef typename OperatorMapper_Helper_Template<Type>::Create_Typed_Operator_Function Create_Typed_Operator_Function; 00059 typedef typename OperatorMapper_Helper_Template<Type>::TypedNodePtr TypedNodePtr; 00060 00061 /* 00062 typedef 
OperatorMapper_Helper_Template<Type>::TypedOperatorPtr TypedOperatorPtr; 00063 typedef OperatorMapper_Helper_Template<Type>::Physical_Operator_Map Physical_Operator_Map; 00064 typedef OperatorMapper_Helper_Template<Type>::Physical_Operator_Map_Ptr Physical_Operator_Map_Ptr; 00065 typedef OperatorMapper_Helper_Template<Type>::Create_Typed_Operator_Function Create_Typed_Operator_Function; 00066 typedef OperatorMapper_Helper_Template<Type>::TypedNodePtr TypedNodePtr; 00067 */ 00068 00069 //typedef int ElementType; 00070 //typedef vector<ElementType> Vec; 00071 //typedef boost::shared_ptr<Vec> VecPtr; 00072 00073 00074 00075 Operation_Benchmark(const std::string& operation_name, 00076 const std::string& cpu_algorithm_name, const std::string& gpu_algorithm_name) : 00077 operation_name_(operation_name), 00078 cpu_algorithm_name_(cpu_algorithm_name), 00079 gpu_algorithm_name_(gpu_algorithm_name), 00080 MAX_DATASET_SIZE_IN_MB_(1), //(10), //MB //(10*1000*1000)/sizeof(int), //1000000, 00081 NUMBER_OF_DATASETS_(10), //3, //100, 00082 NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_(30), //3, //1000, 00083 RANDOM_SEED_(0), 00084 sched_config_(CPU_ONLY), //(HYBRID), //CPU_ONLY,GPU_ONLY,HYBRID 00085 stemod_optimization_criterion_("Response Time"), 00086 stemod_statistical_method_("Least Squares 1D"), 00087 stemod_recomputation_heuristic_("Periodic Recomputation"), 00088 //cpu( DeviceSpecification (hype::PD0,hype::CPU,hype::PD_Memory_0) ), 00089 //gpu( DeviceSpecification (hype::PD1,hype::GPU,hype::PD_Memory_1) ), 00090 cpu( hype::queryprocessing::getProcessingDevice(DeviceSpecification (hype::PD0,hype::CPU,hype::PD_Memory_0)) ), 00091 gpu( hype::queryprocessing::getProcessingDevice(DeviceSpecification (hype::PD1,hype::GPU,hype::PD_Memory_1)) ), 00092 datasets(), 00093 operator_queries_(), 00094 rng_()//, 00095 //operator_map_() 00096 { 00097 /* 00098 if(!setup(argc, argv)){ 00099 std::cout << "Benchmark Setup Failed!" 
<< std::endl; 00100 std::exit(-1); 00101 }*/ 00102 } 00103 00104 //RandomNumberGenerator random_number_generator_; 00105 std::string operation_name_; 00106 std::string cpu_algorithm_name_; 00107 std::string gpu_algorithm_name_; 00108 00109 unsigned int MAX_DATASET_SIZE_IN_MB_; //MB //(10*1000*1000)/sizeof(int); //1000000; 00110 unsigned int NUMBER_OF_DATASETS_; //3; //100; 00111 unsigned int NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_; //3; //1000; 00112 unsigned int RANDOM_SEED_; 00113 00114 SchedulingConfiguration sched_config_; //CPU_ONLY,GPU_ONLY,HYBRID 00115 00116 std::string stemod_optimization_criterion_; 00117 std::string stemod_statistical_method_; 00118 std::string stemod_recomputation_heuristic_; 00119 00120 hype::queryprocessing::ProcessingDevice& cpu; 00121 hype::queryprocessing::ProcessingDevice& gpu; 00122 00123 std::vector<Type> datasets; 00124 std::vector<TypedNodePtr> operator_queries_; 00125 00126 boost::mt19937 rng_; // produces randomness out of thin air 00127 // see pseudo-random number generators 00128 00129 00130 boost::mt19937& getRandomNumberGenerator(){ 00131 return rng_; 00132 } 00133 00134 //Physical_Operator_Map_Ptr operator_map_; 00135 00136 uint64_t getTimestamp() 00137 { 00138 using namespace boost::chrono; 00139 00140 high_resolution_clock::time_point tp = high_resolution_clock::now(); 00141 nanoseconds dur = tp.time_since_epoch(); 00142 00143 return (uint64_t)dur.count(); 00144 } 00145 00146 00147 virtual TypedNodePtr generate_logical_operator(Type dataset) = 0; 00148 00149 //virtual vector<TypedNodePtr> createOperatorQueries() = 0; 00150 00151 virtual Type generate_dataset(unsigned int size_in_number_of_bytes) = 0; 00152 00153 //virtual destructor 00154 virtual ~Operation_Benchmark(){} 00155 00156 //Type generate_dataset(unsigned int size_in_number_of_elements){ 00157 // VecPtr data(new Vec()); 00158 // for(unsigned int i=0;i<size_in_number_of_elements;i++){ 00159 // ElementType e = (ElementType) rand(); 00160 // data->push_back(e); 
00161 // } 00162 // assert(data!=NULL); 00163 // //std::cout << "created new data set: " << data.get() << " of size: " << data->size() << std::endl; 00164 // return data; 00165 //} 00166 00167 std::vector<Type> generate_random_datasets(unsigned int max_size_of_dataset_in_byte, unsigned int number_of_datasets){ 00168 std::vector<Type> datasets; 00169 //first, generate dataset of full possible size, then decrease it with each loop according to a value tic, until the last dataset size is only tic 00170 unsigned int tic=max_size_of_dataset_in_byte/number_of_datasets; 00171 for(unsigned int i=0;i<number_of_datasets;i++){ 00172 Type vec_ptr = this->generate_dataset(max_size_of_dataset_in_byte-i*tic); //(unsigned int) (rand()%max_size_in_number_of_elements) ); 00173 assert(vec_ptr!=NULL); 00174 datasets.push_back(vec_ptr); 00175 } 00176 return datasets; 00177 } 00178 00179 00180 00181 00182 int setup(int argc, char* argv[]){ 00183 00184 //we don't want the OS to swap out our data to disc that's why we lock it 00185 mlockall(MCL_CURRENT|MCL_FUTURE); 00186 00187 // tbb::task_scheduler_init init(8); 00188 00189 00190 //tbb::task_scheduler_init (2); 00191 // tbb::task_scheduler_init init(1); 00192 // 00193 00194 00195 // cout << "TBB use " << tbb::task_scheduler_init::default_num_threads() << " number of threads as a default" << std::endl; 00196 // unsigned int MAX_DATASET_SIZE_IN_MB_=10; //MB //(10*1000*1000)/sizeof(int); //1000000; 00197 // unsigned int NUMBER_OF_DATASETS_=10; //3; //100; 00198 // unsigned int NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_=100; //3; //1000; 00199 // unsigned int RANDOM_SEED_=0; 00200 // //unsigned int READY_QUEUE_LENGTH=100; 00201 00202 // SchedulingConfiguration sched_config_=HYBRID; //CPU_ONLY,GPU_ONLY,HYBRID 00203 // //SchedulingConfiguration sched_config_=GPU_ONLY; 00204 // //SchedulingConfiguration sched_config_=CPU_ONLY; 00205 00206 // std::string stemod_optimization_criterion_="Response Time"; 00207 // std::string 
stemod_statistical_method_="Least Squares 1D"; 00208 // std::string stemod_recomputation_heuristic_="Periodic Recomputation"; 00209 00210 // Declare the supported options. 00211 boost::program_options::options_description desc("Allowed options"); 00212 desc.add_options() 00213 ("help", "produce help message") 00214 ("number_of_datasets", boost::program_options::value<unsigned int>(), "set the number of data sets for workload") 00215 ("number_of_operations", boost::program_options::value<unsigned int>(), "set the number of operations in workload") 00216 ("max_dataset_size_in_MB", boost::program_options::value<unsigned int>(), "set the maximal dataset size in MB") 00217 //("ready_queue_length", boost::program_options::value<unsigned int>(), "set the queue length of operators that may be concurrently scheduled (clients are blocked on a processing device)") 00218 ("scheduling_method", boost::program_options::value<std::string>(), "set the decision model (CPU_ONLY, GPU_ONLY, HYBRID)") 00219 ("random_seed", boost::program_options::value<unsigned int>(), "seed to use before for generating datasets and operation workload") 00220 ("optimization_criterion", boost::program_options::value<std::string>(), "set the decision models optimization_criterion for all algorithms") 00221 ("statistical_method", boost::program_options::value<std::string>(), "set the decision models statistical_method for all algorithms") 00222 ("recomputation_heuristic", boost::program_options::value<std::string>(), "set the decision models recomputation_heuristic for all algorithms") 00223 ; 00224 00225 boost::program_options::variables_map vm; 00226 boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm); 00227 boost::program_options::notify(vm); 00228 00229 if (vm.count("help")) { 00230 std::cout << desc << "\n"; 00231 return 1; 00232 } 00233 00234 if (vm.count("number_of_datasets")) { 00235 std::cout << "Number of Datasets: " 00236 << 
vm["number_of_datasets"].as<unsigned int>() << "\n"; 00237 NUMBER_OF_DATASETS_=vm["number_of_datasets"].as<unsigned int>(); 00238 } else { 00239 std::cout << "number_of_datasets was not specified, using default value...\n"; 00240 } 00241 00242 if (vm.count("number_of_operations")) { 00243 std::cout << "Number of Operations: " 00244 << vm["number_of_operations"].as<unsigned int>() << "\n"; 00245 NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_=vm["number_of_operations"].as<unsigned int>(); 00246 } else { 00247 std::cout << "number_of_operations was not specified, using default value...\n"; 00248 } 00249 00250 if (vm.count("max_dataset_size_in_MB")) { 00251 std::cout << "max_dataset_size_in_MB: " 00252 << vm["max_dataset_size_in_MB"].as<unsigned int>() << "MB \n"; 00253 MAX_DATASET_SIZE_IN_MB_=vm["max_dataset_size_in_MB"].as<unsigned int>(); //*1024*1024)/sizeof(int); //convert value in MB to equivalent number of integer elements 00254 } else { 00255 std::cout << "max_dataset_size_in_MB was not specified, using default value...\n"; 00256 } 00257 00258 if (vm.count("random_seed")) { 00259 std::cout << "Random Seed: " 00260 << vm["random_seed"].as<unsigned int>() << "\n"; 00261 RANDOM_SEED_=vm["random_seed"].as<unsigned int>(); 00262 } else { 00263 std::cout << "random_seed was not specified, using default value...\n"; 00264 } 00265 00266 00267 if (vm.count("scheduling_method")) { 00268 std::cout << "scheduling_method: " 00269 << vm["scheduling_method"].as<std::string>() << "\n"; 00270 std::string scheduling_method=vm["scheduling_method"].as<std::string>(); 00271 if(scheduling_method=="CPU_ONLY"){ 00272 sched_config_=CPU_ONLY; 00273 }else if(scheduling_method=="GPU_ONLY"){ 00274 sched_config_=GPU_ONLY; 00275 }else if(scheduling_method=="HYBRID"){ 00276 sched_config_=HYBRID; 00277 } 00278 00279 } else { 00280 std::cout << "scheduling_method was not specified, using default value...\n"; 00281 } 00282 00283 if (vm.count("optimization_criterion")) { 00284 std::cout << 
"optimization_criterion: " 00285 << vm["optimization_criterion"].as<std::string>() << "\n"; 00286 stemod_optimization_criterion_=vm["optimization_criterion"].as<std::string>(); 00287 00288 if(sched_config_!=HYBRID){ 00289 std::cout << "Specification of STEMOD Parameter needs hybrid scheduling (scheduling_method=HYBRID)" << std::endl; 00290 return -1; 00291 } 00292 00293 } else { 00294 std::cout << "optimization_criterion was not specified, using default value...\n"; 00295 } 00296 00297 if (vm.count("statistical_method")) { 00298 std::cout << "statistical_method: " 00299 << vm["statistical_method"].as<std::string>() << "\n"; 00300 stemod_statistical_method_=vm["statistical_method"].as<std::string>(); 00301 if(sched_config_!=HYBRID){ 00302 std::cout << "Specification of STEMOD Parameter needs hybrid scheduling (scheduling_method=HYBRID)" << std::endl; 00303 return -1; 00304 } 00305 00306 } else { 00307 std::cout << "statistical_method was not specified, using default value...\n"; 00308 } 00309 00310 if (vm.count("recomputation_heuristic")) { 00311 std::cout << "recomputation_heuristic: " 00312 << vm["recomputation_heuristic"].as<std::string>() << "\n"; 00313 stemod_recomputation_heuristic_=vm["recomputation_heuristic"].as<std::string>(); 00314 if(sched_config_!=HYBRID){ 00315 std::cout << "Specification of STEMOD Parameter needs hybrid scheduling (scheduling_method=HYBRID)" << std::endl; 00316 return -1; 00317 } 00318 00319 } else { 00320 std::cout << "recomputation_heuristic was not specified, using default value...\n"; 00321 } 00322 00323 00324 //"if (vm.count(\"$VAR\")) { 00325 // cout << \"$VAR: \" 00326 // << vm[\"$VAR\"].as<std::string>() << \"\n\"; 00327 // std::string s=vm[\"$VAR\"].as<std::string>(); 00328 00329 // 00330 //} else { 00331 // cout << \"$VAR was not specified, using default value...\n\"; 00332 //}" 00333 00334 rng_.seed(RANDOM_SEED_); 00335 srand(RANDOM_SEED_); 00336 00337 // 00338 uint64_t 
estimated_ram_usage_in_byte=(MAX_DATASET_SIZE_IN_MB_*1024*1024*uint64_t(NUMBER_OF_DATASETS_+1))/2; //MAX_DATASET_SIZE_IN_MB_*NUMBER_OF_DATASETS_ 00339 /* unsigned int tic=MAX_DATASET_SIZE_IN_MB_*1024*1024/NUMBER_OF_DATASETS_; 00340 std::cout << "tic size: " << tic << std::endl; 00341 for(unsigned int i=0;i<NUMBER_OF_DATASETS_;i++){ 00342 std::cout << "Size: " << MAX_DATASET_SIZE_IN_MB_*1024*1024-i*tic << std::endl; 00343 estimated_ram_usage_in_byte+=MAX_DATASET_SIZE_IN_MB_*1024*1024-i*tic; 00344 }*/ 00345 00346 std::cout << "Generating Data sets..." << std::endl; 00347 std::cout << "Estimated RAM usage: " << estimated_ram_usage_in_byte/(1024*1024) << "MB" << std::endl; 00348 if( (estimated_ram_usage_in_byte/(1024*1024)) >1024*3.7 && util::getArchitecture()==Architecture_32Bit){ 00349 std::cout << "Memory for Datasets to generate exceeds 32 bit adress space! (" << estimated_ram_usage_in_byte/(1024*1024) << "MB)" 00350 << std::endl << "Exiting..." << std::endl; 00351 std::exit(-1); 00352 } 00353 //generate_random_datasets expects data size in number of integer elements, while MAX_DATASET_SIZE_IN_MB_ specifies data size in Mega Bytes 00354 datasets=generate_random_datasets( (MAX_DATASET_SIZE_IN_MB_*1024*1024), NUMBER_OF_DATASETS_); 00355 00356 std::vector<unsigned int> query_indeces(NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_); 00357 00358 boost::uniform_int<> six(0,NUMBER_OF_DATASETS_-1); //choose data sets for sorting equally distributed 00359 //generate queries 00360 for(unsigned int i=0;i<NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_;++i){ 00361 query_indeces[i]=six(rng_); 00362 } 00363 //std::generate(query_indeces.begin(), query_indeces.end(), Random_Number_Generator(NUMBER_OF_DATASETS_)); 00364 //std::copy(query_indeces.begin(), query_indeces.end(), std::ostream_iterator<unsigned int>(std::cout, "\n")); 00365 00366 for(unsigned int i=0;i<NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_;++i){ 00367 00368 unsigned int index = query_indeces[i]; 00369 Type dataset = datasets[index]; 
00370 TypedNodePtr op = generate_logical_operator(dataset); 00371 // if(i==0){ 00372 // /*! \todo hack! init of operator map only possible long after constructor call! Implement the device constraints!*/ 00373 // operator_map_ = getPhysical_Operator_Map(); 00374 // } 00375 assert(op!=NULL); 00376 operator_queries_.push_back(op); 00377 } 00378 00379 //std::copy(query_indeces.begin(), query_indeces.end(), std::ostream_iterator<unsigned int>(std::cout, "\n")); 00380 00381 //setup STEMOD 00382 //stemod::Scheduler::instance().addAlgorithm(operation_name_,"CPU_Algorithm_serial","Least Squares 1D","Periodic Recomputation"); 00383 //stemod::Scheduler::instance().addAlgorithm(operation_name_,cpu_algorithm_name_, stemod::CPU, "Least Squares 1D", "Periodic Recomputation"); 00384 //stemod::Scheduler::instance().addAlgorithm(operation_name_,gpu_algorithm_name_, stemod::GPU, "Least Squares 1D", "Periodic Recomputation"); 00385 00386 std::cout << "Setting Optimization Criterion '" << stemod_optimization_criterion_ << "'..."; 00387 if(!hype::Scheduler::instance().setOptimizationCriterion(operation_name_,stemod_optimization_criterion_)){ 00388 std::cout << "Error: Could not set '" << stemod_optimization_criterion_ << "' as Optimization Criterion!" << std::endl; return -1;} 00389 else std::cout << "Success..." << std::endl; 00390 //if(!scheduler.setOptimizationCriterion("MERGE","Throughput")) std::cout << "Error" << std::endl; 00391 00392 if(!hype::Scheduler::instance().setStatisticalMethod(cpu_algorithm_name_,stemod_statistical_method_)){ 00393 std::cout << "Error" << std::endl; return -1; 00394 } else std::cout << "Success..." << std::endl; 00395 if(!hype::Scheduler::instance().setStatisticalMethod(gpu_algorithm_name_,stemod_statistical_method_)){ 00396 std::cout << "Error" << std::endl; return -1; 00397 } else std::cout << "Success..." 
<< std::endl; 00398 00399 if(!hype::Scheduler::instance().setRecomputationHeuristic(cpu_algorithm_name_,stemod_recomputation_heuristic_)){ 00400 std::cout << "Error" << std::endl; return -1; 00401 } else std::cout << "Success..." << std::endl; 00402 if(!hype::Scheduler::instance().setRecomputationHeuristic(gpu_algorithm_name_,stemod_recomputation_heuristic_)){ 00403 std::cout << "Error" << std::endl; return -1; 00404 } else std::cout << "Success..." << std::endl; 00405 00406 cpu.start(); 00407 gpu.start(); 00408 return 0; 00409 } 00410 00411 00412 int run(){ 00413 00414 //boost::this_thread::sleep( boost::posix_time::seconds(30) ); 00415 00416 std::cout << "Starting Benchmark..." << std::endl; 00417 00418 uint64_t begin_benchmark_timestamp = getTimestamp(); 00419 uint64_t end_training_timestamp=0; 00420 00421 for(unsigned int i=0;i<NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_;i++){ 00422 /* 00423 unsigned int index = query_indeces[i]; 00424 VecPtr dataset = datasets[index]; 00425 00426 assert(NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_==query_indeces.size()); 00427 assert(index<NUMBER_OF_DATASETS_); //NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_); 00428 00429 stemod::Tuple t; 00430 t.push_back(dataset->size()); 00431 //stemod::SchedulingDecision sched_dec_local("",stemod::core::EstimatedTime(0),t); 00432 */ 00433 00434 TypedNodePtr current_operator = operator_queries_[i]; 00435 TypedNodePtr scan=boost::static_pointer_cast<typename TypedNodePtr::element_type>(current_operator->getLeft()); 00436 00437 //std::cout << "RUN: " << i << "/" << NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_ << std::endl; 00438 00439 if(sched_config_==HYBRID){ //CPU_ONLY,GPU_ONLY,HYBRID) 00440 //cout << "scheduling operator " << i << std::endl; 00441 const unsigned int number_of_training_operations = (hype::core::Runtime_Configuration::instance().getTrainingLength()*2)+1; //*number of algortihms per operation (2) 00442 if(number_of_training_operations==i){ 00443 if(!hype::core::quiet) 00444 std::cout << "waiting 
for training to complete" << std::endl; 00445 //wait until training operations finished 00446 while(!cpu.isIdle() || !gpu.isIdle()){ 00447 //std::cout << "stat: cpu " << !cpu.isIdle() << " gpu " << !gpu.isIdle() << std::endl; 00448 boost::this_thread::sleep(boost::posix_time::microseconds(20)); 00449 } 00450 end_training_timestamp = getTimestamp(); 00451 //cout << "stat: cpu " << !cpu.isIdle() << " gpu " << !gpu.isIdle() << std::endl; 00452 if(!hype::core::quiet) 00453 std::cout << "training completed! Time: " << end_training_timestamp-begin_benchmark_timestamp << "ns (" 00454 << double(end_training_timestamp-begin_benchmark_timestamp)/(1000*1000*1000) <<"s)" << std::endl; 00455 } 00456 00457 TypedOperatorPtr phy_scan = scan->getOptimalOperator(TypedOperatorPtr(),TypedOperatorPtr()); 00458 TypedOperatorPtr phy_op = current_operator->getOptimalOperator(phy_scan,TypedOperatorPtr()); 00459 00460 if(phy_op->getDeviceSpecification().getDeviceType()==CPU){ 00461 cpu.addOperator(phy_op); 00462 }else if(phy_op->getDeviceSpecification().getDeviceType()==GPU){ 00463 gpu.addOperator(phy_op); 00464 } 00465 00466 }else if(sched_config_==CPU_ONLY){ 00467 //CPU_Sort_Parallel(dataset); 00468 00469 TypedOperatorPtr phy_scan = scan->getOptimalOperator(TypedOperatorPtr(),TypedOperatorPtr()); 00470 TypedOperatorPtr phy_op = current_operator->getOptimalOperator(phy_scan,TypedOperatorPtr(),hype::CPU_ONLY); 00471 assert(phy_op->getDeviceSpecification().getDeviceType()==CPU); 00472 cpu.addOperator(phy_op); 00473 /* 00474 Physical_Operator_Map_Ptr operator_map_= current_operator->getPhysical_Operator_Map(); 00475 00476 Create_Typed_Operator_Function f=(*operator_map_)[cpu_algorithm_name_]; 00477 00478 f(current_operator,); 00479 */ 00480 00481 //Physical_Operator_Map_Ptr map=current_operator->getPhysical_Operator_Map(); 00482 //TypedOperatorPtr phy_op=map[cpu_algorithm_name_]; 00483 //cpu.addOperator(phy_op); 00484 00485 //std::cout << "Assigning Operator to CPU... 
" << std::endl; 00486 //cpu.addOperator( boost::shared_ptr<CPU_Parallel_Sort_Operator>( new CPU_Parallel_Sort_Operator(sched_dec_local, dataset) ) ); 00487 00488 }else if(sched_config_==GPU_ONLY){ 00489 //GPU_Sort(dataset); 00490 00491 TypedOperatorPtr phy_scan = scan->getOptimalOperator(TypedOperatorPtr(),TypedOperatorPtr()); 00492 TypedOperatorPtr phy_op = current_operator->getOptimalOperator(phy_scan,TypedOperatorPtr(),hype::GPU_ONLY); 00493 assert(phy_op->getDeviceSpecification().getDeviceType()==GPU); 00494 gpu.addOperator(phy_op); 00495 00496 //Physical_Operator_Map_Ptr map=current_operator->getPhysical_Operator_Map(); 00497 //TypedOperatorPtr phy_op=map[gpu_algorithm_name_]; 00498 //gpu.addOperator(phy_op); 00499 00500 //std::cout << "Assigning Operator to GPU... " << std::endl; 00501 //gpu.addOperator( boost::shared_ptr<GPU_Sort_Operator>( new GPU_Sort_Operator(sched_dec_local, dataset) ) ); 00502 } 00503 00504 00505 /* 00506 stemod::SchedulingDecision sched_dec = stemod::Scheduler::instance().getOptimalAlgorithmName(operation_name_,t); 00507 00508 if(sched_dec.getNameofChoosenAlgorithm()=="CPU_Algorithm_serial"){ 00509 cpu.addOperator( boost::shared_ptr<CPU_Serial_Sort_Operator>( new CPU_Serial_Sort_Operator(sched_dec, dataset) ) ); 00510 // stemod::AlgorithmMeasurement alg_measure(sched_dec); 00511 // CPU_Sort(dataset); 00512 // alg_measure.afterAlgorithmExecution(); 00513 }else if(sched_dec.getNameofChoosenAlgorithm()==cpu_algorithm_name_){ 00514 cpu.addOperator( boost::shared_ptr<CPU_Parallel_Sort_Operator>( new CPU_Parallel_Sort_Operator(sched_dec, dataset) ) ); 00515 // stemod::AlgorithmMeasurement alg_measure(sched_dec); 00516 // CPU_Sort_Parallel(dataset); 00517 // alg_measure.afterAlgorithmExecution(); 00518 }else if(sched_dec.getNameofChoosenAlgorithm()==gpu_algorithm_name_){ 00519 gpu.addOperator( boost::shared_ptr<GPU_Sort_Operator>( new GPU_Sort_Operator(sched_dec, dataset) ) ); 00520 // stemod::AlgorithmMeasurement alg_measure(sched_dec); 
00521 // GPU_Sort(dataset); 00522 // alg_measure.afterAlgorithmExecution(); 00523 } 00524 00525 }else if(sched_config_==CPU_ONLY){ 00526 CPU_Sort_Parallel(dataset); 00527 //std::cout << "Assigning Operator to CPU... " << std::endl; 00528 //cpu.addOperator( boost::shared_ptr<CPU_Parallel_Sort_Operator>( new CPU_Parallel_Sort_Operator(sched_dec_local, dataset) ) ); 00529 }else if(sched_config_==GPU_ONLY){ 00530 GPU_Sort(dataset); 00531 //std::cout << "Assigning Operator to GPU... " << std::endl; 00532 //gpu.addOperator( boost::shared_ptr<GPU_Sort_Operator>( new GPU_Sort_Operator(sched_dec_local, dataset) ) ); 00533 }*/ 00534 00535 } 00536 00537 // boost::this_thread::sleep( boost::posix_time::seconds(3) ); 00538 00539 // cpu.stop(); 00540 // gpu.stop(); 00541 00542 while(!cpu.isIdle() || !gpu.isIdle()){ 00543 boost::this_thread::sleep(boost::posix_time::microseconds(20)); 00544 } 00545 uint64_t end_benchmark_timestamp = getTimestamp(); 00546 std::cout << "stat: cpu " << !cpu.isIdle() << " gpu " << !gpu.isIdle() << std::endl; 00547 std::cout << "[Main Thread] Processing Devices finished..." 
<< std::endl; 00548 00549 cpu.stop(); 00550 gpu.stop(); 00551 00552 00553 //if one of the following assertiosn are not fulfilled, then abort, because results are rubbish 00554 assert(end_benchmark_timestamp>=begin_benchmark_timestamp); 00555 double time_for_training_phase=0; 00556 double relative_error_cpu_parallel_algorithm = 0; 00557 double relative_error_gpu_algorithm = 0; 00558 00559 if(sched_config_==HYBRID){ //a training phase only exists when the decision model is used 00560 assert(end_training_timestamp>=begin_benchmark_timestamp); 00561 assert(end_benchmark_timestamp>=end_training_timestamp); 00562 time_for_training_phase=end_training_timestamp-begin_benchmark_timestamp; 00563 relative_error_cpu_parallel_algorithm = hype::Report::instance().getRelativeEstimationError(cpu_algorithm_name_); 00564 relative_error_gpu_algorithm = hype::Report::instance().getRelativeEstimationError(gpu_algorithm_name_); 00565 } 00566 00567 std::cout << "Time for Training: " << time_for_training_phase << "ns (" 00568 << double(time_for_training_phase)/(1000*1000*1000) <<"s)" << std::endl; 00569 00570 std::cout << "Time for Workload: " << end_benchmark_timestamp-begin_benchmark_timestamp << "ns (" 00571 << double(end_benchmark_timestamp-begin_benchmark_timestamp)/(1000*1000*1000) << "s)" << std::endl; 00572 00573 00574 00575 double total_time_cpu=cpu.getTotalProcessingTime(); 00576 double total_time_gpu=gpu.getTotalProcessingTime(); 00577 double total_processing_time_forall_devices=total_time_cpu + total_time_gpu; 00578 00579 unsigned int total_dataset_size_in_bytes = 0; 00580 00581 for(unsigned int i=0;i<datasets.size();i++){ 00582 total_dataset_size_in_bytes += datasets[i]->getSizeinBytes(); //*sizeof(ElementType); 00583 //std::cout << "error: missing implementation for setting total_dataset_size_in_bytes" << std::endl; 00584 //std::exit(0); 00585 } 00586 00587 double percentaged_execution_time_on_cpu=0; 00588 double percentaged_execution_time_on_gpu=0; 00589 00590 
if(total_processing_time_forall_devices>0){ 00591 percentaged_execution_time_on_cpu = total_time_cpu/total_processing_time_forall_devices; 00592 percentaged_execution_time_on_gpu = total_time_gpu/total_processing_time_forall_devices; 00593 } 00594 00595 00596 00597 std::cout << "Time for CPU: " << total_time_cpu << "ns \tTime for GPU: " << total_time_gpu << "ns" << std::endl 00598 << "CPU Utilization: " << percentaged_execution_time_on_cpu << std::endl 00599 << "GPU Utilization: " << percentaged_execution_time_on_gpu << std::endl; 00600 00601 std::cout << "Relative Error CPU_Algorithm_parallel: " << relative_error_cpu_parallel_algorithm << std::endl; 00602 std::cout << "Relative Error GPU_Algorithm: " << relative_error_gpu_algorithm << std::endl; 00603 00604 std::cout << "Total Size of Datasets: " << total_dataset_size_in_bytes << " Byte (" << total_dataset_size_in_bytes/(1024*1024) << "MB)" << std::endl; 00605 00606 00607 00608 std::cout << MAX_DATASET_SIZE_IN_MB_ << "\t" 00609 << NUMBER_OF_DATASETS_ << "\t" 00610 << NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_ << "\t" 00611 << sched_config_ << "\t" 00612 << total_dataset_size_in_bytes << "\t" 00613 << RANDOM_SEED_ << "\t" 00614 << stemod_optimization_criterion_ << "\t" 00615 << stemod_statistical_method_ << "\t" 00616 << stemod_recomputation_heuristic_ << "\t" 00617 << hype::core::Runtime_Configuration::instance().getMaximalReadyQueueLength() << "\t" 00618 << hype::core::Runtime_Configuration::instance().getHistoryLength() << "\t" 00619 << hype::core::Runtime_Configuration::instance().getRecomputationPeriod() << "\t" 00620 << hype::core::Runtime_Configuration::instance().getTrainingLength() << "\t" 00621 << hype::core::Runtime_Configuration::instance().getOutlinerThreshold()<< "\t" 00622 << hype::core::Runtime_Configuration::instance().getMaximalSlowdownOfNonOptimalAlgorithm() << "\t" 00623 << end_benchmark_timestamp-begin_benchmark_timestamp << "\t" 00624 << time_for_training_phase << "\t" 00625 << total_time_cpu << 
"\t" 00626 << total_time_gpu << "\t" 00627 << percentaged_execution_time_on_cpu << "\t" 00628 << percentaged_execution_time_on_gpu << "\t" 00629 << relative_error_cpu_parallel_algorithm << "\t" 00630 << relative_error_gpu_algorithm 00631 << std::endl; 00632 00633 std::fstream file("benchmark_results.log",std::ios_base::out | std::ios_base::app); 00634 00635 file.seekg(0, std::ios::end); // put the "cursor" at the end of the file 00636 unsigned int file_length = file.tellg(); // find the position of the cursor 00637 00638 if(file_length==0){ //if file empty, write header 00639 file << "MAX_DATASET_SIZE_IN_MB_" << "\t" 00640 << "NUMBER_OF_DATASETS_" << "\t" 00641 << "NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_" << "\t" 00642 << "sched_config_" << "\t" 00643 << "total_size_of_datasets_in_bytes" << "\t" 00644 << "RANDOM_SEED_" << "\t" 00645 << "stemod_optimization_criterion_" << "\t" 00646 << "stemod_statistical_method_" << "\t" 00647 << "stemod_recomputation_heuristic_" << "\t" 00648 << "stemod_maximal_ready_queue_length" << "\t" 00649 << "stemod_history_length" << "\t" 00650 << "stemod_recomputation_period" << "\t" 00651 << "stemod_length_of_training_phase" << "\t" 00652 << "stemod_outliner_threshold_in_percent" << "\t" 00653 << "stemod_maximal_slowdown_of_non_optimal_algorithm" << "\t" 00654 << "workload_execution_time_in_ns" << "\t" 00655 << "execution_time_training_only_in_ns" << "\t" 00656 << "total_time_cpu" << "\t" 00657 << "total_time_gpu" << "\t" 00658 << "spent_time_on_cpu_in_percent" << "\t" 00659 << "spent_time_on_gpu_in_percent" << "\t" 00660 << "average_estimation_error_CPU_Algorithm_parallel" << "\t" 00661 << "average_estimation_error_GPU_Algorithm" 00662 << std::endl; 00663 } 00664 00665 file << MAX_DATASET_SIZE_IN_MB_ << "\t" 00666 << NUMBER_OF_DATASETS_ << "\t" 00667 << NUMBER_OF_SORT_OPERATIONS_IN_WORKLOAD_ << "\t" 00668 << sched_config_ << "\t" 00669 << total_dataset_size_in_bytes << "\t" 00670 << RANDOM_SEED_ << "\t" 00671 << 
stemod_optimization_criterion_ << "\t" 00672 << stemod_statistical_method_ << "\t" 00673 << stemod_recomputation_heuristic_ << "\t" 00674 << hype::core::Runtime_Configuration::instance().getMaximalReadyQueueLength() << "\t" 00675 << hype::core::Runtime_Configuration::instance().getHistoryLength() << "\t" 00676 << hype::core::Runtime_Configuration::instance().getRecomputationPeriod() << "\t" 00677 << hype::core::Runtime_Configuration::instance().getTrainingLength() << "\t" 00678 << hype::core::Runtime_Configuration::instance().getOutlinerThreshold()<< "\t" 00679 << hype::core::Runtime_Configuration::instance().getMaximalSlowdownOfNonOptimalAlgorithm() << "\t" 00680 << end_benchmark_timestamp-begin_benchmark_timestamp << "\t" 00681 << time_for_training_phase << "\t" 00682 << total_time_cpu << "\t" 00683 << total_time_gpu << "\t" 00684 << percentaged_execution_time_on_cpu << "\t" 00685 << percentaged_execution_time_on_gpu << "\t" 00686 << relative_error_cpu_parallel_algorithm << "\t" 00687 << relative_error_gpu_algorithm 00688 << std::endl; 00689 00690 file.close(); 00691 00692 return 0; 00693 } 00694 00695 00696 00697 00698 }; 00699 00700 }; //end namespace queryprocessing 00701 }; //end namespace hype 00702