// CoGaDB
// Column-oriented GPU-accelerated Database Management System
00001 00002 #pragma once 00003 00004 #include <core/base_column.hpp> 00005 #include <iostream> 00006 00007 #include <utility> 00008 #include <functional> 00009 #include <algorithm> 00010 //#include <vector> 00011 00012 //#include <unordered_map> 00013 #include <boost/unordered_map.hpp> 00014 #include <boost/any.hpp> 00015 #include <boost/thread.hpp> 00016 #include <boost/bind.hpp> 00017 //TBB includes 00018 #include <tbb/parallel_sort.h> 00019 #include "tbb/parallel_scan.h" 00020 #include <tbb/task_scheduler_init.h> 00021 00022 #include <gpu/gpu_algorithms.hpp> 00023 00024 #include <util/time_measurement.hpp> 00025 #include <util/begin_ptr.hpp> 00026 00027 //#include <core/column.hpp> 00028 00029 namespace CoGaDB{ 00030 00044 template<class T> 00045 class ColumnBaseTyped : public ColumnBase{ 00046 public: 00047 typedef boost::unordered_multimap<T,TID,boost::hash<T>, std::equal_to<T> > HashTable; 00048 //typedef boost::shared_ptr<ColumnBaseTyped> ColumnPtr; 00049 /***************** constructors and destructor *****************/ 00050 ColumnBaseTyped(const std::string& name, AttributeType db_type); 00051 virtual ~ColumnBaseTyped(); 00052 00053 virtual bool insert(const boost::any& new_Value)=0; 00054 virtual bool insert(const T& new_Value) = 0; 00055 virtual bool update(TID tid, const boost::any& new_value) = 0; 00056 virtual bool update(PositionListPtr tid, const boost::any& new_value) = 0; 00057 00058 virtual bool remove(TID tid)=0; 00059 //assumes tid list is sorted ascending 00060 virtual bool remove(PositionListPtr tid)=0; 00061 virtual bool clearContent()=0; 00062 00063 virtual const boost::any get(TID tid)=0; 00064 //virtual const boost::any* const getRawData()=0; 00065 virtual void print() const throw()=0; 00066 virtual size_t size() const throw()=0; 00067 virtual unsigned int getSizeinBytes() const throw()=0; 00068 00069 virtual const ColumnPtr copy() const=0; 00070 virtual const ColumnPtr gather(PositionListPtr tid_list) = 0; 00071 /***************** 
relational operations on Columns which return lookup tables *****************/ 00072 virtual const PositionListPtr sort(SortOrder order); 00073 virtual const PositionListPtr selection(const boost::any& value_for_comparison, const ValueComparator comp); 00074 virtual const PositionListPtr selection(ColumnPtr, const ValueComparator comp); 00075 virtual const PositionListPtr parallel_selection(const boost::any& value_for_comparison, const ValueComparator comp, unsigned int number_of_threads); 00076 virtual const PositionListPtr lock_free_parallel_selection(const boost::any& value_for_comparison, const ValueComparator comp, unsigned int number_of_threads); 00077 //join algorithms 00078 virtual const PositionListPairPtr hash_join(ColumnPtr join_column); 00079 virtual const PositionListPairPtr parallel_hash_join(ColumnPtr join_column,unsigned int number_of_threads); 00080 static void hash_join_pruning_thread(ColumnBaseTyped<T>* join_column, HashTable* hashtable, unsigned int* join_tids_table1, unsigned int* join_tids_table2, unsigned int thread_id, unsigned int number_of_threads, unsigned int* result_size); 00081 virtual const PositionListPairPtr sort_merge_join(ColumnPtr join_column); 00082 virtual const PositionListPairPtr nested_loop_join(ColumnPtr join_column); 00083 00084 virtual bool add(const boost::any& new_Value); 00085 //vector addition between columns 00086 virtual bool add(ColumnPtr join_column); 00087 00088 virtual bool minus(const boost::any& new_Value); 00089 virtual bool minus(ColumnPtr join_column); 00090 00091 virtual bool multiply(const boost::any& new_Value); 00092 virtual bool multiply(ColumnPtr join_column); 00093 00094 virtual bool division(const boost::any& new_Value); 00095 virtual bool division(ColumnPtr join_column); 00096 00097 //template <typename U, typename BinaryOperator> 00098 //std::pair<ColumnPtr,ColumnPtr> aggregate_by_keys(ColumnBaseTyped<U>* keys, BinaryOperator binary_op) const; 00099 00100 virtual bool store(const std::string& 
path) = 0; 00101 virtual bool load(const std::string& path) = 0; 00102 virtual bool isMaterialized() const throw() = 0; 00103 virtual bool isCompressed() const throw() = 0; 00104 virtual const ColumnPtr materialize() throw() =0; 00105 virtual bool is_equal(ColumnPtr column); 00106 virtual int compareValuesAtIndexes(TID id1, TID id2); 00108 virtual const std::type_info& type() const throw(); 00113 virtual T& operator[](const int index) = 0; 00114 inline bool operator==(ColumnBaseTyped<T>& column); 00115 00116 }; 00117 00118 00119 template<class T> 00120 ColumnBaseTyped<T>::ColumnBaseTyped(const std::string& name, AttributeType db_type) : ColumnBase(name,db_type){ 00121 00122 } 00123 00124 template<class T> 00125 ColumnBaseTyped<T>::~ColumnBaseTyped(){ 00126 00127 } 00128 00129 template<class T> 00130 const std::type_info& ColumnBaseTyped<T>::type() const throw(){ 00131 return typeid(T); 00132 } 00133 00134 template<class T> 00135 bool ColumnBaseTyped<T>::is_equal(ColumnPtr column) 00136 { 00137 if(column->type()!=typeid(T)) { 00138 return false; 00139 //std::cout << "Fatal Error!!! 
Typemismatch for columns " << this->name_ << " and " << column->getName() << std::endl; 00140 //std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl; 00141 //exit(-1); 00142 } 00143 if(column->size()!=this->size()) 00144 return false; 00145 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<T> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<T> >(column); //static_cast<IntColumnPtr>(column1); 00146 for(unsigned int i=0; i<this->size(); ++i) { 00147 if((*this)[i]!=(*typed_column)[i]) { 00148 return false; 00149 } 00150 } 00151 return true; 00152 } 00153 00154 template<class T> 00155 int ColumnBaseTyped<T>::compareValuesAtIndexes(TID id1, TID id2){ 00156 if ((*this)[id1] == (*this)[id2]) return 0; 00157 else if ((*this)[id1] < (*this)[id2]) return 1; 00158 else return -1; 00159 /* 00160 if(column->type()!=typeid(T)) { 00161 return false; 00162 } 00163 assert(id_column<column->size() && id_this_col < this->size()); 00164 // return false; 00165 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<T> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<T> >(column); //static_cast<IntColumnPtr>(column1); 00166 return (*this)[id_this_col]<typed_column[id_column]); */ 00167 } 00168 00169 00170 00171 template<class T> 00172 const PositionListPtr ColumnBaseTyped<T>::sort(SortOrder order){ 00173 00174 PositionListPtr ids = PositionListPtr( new PositionList()); 00175 std::vector<std::pair<T,TID> > v; 00176 00177 for(unsigned int i=0;i<this->size();i++){ 00178 v.push_back (std::pair<T,TID>((*this)[i],i) ); 00179 } 00180 00181 //TODO: change implementation, so that no copy operations are required -> use boost zip iterators! 
00182 00183 if(order==ASCENDING){ 00184 //tbb::parallel_sort(v.begin(),v.end(),std::less_equal<std::pair<T,TID> >()); 00185 std::stable_sort(v.begin(),v.end(),std::less_equal<std::pair<T,TID> >()); 00186 }else if(order==DESCENDING){ 00187 //tbb::parallel_sort(v.begin(),v.end(),std::greater_equal<std::pair<T,TID> >()); 00188 std::stable_sort(v.begin(),v.end(),std::greater_equal<std::pair<T,TID> >()); 00189 }else{ 00190 std::cout << "FATAL ERROR: ColumnBaseTyped<T>::sort(): Unknown Sorting Order!" << std::endl; 00191 } 00192 00193 for(unsigned int i=0;i<v.size();i++){ 00194 ids->push_back(v[i].second); 00195 } 00196 00197 return ids; 00198 } 00199 00200 /* 00201 template<class T> 00202 struct Column_Scan_Equal : public unary_function<T,bool> { 00203 Column_Scan_Equal(std::vector<TID>& local_result_tids, const T& comparison_value) : local_result_tids_(local_result_tids), comparison_value_(comparison_value){} 00204 bool operator() (T& value) { 00205 if(value==comparison_value_){ 00206 local_result_tids_.push_back(); 00207 return true; 00208 } 00209 return false; 00210 } 00211 std::vector<TID>& local_result_tids_; 00212 T comparison_value_; 00213 };*/ 00214 00215 template<class T> 00216 void selection_thread(unsigned int thread_id, unsigned int number_of_threads, const T& value, const ValueComparator comp, ColumnBaseTyped<T>* col, PositionListPtr result_tids){ 00217 //std::cout << "Hi I'm thread" << thread_id << std::endl; 00218 if(!quiet) std::cout << "Using CPU for Selection (parallel mode)..." 
<< std::endl; 00219 unsigned int array_size=col->size(); 00220 for(TID i=thread_id;i<array_size;i+=number_of_threads){ 00221 00222 //boost::any value = column->get(i); 00223 //val = values_[i]; 00224 00225 if(comp==EQUAL){ 00226 if(value==(*col)[i]){ 00227 //result_table->insert(this->fetchTuple(i)); 00228 //std::cout << "MATCH " << thread_id << std::endl; 00229 result_tids->push_back(i); 00230 } 00231 }else if(comp==LESSER){ 00232 if((*col)[i]<value){ 00233 //result_table->insert(this->fetchTuple(i)); 00234 //std::cout << "MATCH " << thread_id << std::endl; 00235 result_tids->push_back(i); 00236 } 00237 }else if(comp==LESSER_EQUAL){ 00238 if((*col)[i]<=value){ 00239 //result_table->insert(this->fetchTuple(i)); 00240 //std::cout << "MATCH " << thread_id << std::endl; 00241 result_tids->push_back(i); 00242 } 00243 }else if(comp==GREATER){ 00244 if((*col)[i]>value){ 00245 //std::cout << "MATCH " << thread_id << std::endl; 00246 result_tids->push_back(i); 00247 //result_table->insert(this->fetchTuple(i)); 00248 } 00249 }else if(comp==GREATER_EQUAL){ 00250 if((*col)[i]>=value){ 00251 //std::cout << "MATCH " << thread_id << std::endl; 00252 result_tids->push_back(i); 00253 //result_table->insert(this->fetchTuple(i)); 00254 } 00255 }else{ 00256 00257 } 00258 } 00259 } 00260 00261 template<class T> 00262 const PositionListPtr ColumnBaseTyped<T>::parallel_selection(const boost::any& value_for_comparison, const ValueComparator comp, unsigned int number_of_threads){ 00263 00264 PositionListPtr result_tids( new PositionList()); 00265 //unsigned int number_of_threads=4; 00266 00267 if(value_for_comparison.type()!=typeid(T)){ 00268 std::cout << "Fatal Error!!! 
Typemismatch for column " << name_ << std::endl; 00269 std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl; 00270 exit(-1); 00271 } 00272 00273 T value = boost::any_cast<T>(value_for_comparison); 00274 00275 boost::thread_group threads; 00276 00277 std::vector<PositionListPtr> local_result_arrays; 00278 for(unsigned int i=0;i<number_of_threads;i++){ 00279 local_result_arrays.push_back(PositionListPtr(new std::vector<TID>())); 00280 threads.add_thread(new boost::thread(boost::bind(&CoGaDB::selection_thread<T>, i, number_of_threads, value, comp, this,local_result_arrays[i]))); 00281 } 00282 threads.join_all(); 00283 00284 /* 00285 unsigned int result_size=0; 00286 for(unsigned int i=0;i<number_of_threads;i++){ 00287 // std::cout << "Local result size of thread " << i << ":" << local_result_arrays[i]->size() << std::endl; 00288 result_size+=local_result_arrays[i]->size(); 00289 } 00290 result_tids = new PositionList(local_result_arrays[0]->begin(),local_result_arrays[0]->end()); //->resize(result_size); 00291 for(unsigned int i=1;i<number_of_threads;i++){ 00292 00293 std::merge (result_tids.begin(),result_tids.end(),local_result_arrays[i]->begin(),local_result_arrays[i]->end(),result_tids->begin()); 00294 } */ 00295 00296 00297 00298 for(unsigned int i=0;i<number_of_threads;i++){ 00299 // std::cout << "Local result size of thread " << i << ":" << local_result_arrays[i]->size() << std::endl; 00300 result_tids->insert(result_tids->end(),local_result_arrays[i]->begin(),local_result_arrays[i]->end()); 00301 } 00302 //std::cout << "number of result tids:" << result_tids->size() << std::endl; 00303 tbb::parallel_sort(result_tids->begin(),result_tids->end()); 00304 //exit(-1); 00305 return result_tids; 00306 } 00307 00308 typedef std::vector<int> Flags; 00309 typedef boost::shared_ptr<Flags> FlagsPtr; 00310 00311 00312 class TBB_Body_PrefixSum{ 00313 public: 00314 TBB_Body_PrefixSum (std::vector<int>* y_, const std::vector<int>* x_) : sum(0), x(x_), y(y_) 
{} 00315 int get_sum() const {return sum;} 00316 template<typename Tag> 00317 void operator()( const tbb::blocked_range<int>& r, Tag ) { 00318 int temp = sum; 00319 for( int i=r.begin(); i<r.end(); ++i ) { 00320 temp = temp + (*x)[i]; 00321 if(Tag::is_final_scan() ) 00322 (*y)[i] = temp; 00323 } 00324 sum = temp; 00325 } 00326 TBB_Body_PrefixSum ( TBB_Body_PrefixSum & b, tbb::split ) : x(b.x), y(b.y), sum(0) {} 00327 void reverse_join( TBB_Body_PrefixSum & a ) { sum = a.sum + sum;} 00328 void assign( TBB_Body_PrefixSum & b ) {sum = b.sum;} 00329 private: 00330 int sum; 00331 std::vector<int>* y; 00332 const std::vector<int>* x; 00333 }; 00334 00335 inline int TBB_Prefix_Sum(std::vector<int>& y, const std::vector<int>& x, unsigned int number_of_threads) { 00336 assert(y.size()==x.size()); 00337 unsigned int chunk_size=x.size()/number_of_threads; 00338 TBB_Body_PrefixSum body(&y,&x); 00339 tbb::parallel_scan( tbb::blocked_range<int>(0,x.size(),chunk_size), body ); 00340 return body.get_sum(); 00341 } 00342 00343 00344 00345 00346 template <typename T> 00347 void selection_thread_set_flag_array(ColumnBaseTyped<T>* col, unsigned int thread_id, FlagsPtr flags, unsigned int number_of_threads, const T& value_for_comparison, const ValueComparator comp){ 00348 assert(flags->size()==col->size()); 00349 if(!quiet) std::cout << "Using CPU for Selection (parallel mode)..." 
<< std::endl; 00350 unsigned int array_size=col->size(); 00351 unsigned int chunk_size=col->size()/number_of_threads; 00352 unsigned int start_id=thread_id*chunk_size; 00353 unsigned int end_id=(thread_id*chunk_size)+chunk_size; 00354 //make sure that the last thread processes the rest of the array 00355 if(thread_id+1==number_of_threads) end_id=array_size; 00356 if(comp==EQUAL){ 00357 for(TID i=start_id;i<end_id;i++){ 00358 if(value_for_comparison==(*col)[i]){ 00359 (*flags)[i]=1; 00360 } 00361 } 00362 }else if(comp==LESSER){ 00363 for(TID i=start_id;i<end_id;i++){ 00364 if((*col)[i]<value_for_comparison){ 00365 (*flags)[i]=1; 00366 } 00367 } 00368 }else if(comp==LESSER_EQUAL){ 00369 for(TID i=start_id;i<end_id;i++){ 00370 if((*col)[i]<=value_for_comparison){ 00371 (*flags)[i]=1; 00372 } 00373 } 00374 }else if(comp==GREATER){ 00375 for(TID i=start_id;i<end_id;i++){ 00376 if((*col)[i]>value_for_comparison){ 00377 (*flags)[i]=1; 00378 } 00379 } 00380 }else if(comp==GREATER_EQUAL){ 00381 for(TID i=start_id;i<end_id;i++){ 00382 if((*col)[i]>=value_for_comparison){ 00383 (*flags)[i]=1; 00384 } 00385 } 00386 }else{ 00387 std::cerr << "FATAL Error! In CoGaDB::selection_thread_set_flag_array(): Invalid Value Comparator! " << comp << std::endl; 00388 } 00389 00390 // unsigned int array_size=col->size(); 00391 // if(comp==EQUAL){ 00392 // for(TID i=thread_id;i<array_size;i+=number_of_threads){ 00393 // if(value_for_comparison==(*col)[i]){ 00394 // (*flags)[i]=1; 00395 // } 00396 // } 00397 // }else if(comp==LESSER){ 00398 // for(TID i=thread_id;i<array_size;i+=number_of_threads){ 00399 // if((*col)[i]<value_for_comparison){ 00400 // (*flags)[i]=1; 00401 // } 00402 // } 00403 // }else if(comp==GREATER){ 00404 // for(TID i=thread_id;i<array_size;i+=number_of_threads){ 00405 // if((*col)[i]>value_for_comparison){ 00406 // (*flags)[i]=1; 00407 // } 00408 // } 00409 // }else{ 00410 // std::cerr << "FATAL Error! 
In CoGaDB::selection_thread_set_flag_array(): Invalid Value Comparator! " << comp << std::endl; 00411 // } 00412 00413 00414 } 00415 00416 template <typename T> 00417 void selection_thread_write_result_to_output_array(ColumnBaseTyped<T>* col, unsigned int thread_id, FlagsPtr flags, std::vector<int>* prefix_sum_array, PositionListPtr result_tids, unsigned int number_of_threads){ 00418 assert(flags->size()==col->size()); 00419 assert(flags->size()==prefix_sum_array->size()); 00420 unsigned int array_size=col->size(); 00421 unsigned int chunk_size=col->size()/number_of_threads; 00422 unsigned int start_id=thread_id*chunk_size; 00423 unsigned int end_id=(thread_id*chunk_size)+chunk_size; 00424 //make sure that the last thread processes the rest of the array 00425 if(thread_id+1==number_of_threads) end_id=array_size; 00426 for(TID i=start_id;i<end_id;i++){ 00427 if((*flags)[i]==1){ 00428 unsigned int write_id=(*prefix_sum_array)[i]-1; 00429 (*result_tids)[write_id]=i; //write matching TID to output buffer 00430 } 00431 } 00432 00433 //if(!quiet) std::cout << "Using CPU for Selection (parallel mode)..." << std::endl; 00434 // unsigned int array_size=col->size(); 00435 // for(TID i=thread_id;i<array_size;i+=number_of_threads){ 00436 // if((*flags)[i]==1){ 00437 // unsigned int write_id=(*prefix_sum_array)[i]-1; 00438 // (*result_tids)[write_id]=i; //write matching TID to output buffer 00439 // } 00440 // } 00441 00442 } 00443 00444 00445 template<class T> 00446 const PositionListPtr ColumnBaseTyped<T>::lock_free_parallel_selection(const boost::any& value_for_comparison, const ValueComparator comp, unsigned int number_of_threads){ 00447 00448 00449 //unsigned int number_of_threads=4; 00450 00451 if(value_for_comparison.type()!=typeid(T)){ 00452 std::cout << "Fatal Error!!! 
Typemismatch for column " << name_ << std::endl; 00453 std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl; 00454 exit(-1); 00455 } 00456 00457 T value = boost::any_cast<T>(value_for_comparison); 00458 00459 boost::thread_group threads; 00460 00461 //create flag array of column size and init with zeros 00462 FlagsPtr flags(new Flags(this->size(),0)); 00463 //std::vector<PositionListPtr> local_result_arrays; 00464 for(unsigned int i=0;i<number_of_threads;i++){ 00465 threads.add_thread(new boost::thread(boost::bind(&CoGaDB::selection_thread_set_flag_array<T>, this, i, flags, number_of_threads, value, comp))); 00466 } 00467 threads.join_all(); 00468 00469 if(!quiet && verbose && debug) 00470 { 00471 std::cout << "FLAG Array:" << std::endl; 00472 for(unsigned int i=0;i<flags->size();++i){ 00473 std::cout << (*flags)[i] << std::endl; 00474 } 00475 } 00476 00477 std::vector<int> prefix_sum(this->size(),0); 00478 00479 //do prefix sum on threads 00480 TBB_Prefix_Sum(prefix_sum,*flags,number_of_threads); 00481 00482 if(!quiet && verbose && debug) 00483 { 00484 std::cout << "Prefix Sum:" << std::endl; 00485 for(unsigned int i=0;i<prefix_sum.size();++i){ 00486 std::cout << prefix_sum[i] << std::endl; 00487 } 00488 } 00489 unsigned int resul_size=prefix_sum.back(); 00490 PositionListPtr result_tids( new PositionList(resul_size)); 00491 00492 for(unsigned int i=0;i<number_of_threads;i++){ 00493 threads.add_thread(new boost::thread(boost::bind(&CoGaDB::selection_thread_write_result_to_output_array<T>, this, i, flags, &prefix_sum, result_tids, number_of_threads))); 00494 } 00495 threads.join_all(); 00496 00497 if(!quiet && verbose && debug) 00498 { 00499 std::cout << "TIDS:" << std::endl; 00500 for(unsigned int i=0;i<result_tids->size();++i){ 00501 std::cout << (*result_tids)[i] << std::endl; 00502 } 00503 } 00504 00505 return result_tids; 00506 } 00507 00508 00509 00510 template<class T> 00511 const PositionListPtr ColumnBaseTyped<T>::selection(const 
boost::any& value_for_comparison, const ValueComparator comp){ 00512 if(value_for_comparison.type()!=typeid(T)){ 00513 std::cout << "Fatal Error!!! Typemismatch for column " << name_ << std::endl; 00514 std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl; 00515 exit(-1); 00516 } 00517 00518 T value = boost::any_cast<T>(value_for_comparison); 00519 PositionListPtr result_tids; 00520 00521 result_tids = PositionListPtr(new PositionList()); 00522 //one third rule for selections: assume a selectivity of 0.3, meaning we need roughly 0.3 times of the input to store the result 00523 //this optimizatio nshould minimize the number of reallocations during the insertion process 00524 //result_tids->reserve(0.3*this->size()); 00525 00526 //std::cout << "Scan column of size " << this->size() << std::endl; 00527 unsigned int array_size=this->size(); 00528 // //calls new internally 00529 // result_tids->resize(array_size); 00530 // //tids.reserve(array_size); 00531 // //get pointer 00532 // unsigned int* array_tids=hype::util::begin_ptr(*result_tids); 00533 // assert(array_tids!=NULL); 00534 // unsigned int pos=0; 00535 if(!quiet) std::cout << "Using CPU for Selection..." 
<< std::endl; 00536 //unsigned int array_size = this->size(); 00537 //for(TID i=0;i<array_size;i++){ 00538 00539 //boost::any value = column->get(i); 00540 //val = values_[i]; 00541 00542 // unsigned int pos =0; 00543 // for(TID i=0;i<array_size;i++){ 00544 // result_tids[pos]=i; 00545 // pos+=(value==(*this)[i]); 00546 // } 00547 00548 if(comp==EQUAL){ 00549 for(TID i=0;i<array_size;i++){ 00550 if(value==(*this)[i]){ 00551 //array_tids[pos++]=i; 00552 result_tids->push_back(i); 00553 } 00554 } 00555 }else if(comp==LESSER){ 00556 for(TID i=0;i<array_size;i++){ 00557 if((*this)[i]<value){ 00558 //result_table->insert(this->fetchTuple(i)); 00559 result_tids->push_back(i); 00560 //array_tids[pos++]=i; 00561 } 00562 } 00563 }else if(comp==LESSER_EQUAL){ 00564 for(TID i=0;i<array_size;i++){ 00565 if((*this)[i]<=value){ 00566 result_tids->push_back(i); 00567 //result_table->insert(this->fetchTuple(i)); 00568 //array_tids[pos++]=i; 00569 } 00570 } 00571 }else if(comp==GREATER){ 00572 for(TID i=0;i<array_size;i++){ 00573 if((*this)[i]>value){ 00574 result_tids->push_back(i); 00575 //result_table->insert(this->fetchTuple(i)); 00576 //array_tids[pos++]=i; 00577 } 00578 } 00579 }else if(comp==GREATER_EQUAL){ 00580 for(TID i=0;i<array_size;i++){ 00581 if((*this)[i]>=value){ 00582 result_tids->push_back(i); 00583 //result_table->insert(this->fetchTuple(i)); 00584 //array_tids[pos++]=i; 00585 } 00586 } 00587 }else{ 00588 00589 } 00590 //} 00591 //shrink to actual result size 00592 //result_tids->resize(pos); 00593 return result_tids; 00594 } 00595 00596 template<class T> 00597 const PositionListPtr ColumnBaseTyped<T>::selection(ColumnPtr comparison_column, const ValueComparator comp){ 00598 assert(comparison_column!=NULL); 00599 if(comparison_column->type()!=typeid(T)){ 00600 std::cout << "Fatal Error!!! 
Typemismatch for columns " << this->name_ << " and " << comparison_column->getName() << std::endl; 00601 std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl; 00602 exit(-1); 00603 } 00604 00605 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<T> > column = shared_pointer_namespace::dynamic_pointer_cast<ColumnBaseTyped<T> >(comparison_column); //static_cast<IntColumnPtr>(column1); 00606 assert(column!=NULL); 00607 00608 PositionListPtr result_tids; 00609 00610 result_tids = PositionListPtr(new PositionList()); 00611 //one third rule for selections: assume a selectivity of 0.3, meaning we need roughly 0.3 times of the input to store the result 00612 //this optimizatio nshould minimize the number of reallocations during the insertion process 00613 result_tids->reserve(this->size()); 00614 if(!quiet) std::cout << "Using CPU for Selection..." << std::endl; 00615 unsigned int array_size = this->size(); 00616 //for(TID i=0;i<array_size;i++){ 00617 00618 //boost::any value = column->get(i); 00619 //val = values_[i]; 00620 00621 if(comp==EQUAL){ 00622 for(TID i=0;i<array_size;i++){ 00623 if((*column)[i]==(*this)[i]){ 00624 result_tids->push_back(i); 00625 } 00626 } 00627 }else if(comp==LESSER){ 00628 for(TID i=0;i<array_size;i++){ 00629 if((*this)[i]<(*column)[i]){ 00630 //result_table->insert(this->fetchTuple(i)); 00631 result_tids->push_back(i); 00632 } 00633 } 00634 }else if(comp==LESSER_EQUAL){ 00635 for(TID i=0;i<array_size;i++){ 00636 if((*this)[i]<=(*column)[i]){ 00637 //result_table->insert(this->fetchTuple(i)); 00638 result_tids->push_back(i); 00639 } 00640 } 00641 }else if(comp==GREATER){ 00642 for(TID i=0;i<array_size;i++){ 00643 if((*this)[i]>(*column)[i]){ 00644 result_tids->push_back(i); 00645 //result_table->insert(this->fetchTuple(i)); 00646 } 00647 } 00648 }else if(comp==GREATER_EQUAL){ 00649 for(TID i=0;i<array_size;i++){ 00650 if((*this)[i]>=(*column)[i]){ 00651 result_tids->push_back(i); 00652 
//result_table->insert(this->fetchTuple(i)); 00653 } 00654 } 00655 }else{ 00656 00657 } 00658 //} 00659 00660 return result_tids; 00661 } 00662 00663 00664 template<class T> 00665 const PositionListPairPtr ColumnBaseTyped<T>::hash_join(ColumnPtr join_column_){ 00666 00667 typedef boost::unordered_multimap<T,TID,boost::hash<T>, std::equal_to<T> > HashTable; 00668 00669 if(join_column_->type()!=typeid(T)){ 00670 std::cout << "Fatal Error!!! Typemismatch for columns " << this->name_ << " and " << join_column_->getName() << std::endl; 00671 std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl; 00672 exit(-1); 00673 } 00674 00675 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<T> > join_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<T> >(join_column_); //static_cast<IntColumnPtr>(column1); 00676 00677 PositionListPairPtr join_tids( new PositionListPair()); 00678 join_tids->first = PositionListPtr( new PositionList() ); 00679 join_tids->second = PositionListPtr( new PositionList() ); 00680 00681 00682 Timestamp build_hashtable_begin = getTimestamp(); 00683 //create hash table 00684 HashTable hashtable; 00685 unsigned int hash_table_size=this->size(); 00686 unsigned int join_column_size=join_column->size(); 00687 00688 assert(join_column_->size()>=this->size()); 00689 unsigned int* join_tids_table1 = new unsigned int[join_column_size]; 00690 unsigned int* join_tids_table2 = new unsigned int[join_column_size]; 00691 unsigned int pos1 = 0; 00692 unsigned int pos2 = 0; 00693 ColumnBaseTyped<T>& join_column_ref = dynamic_cast< ColumnBaseTyped<T>& >(*join_column); 00694 00695 for(unsigned int i=0;i<hash_table_size;i++) 00696 hashtable.insert(std::pair<T,TID> ((*this)[i],i)); 00697 Timestamp build_hashtable_end = getTimestamp(); 00698 // std::cout << "Number of Buckets: " << hashtable.bucket_count() << std::endl; 00699 // for(unsigned int i=0;i< hashtable.bucket_count();i++){ 00700 // std::cout << "Size of Bucket '" << i << "': " 
<< hashtable.bucket_size(i) << std::endl; 00701 // } 00702 00703 //probe larger relation 00704 Timestamp prune_hashtable_begin = getTimestamp(); 00705 00706 std::pair<typename HashTable::iterator, typename HashTable::iterator> range; 00707 typename HashTable::iterator it; 00708 for(unsigned int i=0;i<join_column_size;i++){ 00709 range = hashtable.equal_range(join_column_ref[i]); 00710 for(it=range.first ; it!=range.second;++it){ 00711 if(it->first==join_column_ref[i]){ //(*join_column)[i]){ 00712 join_tids_table1[pos1++]=it->second; 00713 join_tids_table2[pos2++]=i; 00714 00715 //pos2=++pos1; 00716 // join_tids_table1[pos1++]=it->second; 00717 // join_tids_table2[pos2++]=i; 00718 // join_tids->first->push_back(it->second); 00719 // join_tids->second->push_back(i); 00720 00721 //cout << "match! " << it->second << ", " << i << " " << it->first << endl; 00722 } 00723 } 00724 } 00725 00726 // for (unsigned int i = 0; i < join_column_size; i++) { 00727 // std::pair<typename HashTable::iterator, typename HashTable::iterator> range = hashtable.equal_range((*join_column)[i]); 00728 // for (typename HashTable::iterator it = range.first; it != range.second; ++it) { 00729 // if (it->first == join_column_ref[i]) { //(*join_column)[i]){ 00730 // join_tids_table1[pos1++] = it->second; 00731 // join_tids_table2[pos2++] = i; 00732 // // join_tids->first->push_back(it->second); 00733 // // join_tids->second->push_back(i); 00734 // 00735 // //cout << "match! 
" << it->second << ", " << i << " " << it->first << endl; 00736 // } 00737 // } 00738 // } 00739 //copy result in PositionList (vector) 00740 join_tids->first->insert(join_tids->first->end(),join_tids_table1,join_tids_table1+pos1); 00741 join_tids->second->insert(join_tids->second->end(),join_tids_table2,join_tids_table2+pos2); 00742 00743 delete join_tids_table1; 00744 delete join_tids_table2; 00745 Timestamp prune_hashtable_end = getTimestamp(); 00746 00747 if(!quiet && verbose) 00748 std::cout << "Hash Join: Build Phase: " << double(build_hashtable_end-build_hashtable_begin)/(1000*1000) << "ms" 00749 << "Pruning Phase: " << double(prune_hashtable_end-prune_hashtable_begin)/(1000*1000) << "ms" << std::endl; 00750 00751 return join_tids; 00752 } 00753 00754 00755 00756 00757 //template <typename T> template< typename U, typename BinaryOperator> 00758 //std::pair<ColumnPtr,ColumnPtr> ColumnBaseTyped<T>::aggregate_by_keys(ColumnBaseTyped<U>* keys, BinaryOperator binary_op) const{ 00759 00760 // /* 00761 // if(keys->type()!=typeid(T)){ 00762 // std::cout << "Fatal Error!!! 
Typemismatch for columns " << this->name_ << " and " << keys->getName() << std::endl; 00763 // std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl; 00764 // exit(-1); 00765 // }*/ 00766 // 00767 // assert(keys->size()==this->size()); 00768 00769 // Column<T>* new_keys = new Column<T>(this->name_,this->getType()); 00770 // Column<T>* aggregated_values = new Column<T>(this->name_,this->getType()); 00771 00772 // T val=0; 00773 // for(unsigned int i=0;i<this->size();i++){ 00774 // 00775 // if(i==0){ 00776 // val=(*keys)[i]; 00777 // continue; 00778 // } 00779 00780 // if( (*keys)[i-1]==(*keys)[i] ){ 00781 // val = binary_op(val,(*this)[i]); 00782 // }else{ 00783 // new_keys->insert((*keys)[i]); 00784 // aggregated_values->insert(val); 00785 // val=(*keys)[i]; 00786 // } 00787 00788 // } 00789 00790 // return std::pair<ColumnPtr,ColumnPtr>(ColumnPtr(new_keys), ColumnPtr(aggregated_values)); 00792 //} 00793 00794 00795 template<class Type> 00796 const PositionListPairPtr ColumnBaseTyped<Type>::sort_merge_join(ColumnPtr join_column_){ 00797 00798 if(join_column_->type()!=typeid(Type)){ 00799 std::cout << "Fatal Error!!! Typemismatch for columns " << this->name_ << " and " << join_column_->getName() << std::endl; 00800 std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl; 00801 exit(-1); 00802 } 00803 00804 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > join_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(join_column_); //static_cast<IntColumnPtr>(column1); 00805 00806 PositionListPairPtr join_tids( new PositionListPair()); 00807 join_tids->first = PositionListPtr( new PositionList() ); 00808 join_tids->second = PositionListPtr( new PositionList() ); 00809 00810 // if(comp_dev==GPU){ 00811 // std::cout << "No GPU Sort Merge Join implemented!" 
<< std::endl; 00812 // exit(-1); 00813 // } 00814 00815 PositionListPtr tids_sorted_join_column1 = this->sort(ASCENDING); //doesnt change column itself, only creates lookup table 00816 PositionListPtr tids_sorted_join_column2 = join_column->sort(ASCENDING); 00817 00818 assert(tids_sorted_join_column1!=NULL); 00819 // assert(tids_sorted_join_column1->first!=NULL); 00820 // assert(tids_sorted_join_column1->second!=NULL); 00821 00822 assert(tids_sorted_join_column2!=NULL); 00823 00824 assert(tids_sorted_join_column1->size()==this->size()); 00825 assert(tids_sorted_join_column2->size()==join_column->size()); 00826 00827 TID i=0; 00828 TID j=0; 00829 bool end=false; 00830 while(!end){ 00831 if(!quiet && verbose && debug){ 00832 std::cout << "index i: " << i << "(max: " << tids_sorted_join_column1->size() 00833 << ") index j:" << j << "(max: " << tids_sorted_join_column2->size() << ")" << std::endl; 00834 } 00835 Type val = (*this)[(*tids_sorted_join_column1)[i]]; 00836 Type val2 = (*(join_column))[(*tids_sorted_join_column2)[j]]; //j); 00837 if(val>val2){ 00838 j++; 00839 if(j>=join_column->size()-1){ 00840 end=true; 00841 00842 } 00843 }else if (val<val2){ 00844 i++; 00845 if(i>=this->size()-1){ 00846 end=true; 00847 } 00848 }else if(val==val2){ 00849 Type tmp_val=val; 00850 unsigned int index=0; 00851 while(tmp_val==val){ 00852 if(i+index==this->size()-1){ index++; break;} 00853 index++; 00854 tmp_val = (*this)[(*tids_sorted_join_column1)[i+index]]; 00855 //tmp_val = any_cast<T>(tmp_value); 00856 } 00857 tmp_val=val2; 00858 unsigned int index2=0; 00859 while(tmp_val==val2){ 00860 00861 if(j+index2==join_column->size()-1){ index2++; break;} 00862 index2++; 00863 tmp_val = (*join_column)[(*tids_sorted_join_column2)[j+index2]]; 00864 //tmp_val = any_cast<T>(tmp_value); 00865 } 00866 for(unsigned int k=i;k<i+index;k++){ 00867 for(unsigned int l=j;l<j+index2;l++){ 00868 // join_tids.push_back(TID_Pair(tids_sorted_join_column1[k],tids_sorted_join_column2[l])); 00869 
join_tids->first->push_back((*tids_sorted_join_column1)[k]); 00870 join_tids->second->push_back((*tids_sorted_join_column2)[l]); 00871 } 00872 } 00873 00874 i=i+index; 00875 j=j+index2; 00876 00877 if(i>=this->size()-1 || j>=join_column->size()-1){ 00878 end=true; 00879 } 00880 00881 } 00882 } 00883 return join_tids; 00884 } 00885 00886 00887 template<class Type> 00888 const PositionListPairPtr ColumnBaseTyped<Type>::nested_loop_join(ColumnPtr join_column_){ 00889 assert(join_column_!=NULL); 00890 if(join_column_->type()!=typeid(Type)){ 00891 std::cout << "Fatal Error!!! Typemismatch for columns " << this->name_ << " and " << join_column_->getName() << std::endl; 00892 std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl; 00893 exit(-1); 00894 } 00895 00896 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > join_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(join_column_); //static_cast<IntColumnPtr>(column1); 00897 00898 PositionListPairPtr join_tids( new PositionListPair()); 00899 join_tids->first = PositionListPtr( new PositionList() ); 00900 join_tids->second = PositionListPtr( new PositionList() ); 00901 00902 00903 unsigned int join_column1_size=this->size(); 00904 unsigned int join_column2_size=join_column->size(); 00905 00906 for(unsigned int i=0;i<join_column1_size;i++){ 00907 for(unsigned int j=0;j<join_column2_size;j++){ 00908 if((*this)[i]==(*join_column)[j]){ 00909 if(debug) std::cout << "MATCH: (" << i << "," << j << ")" << std::endl; 00910 join_tids->first->push_back(i); 00911 join_tids->second->push_back(j); 00912 } 00913 } 00914 } 00915 00916 00917 00918 return join_tids; 00919 } 00920 00921 template<class T> 00922 bool ColumnBaseTyped<T>::operator==(ColumnBaseTyped<T>& column){ 00923 if(this->size()!=column.size()) return false; 00924 for(unsigned int i=0;i<this->size();i++){ 00925 if((*this)[i]!=column[i]){ 00926 return false; 00927 } 00928 } 00929 return true; 00930 } 00931 00932 
template<class Type> 00933 bool ColumnBaseTyped<Type>::add(const boost::any& new_value){ 00934 if(new_value.empty()) return false; 00935 if(typeid(Type)==new_value.type()){ 00936 Type value = boost::any_cast<Type>(new_value); 00937 //std::transform(myvec.begin(), myvec.end(), myvec.begin(), 00938 //bind2nd(std::plus<double>(), 1.0)); 00939 for(unsigned int i=0;i<this->size();i++){ 00940 this->operator[](i)+=value; 00941 } 00942 return true; 00943 } 00944 return false; 00945 } 00946 00947 00948 00949 template<class Type> 00950 bool ColumnBaseTyped<Type>::add(ColumnPtr column){ 00951 //std::transform ( first, first+5, second, results, std::plus<int>() ); 00952 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(column); 00953 if(!column) return false; 00954 for(unsigned int i=0;i<this->size();i++){ 00955 this->operator[](i)+=typed_column->operator[](i); 00956 } 00957 return true; 00958 } 00959 /* 00960 template<class Type> 00961 bool ColumnBaseTyped<Type>::add(ColumnPtr column){ 00962 //std::transform ( first, first+5, second, results, std::plus<int>() ); 00963 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(column); 00964 if(!column) return false; 00965 for(unsigned int i=0;i<this->size();i++){ 00966 this->operator[](i)+=typed_column->operator[](i); 00967 } 00968 return true; 00969 }*/ 00970 00971 template<class Type> 00972 bool ColumnBaseTyped<Type>::minus(const boost::any& new_value){ 00973 //shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(column); 00974 if(new_value.empty()) return false; 00975 if(typeid(Type)==new_value.type()){ 00976 Type value = boost::any_cast<Type>(new_value); 00977 for(unsigned int i=0;i<this->size();i++){ 00978 this->operator[](i)-=value; 00979 } 00980 
return true; 00981 } 00982 return false; 00983 } 00984 00985 template<class Type> 00986 bool ColumnBaseTyped<Type>::minus(ColumnPtr column){ 00987 //std::transform ( first, first+5, second, results, std::plus<int>() ); 00988 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(column); 00989 if(!column) return false; 00990 for(unsigned int i=0;i<this->size();i++){ 00991 this->operator[](i)-=typed_column->operator[](i); 00992 } 00993 return true; 00994 } 00995 00996 00997 template<class Type> 00998 bool ColumnBaseTyped<Type>::multiply(const boost::any& new_value){ 00999 if(new_value.empty()) return false; 01000 if(typeid(Type)==new_value.type()){ 01001 Type value = boost::any_cast<Type>(new_value); 01002 for(unsigned int i=0;i<this->size();i++){ 01003 this->operator[](i)*=value; 01004 } 01005 return true; 01006 } 01007 return false; 01008 } 01009 01010 template<class Type> 01011 bool ColumnBaseTyped<Type>::multiply(ColumnPtr column){ 01012 //std::transform ( first, first+5, second, results, std::plus<int>() ); 01013 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(column); 01014 if(!column) return false; 01015 for(unsigned int i=0;i<this->size();i++){ 01016 this->operator[](i)*=typed_column->operator[](i); 01017 } 01018 return true; 01019 } 01020 01021 01022 01023 template<class Type> 01024 bool ColumnBaseTyped<Type>::division(const boost::any& new_value){ 01025 if(new_value.empty()) return false; 01026 if(typeid(Type)==new_value.type()){ 01027 Type value = boost::any_cast<Type>(new_value); 01028 //check that we do not devide by zero 01029 if(value==0) return false; 01030 for(unsigned int i=0;i<this->size();i++){ 01031 this->operator[](i)/=value; 01032 } 01033 return true; 01034 } 01035 return false; 01036 } 01037 01038 template<class Type> 01039 bool 
ColumnBaseTyped<Type>::division(ColumnPtr column){ 01040 //std::transform ( first, first+5, second, results, std::plus<int>() ); 01041 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(column); 01042 if(!column) return false; 01043 for(unsigned int i=0;i<this->size();i++){ 01044 //if(typed_column->operator[](i)==0) return false; 01045 if(typed_column->operator[](i)==0){ 01046 std::cerr << "Fatal Error! In ColumnBaseTyped<Type>::division: Division by Zero!" << std::endl; 01047 } 01048 assert(typed_column->operator[](i)!=0); 01049 this->operator[](i)/=typed_column->operator[](i); 01050 } 01051 return true; 01052 } 01053 01054 //total tempalte specializations, because numeric computations are undefined on strings 01055 template<> 01056 inline bool ColumnBaseTyped<std::string>::add(const boost::any&){ return false; } 01057 template<> 01058 inline bool ColumnBaseTyped<std::string>::add(ColumnPtr){ return false; } 01059 01060 template<> 01061 inline bool ColumnBaseTyped<std::string>::minus(const boost::any&){ return false; } 01062 template<> 01063 inline bool ColumnBaseTyped<std::string>::minus(ColumnPtr){ return false; } 01064 01065 01066 template<> 01067 inline bool ColumnBaseTyped<std::string>::multiply(const boost::any&){ return false; } 01068 template<> 01069 inline bool ColumnBaseTyped<std::string>::multiply(ColumnPtr){ return false; } 01070 01071 template<> 01072 inline bool ColumnBaseTyped<std::string>::division(const boost::any&){ return false; } 01073 template<> 01074 inline bool ColumnBaseTyped<std::string>::division(ColumnPtr){ return false; } 01075 01076 }; //end namespace CogaDB 01077