Column-oriented GPU-accelerated Database Management System
CoGaDB
/home/sebastian/gpudbms/trunk/cogadb/include/core/column_base_typed.hpp
Go to the documentation of this file.
00001 
00002 #pragma once
00003 
00004 #include <core/base_column.hpp>
00005 #include <iostream>
00006 
00007 #include <utility>
00008 #include <functional>
00009 #include <algorithm>
00010 //#include <vector>
00011 
00012 //#include <unordered_map>
00013 #include <boost/unordered_map.hpp>
00014 #include <boost/any.hpp>
00015 #include <boost/thread.hpp>
00016 #include <boost/bind.hpp>
00017 //TBB includes
00018 #include <tbb/parallel_sort.h>
00019 #include "tbb/parallel_scan.h"
00020 #include <tbb/task_scheduler_init.h>
00021 
00022 #include <gpu/gpu_algorithms.hpp>
00023 
00024 #include <util/time_measurement.hpp>
00025 #include <util/begin_ptr.hpp>
00026 
00027 //#include <core/column.hpp>
00028 
00029 namespace CoGaDB{
00030 
00044 template<class T>
00045 class ColumnBaseTyped : public ColumnBase{
00046         public:
00047         typedef boost::unordered_multimap<T,TID,boost::hash<T>, std::equal_to<T> > HashTable;
00048         //typedef boost::shared_ptr<ColumnBaseTyped> ColumnPtr;
00049         /***************** constructors and destructor *****************/
00050         ColumnBaseTyped(const std::string& name, AttributeType db_type);
00051         virtual ~ColumnBaseTyped();
00052 
00053         virtual bool insert(const boost::any& new_Value)=0;
00054         virtual bool insert(const T& new_Value) = 0;
00055         virtual bool update(TID tid, const boost::any& new_value) = 0;
00056         virtual bool update(PositionListPtr tid, const boost::any& new_value) = 0;      
00057         
00058         virtual bool remove(TID tid)=0;
00059         //assumes tid list is sorted ascending
00060         virtual bool remove(PositionListPtr tid)=0;
00061         virtual bool clearContent()=0;
00062 
00063         virtual const boost::any get(TID tid)=0;
00064         //virtual const boost::any* const getRawData()=0;
00065         virtual void print() const throw()=0;
00066         virtual size_t size() const throw()=0;
00067         virtual unsigned int getSizeinBytes() const throw()=0;
00068 
00069         virtual const ColumnPtr copy() const=0;
00070         virtual const ColumnPtr gather(PositionListPtr tid_list) = 0; 
00071         /***************** relational operations on Columns which return lookup tables *****************/
00072         virtual const PositionListPtr sort(SortOrder order); 
00073         virtual const PositionListPtr selection(const boost::any& value_for_comparison, const ValueComparator comp);
00074         virtual const PositionListPtr selection(ColumnPtr, const ValueComparator comp);   
00075         virtual const PositionListPtr parallel_selection(const boost::any& value_for_comparison, const ValueComparator comp, unsigned int number_of_threads);
00076         virtual const PositionListPtr lock_free_parallel_selection(const boost::any& value_for_comparison, const ValueComparator comp, unsigned int number_of_threads);
00077         //join algorithms
00078         virtual const PositionListPairPtr hash_join(ColumnPtr join_column);
00079         virtual const PositionListPairPtr parallel_hash_join(ColumnPtr join_column,unsigned int number_of_threads);
00080         static void hash_join_pruning_thread(ColumnBaseTyped<T>* join_column, HashTable* hashtable, unsigned int* join_tids_table1, unsigned int* join_tids_table2, unsigned int thread_id, unsigned int number_of_threads, unsigned int* result_size);
00081         virtual const PositionListPairPtr sort_merge_join(ColumnPtr join_column);
00082         virtual const PositionListPairPtr nested_loop_join(ColumnPtr join_column);
00083 
00084         virtual bool add(const boost::any& new_Value);
00085         //vector addition between columns                       
00086         virtual bool add(ColumnPtr join_column);
00087 
00088         virtual bool minus(const boost::any& new_Value);
00089         virtual bool minus(ColumnPtr join_column);      
00090 
00091         virtual bool multiply(const boost::any& new_Value);
00092         virtual bool multiply(ColumnPtr join_column);
00093 
00094         virtual bool division(const boost::any& new_Value);     
00095         virtual bool division(ColumnPtr join_column);   
00096 
00097         //template <typename U, typename BinaryOperator>
00098         //std::pair<ColumnPtr,ColumnPtr> aggregate_by_keys(ColumnBaseTyped<U>* keys, BinaryOperator binary_op) const;
00099 
00100         virtual bool store(const std::string& path) = 0;
00101         virtual bool load(const std::string& path) = 0;
00102         virtual bool isMaterialized() const  throw() = 0;
00103         virtual bool isCompressed() const  throw() = 0;
00104         virtual const ColumnPtr materialize() throw() =0;       
00105         virtual bool is_equal(ColumnPtr column);        
00106         virtual int compareValuesAtIndexes(TID id1, TID id2);   
00108         virtual const std::type_info& type() const throw();
00113         virtual T& operator[](const int index) = 0;
00114         inline bool operator==(ColumnBaseTyped<T>& column);
00115        
00116 };
00117 
00118 
00119         template<class T>
00120         ColumnBaseTyped<T>::ColumnBaseTyped(const std::string& name, AttributeType db_type) : ColumnBase(name,db_type){
00121 
00122         }
00123 
00124         template<class T>
00125         ColumnBaseTyped<T>::~ColumnBaseTyped(){
00126 
00127         }
00128 
00129 template<class T>
00130 const std::type_info& ColumnBaseTyped<T>::type() const throw(){
00131         return typeid(T);
00132 }
00133 
00134         template<class T>
00135         bool ColumnBaseTyped<T>::is_equal(ColumnPtr column)
00136         {
00137                 if(column->type()!=typeid(T)) {
00138                         return false;
00139                         //std::cout << "Fatal Error!!! Typemismatch for columns " << this->name_  << " and " << column->getName() << std::endl;
00140                         //std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl;
00141                         //exit(-1);
00142                 }
00143                 if(column->size()!=this->size())
00144                         return false;
00145                 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<T> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<T> >(column); //static_cast<IntColumnPtr>(column1);
00146                 for(unsigned int i=0; i<this->size(); ++i) {
00147                         if((*this)[i]!=(*typed_column)[i]) {
00148                                 return false;
00149                         }
00150                 }
00151                 return true;
00152         }
00153         
00154         template<class T>
00155         int ColumnBaseTyped<T>::compareValuesAtIndexes(TID id1, TID id2){
00156             if ((*this)[id1] == (*this)[id2]) return 0;
00157             else if ((*this)[id1] < (*this)[id2]) return 1;
00158             else return -1;
00159             /*
00160                 if(column->type()!=typeid(T)) {
00161                         return false;
00162                 }
00163                 assert(id_column<column->size() && id_this_col < this->size());
00164 //                      return false;
00165                 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<T> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<T> >(column); //static_cast<IntColumnPtr>(column1);
00166                 return (*this)[id_this_col]<typed_column[id_column]); */    
00167         }
00168 
00169 
00170         
00171 template<class T>
00172 const PositionListPtr ColumnBaseTyped<T>::sort(SortOrder order){
00173 
00174         PositionListPtr ids = PositionListPtr( new PositionList());
00175                 std::vector<std::pair<T,TID> > v;
00176 
00177                 for(unsigned int i=0;i<this->size();i++){
00178                         v.push_back (std::pair<T,TID>((*this)[i],i) );
00179                 }
00180 
00181                 //TODO: change implementation, so that no copy operations are required -> use boost zip iterators!
00182 
00183                 if(order==ASCENDING){
00184                         //tbb::parallel_sort(v.begin(),v.end(),std::less_equal<std::pair<T,TID> >());
00185                         std::stable_sort(v.begin(),v.end(),std::less_equal<std::pair<T,TID> >());
00186                 }else if(order==DESCENDING){
00187                         //tbb::parallel_sort(v.begin(),v.end(),std::greater_equal<std::pair<T,TID> >());
00188                         std::stable_sort(v.begin(),v.end(),std::greater_equal<std::pair<T,TID> >()); 
00189                 }else{
00190                         std::cout << "FATAL ERROR: ColumnBaseTyped<T>::sort(): Unknown Sorting Order!" << std::endl;
00191                 }
00192 
00193                 for(unsigned int i=0;i<v.size();i++){
00194                         ids->push_back(v[i].second);
00195                 }
00196 
00197                 return ids;
00198 }
00199 
00200 /*
00201                 template<class T>
00202                 struct Column_Scan_Equal : public unary_function<T,bool> {
00203                         Column_Scan_Equal(std::vector<TID>& local_result_tids, const T& comparison_value) : local_result_tids_(local_result_tids), comparison_value_(comparison_value){}
00204                         bool operator() (T& value) {
00205                                 if(value==comparison_value_){
00206                                         local_result_tids_.push_back();
00207                                         return true;
00208                                 }
00209                                 return false;
00210                         }
00211                         std::vector<TID>& local_result_tids_;
00212                         T comparison_value_;
00213                 };*/
00214 
00215 template<class T>
00216 void selection_thread(unsigned int thread_id, unsigned int number_of_threads, const T& value, const ValueComparator comp, ColumnBaseTyped<T>* col, PositionListPtr result_tids){
00217         //std::cout << "Hi I'm thread" << thread_id << std::endl;
00218         if(!quiet) std::cout << "Using CPU for Selection (parallel mode)..." << std::endl;
00219                 unsigned int array_size=col->size();
00220                 for(TID i=thread_id;i<array_size;i+=number_of_threads){
00221                         
00222                         //boost::any value = column->get(i);
00223                         //val = values_[i];
00224                                 
00225                         if(comp==EQUAL){
00226                             if(value==(*col)[i]){
00227                                     //result_table->insert(this->fetchTuple(i));
00228                                     //std::cout << "MATCH " << thread_id << std::endl;
00229                                     result_tids->push_back(i);
00230                             }
00231                         }else if(comp==LESSER){
00232                             if((*col)[i]<value){
00233                                     //result_table->insert(this->fetchTuple(i));
00234                                     //std::cout << "MATCH " << thread_id << std::endl;
00235                                     result_tids->push_back(i);
00236                             }
00237                         }else if(comp==LESSER_EQUAL){
00238                             if((*col)[i]<=value){
00239                                     //result_table->insert(this->fetchTuple(i));
00240                                     //std::cout << "MATCH " << thread_id << std::endl;
00241                                     result_tids->push_back(i);
00242                             }
00243                         }else if(comp==GREATER){
00244                             if((*col)[i]>value){
00245                                     //std::cout << "MATCH " << thread_id << std::endl;
00246                                     result_tids->push_back(i);
00247                                     //result_table->insert(this->fetchTuple(i));
00248                             }
00249                         }else if(comp==GREATER_EQUAL){
00250                             if((*col)[i]>=value){
00251                                     //std::cout << "MATCH " << thread_id << std::endl;
00252                                     result_tids->push_back(i);
00253                                     //result_table->insert(this->fetchTuple(i));
00254                             }
00255                         }else{
00256 
00257                         }
00258                 }       
00259 }
00260 
00261 template<class T>
00262 const PositionListPtr ColumnBaseTyped<T>::parallel_selection(const boost::any& value_for_comparison, const ValueComparator comp, unsigned int number_of_threads){
00263 
00264                 PositionListPtr result_tids( new PositionList());
00265                 //unsigned int number_of_threads=4;
00266 
00267                 if(value_for_comparison.type()!=typeid(T)){
00268                         std::cout << "Fatal Error!!! Typemismatch for column " << name_ << std::endl;
00269                         std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl;
00270                         exit(-1);
00271                 }
00272                         
00273                 T value = boost::any_cast<T>(value_for_comparison);
00274 
00275                 boost::thread_group threads;
00276 
00277                 std::vector<PositionListPtr> local_result_arrays;
00278                 for(unsigned int i=0;i<number_of_threads;i++){
00279                         local_result_arrays.push_back(PositionListPtr(new std::vector<TID>()));
00280                         threads.add_thread(new boost::thread(boost::bind(&CoGaDB::selection_thread<T>, i, number_of_threads, value,  comp, this,local_result_arrays[i])));
00281                 }
00282                 threads.join_all();
00283                 
00284                 /*
00285                 unsigned int result_size=0;
00286                 for(unsigned int i=0;i<number_of_threads;i++){
00287                 //      std::cout << "Local result size of thread " << i << ":" << local_result_arrays[i]->size() << std::endl;
00288                         result_size+=local_result_arrays[i]->size();
00289                 }
00290                 result_tids = new PositionList(local_result_arrays[0]->begin(),local_result_arrays[0]->end()); //->resize(result_size);
00291                 for(unsigned int i=1;i<number_of_threads;i++){
00292                         
00293                         std::merge (result_tids.begin(),result_tids.end(),local_result_arrays[i]->begin(),local_result_arrays[i]->end(),result_tids->begin());
00294                 }         */       
00295                 
00296                 
00297                 
00298                 for(unsigned int i=0;i<number_of_threads;i++){
00299                 //      std::cout << "Local result size of thread " << i << ":" << local_result_arrays[i]->size() << std::endl;
00300                         result_tids->insert(result_tids->end(),local_result_arrays[i]->begin(),local_result_arrays[i]->end());
00301                 }
00302                 //std::cout << "number of result tids:" << result_tids->size() << std::endl;
00303                 tbb::parallel_sort(result_tids->begin(),result_tids->end());
00304                 //exit(-1);
00305                 return result_tids;
00306 }
00307 
00308     typedef std::vector<int> Flags;
00309     typedef boost::shared_ptr<Flags> FlagsPtr;
00310 
00311     
00312     class TBB_Body_PrefixSum{
00313     public:
00314         TBB_Body_PrefixSum (std::vector<int>* y_, const std::vector<int>* x_) : sum(0), x(x_), y(y_) {}
00315         int get_sum() const {return sum;}
00316         template<typename Tag>
00317         void operator()( const tbb::blocked_range<int>& r, Tag ) {
00318             int temp = sum;
00319             for( int i=r.begin(); i<r.end(); ++i ) {
00320                 temp = temp + (*x)[i];
00321                 if(Tag::is_final_scan() )
00322                     (*y)[i] = temp;
00323             }
00324             sum = temp;
00325         }
00326         TBB_Body_PrefixSum ( TBB_Body_PrefixSum & b, tbb::split ) : x(b.x), y(b.y), sum(0) {}
00327         void reverse_join( TBB_Body_PrefixSum & a ) { sum = a.sum + sum;}
00328         void assign( TBB_Body_PrefixSum & b ) {sum = b.sum;}
00329     private:
00330         int sum;
00331         std::vector<int>* y;
00332         const std::vector<int>* x;
00333     };
00334     
00335     inline int TBB_Prefix_Sum(std::vector<int>& y, const std::vector<int>& x, unsigned int number_of_threads) {
00336         assert(y.size()==x.size());
00337         unsigned int chunk_size=x.size()/number_of_threads;
00338         TBB_Body_PrefixSum  body(&y,&x);
00339         tbb::parallel_scan( tbb::blocked_range<int>(0,x.size(),chunk_size), body );
00340         return body.get_sum();
00341     }
00342 
00343     
00344     
00345     
00346 template <typename T>
00347 void selection_thread_set_flag_array(ColumnBaseTyped<T>* col, unsigned int thread_id, FlagsPtr flags, unsigned int number_of_threads, const T& value_for_comparison, const ValueComparator comp){
00348         assert(flags->size()==col->size());
00349         if(!quiet) std::cout << "Using CPU for Selection (parallel mode)..." << std::endl;
00350         unsigned int array_size=col->size();
00351         unsigned int chunk_size=col->size()/number_of_threads;
00352         unsigned int start_id=thread_id*chunk_size;
00353         unsigned int end_id=(thread_id*chunk_size)+chunk_size; 
00354         //make sure that the last thread processes the rest of the array
00355         if(thread_id+1==number_of_threads) end_id=array_size;
00356         if(comp==EQUAL){
00357             for(TID i=start_id;i<end_id;i++){
00358                 if(value_for_comparison==(*col)[i]){
00359                     (*flags)[i]=1;
00360                 }
00361             }
00362         }else if(comp==LESSER){
00363             for(TID i=start_id;i<end_id;i++){
00364                 if((*col)[i]<value_for_comparison){
00365                     (*flags)[i]=1;
00366                 }
00367             }
00368         }else if(comp==LESSER_EQUAL){
00369             for(TID i=start_id;i<end_id;i++){
00370                 if((*col)[i]<=value_for_comparison){
00371                     (*flags)[i]=1;
00372                 }
00373             }
00374         }else if(comp==GREATER){
00375             for(TID i=start_id;i<end_id;i++){
00376                 if((*col)[i]>value_for_comparison){
00377                      (*flags)[i]=1;
00378                 }
00379             }
00380         }else if(comp==GREATER_EQUAL){
00381             for(TID i=start_id;i<end_id;i++){
00382                 if((*col)[i]>=value_for_comparison){
00383                      (*flags)[i]=1;
00384                 }
00385             }            
00386         }else{
00387             std::cerr << "FATAL Error! In CoGaDB::selection_thread_set_flag_array(): Invalid Value Comparator! " << comp << std::endl;
00388         }
00389         
00390 //      unsigned int array_size=col->size();   
00391 //        if(comp==EQUAL){
00392 //            for(TID i=thread_id;i<array_size;i+=number_of_threads){
00393 //                if(value_for_comparison==(*col)[i]){
00394 //                    (*flags)[i]=1;
00395 //                }
00396 //            }
00397 //        }else if(comp==LESSER){
00398 //            for(TID i=thread_id;i<array_size;i+=number_of_threads){
00399 //                if((*col)[i]<value_for_comparison){
00400 //                    (*flags)[i]=1;
00401 //                }
00402 //            }
00403 //        }else if(comp==GREATER){
00404 //            for(TID i=thread_id;i<array_size;i+=number_of_threads){
00405 //                if((*col)[i]>value_for_comparison){
00406 //                     (*flags)[i]=1;
00407 //                }
00408 //            }
00409 //        }else{
00410 //            std::cerr << "FATAL Error! In CoGaDB::selection_thread_set_flag_array(): Invalid Value Comparator! " << comp << std::endl;
00411 //        }
00412                 
00413 
00414 }
00415 
00416 template <typename T>
00417 void selection_thread_write_result_to_output_array(ColumnBaseTyped<T>* col, unsigned int thread_id, FlagsPtr flags, std::vector<int>* prefix_sum_array, PositionListPtr result_tids, unsigned int number_of_threads){
00418         assert(flags->size()==col->size());
00419         assert(flags->size()==prefix_sum_array->size());
00420         unsigned int array_size=col->size();
00421         unsigned int chunk_size=col->size()/number_of_threads;
00422         unsigned int start_id=thread_id*chunk_size;
00423         unsigned int end_id=(thread_id*chunk_size)+chunk_size; 
00424         //make sure that the last thread processes the rest of the array
00425         if(thread_id+1==number_of_threads) end_id=array_size;
00426         for(TID i=start_id;i<end_id;i++){
00427             if((*flags)[i]==1){
00428                 unsigned int write_id=(*prefix_sum_array)[i]-1;
00429                 (*result_tids)[write_id]=i; //write matching TID to output buffer
00430             }
00431         }
00432 
00433         //if(!quiet) std::cout << "Using CPU for Selection (parallel mode)..." << std::endl;
00434 //              unsigned int array_size=col->size();
00435 //              for(TID i=thread_id;i<array_size;i+=number_of_threads){
00436 //                    if((*flags)[i]==1){
00437 //                        unsigned int write_id=(*prefix_sum_array)[i]-1;
00438 //                        (*result_tids)[write_id]=i; //write matching TID to output buffer
00439 //                    }
00440 //                }
00441 
00442 }
00443 
00444 
00445 template<class T>
00446 const PositionListPtr ColumnBaseTyped<T>::lock_free_parallel_selection(const boost::any& value_for_comparison, const ValueComparator comp, unsigned int number_of_threads){
00447 
00448                 
00449                 //unsigned int number_of_threads=4;
00450 
00451                 if(value_for_comparison.type()!=typeid(T)){
00452                         std::cout << "Fatal Error!!! Typemismatch for column " << name_ << std::endl;
00453                         std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl;
00454                         exit(-1);
00455                 }
00456                         
00457                 T value = boost::any_cast<T>(value_for_comparison);
00458 
00459                 boost::thread_group threads;
00460 
00461                 //create flag array of column size and init with zeros 
00462                 FlagsPtr flags(new Flags(this->size(),0)); 
00463                 //std::vector<PositionListPtr> local_result_arrays;
00464                 for(unsigned int i=0;i<number_of_threads;i++){
00465                         threads.add_thread(new boost::thread(boost::bind(&CoGaDB::selection_thread_set_flag_array<T>, this, i, flags, number_of_threads, value,  comp)));
00466                 }
00467                 threads.join_all();
00468                 
00469                 if(!quiet && verbose && debug)
00470                 {
00471                     std::cout << "FLAG Array:" << std::endl;
00472                     for(unsigned int i=0;i<flags->size();++i){
00473                         std::cout << (*flags)[i] << std::endl;
00474                     }
00475                 }
00476 
00477                 std::vector<int> prefix_sum(this->size(),0);
00478                 
00479                 //do prefix sum on threads
00480                 TBB_Prefix_Sum(prefix_sum,*flags,number_of_threads);
00481                 
00482                 if(!quiet && verbose && debug)
00483                 {
00484                     std::cout << "Prefix Sum:" << std::endl;
00485                     for(unsigned int i=0;i<prefix_sum.size();++i){
00486                         std::cout << prefix_sum[i] << std::endl;
00487                     }
00488                 }
00489                 unsigned int resul_size=prefix_sum.back();
00490                 PositionListPtr result_tids( new PositionList(resul_size));
00491                 
00492                 for(unsigned int i=0;i<number_of_threads;i++){
00493                         threads.add_thread(new boost::thread(boost::bind(&CoGaDB::selection_thread_write_result_to_output_array<T>, this, i, flags, &prefix_sum, result_tids, number_of_threads)));
00494                 }
00495                 threads.join_all();
00496                 
00497                 if(!quiet && verbose && debug)
00498                 {
00499                     std::cout << "TIDS:" << std::endl;
00500                     for(unsigned int i=0;i<result_tids->size();++i){
00501                         std::cout << (*result_tids)[i] << std::endl;
00502                     }
00503                 }
00504                 
00505                 return result_tids;
00506 }
00507 
00508 
00509 
00510 template<class T>
00511 const PositionListPtr ColumnBaseTyped<T>::selection(const boost::any& value_for_comparison, const ValueComparator comp){
00512     if(value_for_comparison.type()!=typeid(T)){
00513             std::cout << "Fatal Error!!! Typemismatch for column " << name_ << std::endl;
00514             std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl;
00515             exit(-1);
00516     }
00517 
00518     T value = boost::any_cast<T>(value_for_comparison);
00519     PositionListPtr result_tids;
00520 
00521     result_tids = PositionListPtr(new PositionList());
00522     //one third rule for selections: assume a selectivity of 0.3, meaning we need roughly 0.3 times of the input to store the result 
00523     //this optimizatio nshould minimize the number of reallocations during the insertion process
00524     //result_tids->reserve(0.3*this->size()); 
00525     
00526     //std::cout << "Scan column of size " << this->size() << std::endl; 
00527     unsigned int array_size=this->size();
00528 //    //calls new internally   
00529 //     result_tids->resize(array_size);
00530 //     //tids.reserve(array_size);
00531 //     //get pointer
00532 //     unsigned int* array_tids=hype::util::begin_ptr(*result_tids);
00533 //     assert(array_tids!=NULL);
00534 //     unsigned int pos=0;
00535     if(!quiet) std::cout << "Using CPU for Selection..." << std::endl;
00536     //unsigned int array_size = this->size();
00537     //for(TID i=0;i<array_size;i++){
00538 
00539             //boost::any value = column->get(i);
00540             //val = values_[i];
00541 
00542 //        unsigned int pos =0;
00543 //        for(TID i=0;i<array_size;i++){
00544 //                    result_tids[pos]=i;
00545 //                    pos+=(value==(*this)[i]);
00546 //        } 
00547     
00548     if(comp==EQUAL){
00549         for(TID i=0;i<array_size;i++){
00550             if(value==(*this)[i]){
00551                     //array_tids[pos++]=i;
00552                     result_tids->push_back(i);
00553             }
00554         }
00555     }else if(comp==LESSER){
00556         for(TID i=0;i<array_size;i++){
00557             if((*this)[i]<value){
00558                     //result_table->insert(this->fetchTuple(i));
00559                     result_tids->push_back(i);
00560                     //array_tids[pos++]=i;
00561             }
00562         }
00563     }else if(comp==LESSER_EQUAL){
00564         for(TID i=0;i<array_size;i++){
00565             if((*this)[i]<=value){
00566                     result_tids->push_back(i);
00567                     //result_table->insert(this->fetchTuple(i));
00568                     //array_tids[pos++]=i;
00569             }
00570         } 
00571     }else if(comp==GREATER){
00572         for(TID i=0;i<array_size;i++){
00573             if((*this)[i]>value){
00574                     result_tids->push_back(i);
00575                     //result_table->insert(this->fetchTuple(i));
00576                     //array_tids[pos++]=i;
00577             }
00578         }
00579     }else if(comp==GREATER_EQUAL){
00580         for(TID i=0;i<array_size;i++){
00581             if((*this)[i]>=value){
00582                     result_tids->push_back(i);
00583                     //result_table->insert(this->fetchTuple(i));
00584                     //array_tids[pos++]=i;
00585             }
00586         }          
00587     }else{
00588 
00589     }
00590     //}
00591     //shrink to actual result size
00592     //result_tids->resize(pos);
00593     return result_tids;
00594 }
00595 
00596 template<class T>
00597 const PositionListPtr ColumnBaseTyped<T>::selection(ColumnPtr comparison_column, const ValueComparator comp){
00598     assert(comparison_column!=NULL);
00599     if(comparison_column->type()!=typeid(T)){
00600             std::cout << "Fatal Error!!! Typemismatch for columns " << this->name_  << " and " << comparison_column->getName() << std::endl;
00601             std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl;
00602             exit(-1);
00603     }
00604 
00605     shared_pointer_namespace::shared_ptr<ColumnBaseTyped<T> > column = shared_pointer_namespace::dynamic_pointer_cast<ColumnBaseTyped<T> >(comparison_column); //static_cast<IntColumnPtr>(column1);
00606     assert(column!=NULL);
00607     
00608     PositionListPtr result_tids;
00609 
00610     result_tids = PositionListPtr(new PositionList());
00611     //one third rule for selections: assume a selectivity of 0.3, meaning we need roughly 0.3 times of the input to store the result 
00612     //this optimizatio nshould minimize the number of reallocations during the insertion process
00613     result_tids->reserve(this->size()); 
00614     if(!quiet) std::cout << "Using CPU for Selection..." << std::endl;
00615     unsigned int array_size = this->size();
00616     //for(TID i=0;i<array_size;i++){
00617 
00618             //boost::any value = column->get(i);
00619             //val = values_[i];
00620 
00621     if(comp==EQUAL){
00622         for(TID i=0;i<array_size;i++){
00623             if((*column)[i]==(*this)[i]){
00624                     result_tids->push_back(i);
00625             }
00626         }
00627     }else if(comp==LESSER){
00628         for(TID i=0;i<array_size;i++){
00629             if((*this)[i]<(*column)[i]){
00630                     //result_table->insert(this->fetchTuple(i));
00631                     result_tids->push_back(i);
00632             }
00633         }
00634     }else if(comp==LESSER_EQUAL){
00635         for(TID i=0;i<array_size;i++){
00636             if((*this)[i]<=(*column)[i]){
00637                     //result_table->insert(this->fetchTuple(i));
00638                     result_tids->push_back(i);
00639             }
00640         }
00641     }else if(comp==GREATER){
00642         for(TID i=0;i<array_size;i++){
00643             if((*this)[i]>(*column)[i]){
00644                     result_tids->push_back(i);
00645                     //result_table->insert(this->fetchTuple(i));
00646             }
00647         }
00648     }else if(comp==GREATER_EQUAL){
00649         for(TID i=0;i<array_size;i++){
00650             if((*this)[i]>=(*column)[i]){
00651                     result_tids->push_back(i);
00652                     //result_table->insert(this->fetchTuple(i));
00653             }
00654         }
00655     }else{
00656 
00657     }
00658     //} 
00659 
00660     return result_tids;
00661 }
00662 
00663         
00664         template<class T>
00665         const PositionListPairPtr ColumnBaseTyped<T>::hash_join(ColumnPtr join_column_){
00666 
00667                 typedef boost::unordered_multimap<T,TID,boost::hash<T>, std::equal_to<T> > HashTable;
00668 
00669                                 if(join_column_->type()!=typeid(T)){
00670                                         std::cout << "Fatal Error!!! Typemismatch for columns " << this->name_  << " and " << join_column_->getName() << std::endl;
00671                                         std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl;
00672                                         exit(-1);
00673                                 }
00674                                 
00675                                 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<T> > join_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<T> >(join_column_); //static_cast<IntColumnPtr>(column1);
00676 
00677                                 PositionListPairPtr join_tids( new PositionListPair());
00678                                 join_tids->first = PositionListPtr( new PositionList() );
00679                                 join_tids->second = PositionListPtr( new PositionList() );
00680 
00681                                 
00682         Timestamp build_hashtable_begin = getTimestamp();
00683         //create hash table
00684         HashTable hashtable;
00685         unsigned int hash_table_size=this->size();
00686         unsigned int join_column_size=join_column->size();
00687 
00688         assert(join_column_->size()>=this->size());                        
00689         unsigned int* join_tids_table1 =  new unsigned int[join_column_size];   
00690         unsigned int* join_tids_table2 =  new unsigned int[join_column_size]; 
00691         unsigned int pos1 = 0;
00692         unsigned int pos2 = 0;        
00693         ColumnBaseTyped<T>& join_column_ref = dynamic_cast< ColumnBaseTyped<T>& >(*join_column);
00694         
00695         for(unsigned int i=0;i<hash_table_size;i++)     
00696                 hashtable.insert(std::pair<T,TID> ((*this)[i],i));
00697         Timestamp build_hashtable_end = getTimestamp();
00698 //        std::cout << "Number of Buckets: " << hashtable.bucket_count() << std::endl;
00699 //        for(unsigned int i=0;i< hashtable.bucket_count();i++){
00700 //            std::cout << "Size of Bucket '" << i << "': " << hashtable.bucket_size(i) << std::endl;
00701 //        }
00702         
00703         //probe larger relation
00704         Timestamp prune_hashtable_begin = getTimestamp();
00705 
00706         std::pair<typename HashTable::iterator, typename HashTable::iterator> range;
00707         typename HashTable::iterator it;
00708         for(unsigned int i=0;i<join_column_size;i++){
00709                 range =  hashtable.equal_range(join_column_ref[i]);
00710                 for(it=range.first ; it!=range.second;++it){
00711                         if(it->first==join_column_ref[i]){ //(*join_column)[i]){
00712                                   join_tids_table1[pos1++]=it->second;
00713                                   join_tids_table2[pos2++]=i;
00714                                   
00715                                   //pos2=++pos1;
00716 //                                join_tids_table1[pos1++]=it->second;
00717 //                                join_tids_table2[pos2++]=i;
00718 //                              join_tids->first->push_back(it->second);
00719 //                              join_tids->second->push_back(i);
00720                                 
00721                                 //cout << "match! " << it->second << ", " << i << "     "  << it->first << endl;
00722                         }
00723                 }
00724         }
00725         
00726 //        for (unsigned int i = 0; i < join_column_size; i++) {
00727 //            std::pair<typename HashTable::iterator, typename HashTable::iterator> range = hashtable.equal_range((*join_column)[i]);
00728 //            for (typename HashTable::iterator it = range.first; it != range.second; ++it) {
00729 //                if (it->first == join_column_ref[i]) { //(*join_column)[i]){
00730 //                    join_tids_table1[pos1++] = it->second;
00731 //                    join_tids_table2[pos2++] = i;
00732 //                    //                                join_tids->first->push_back(it->second);
00733 //                    //                                join_tids->second->push_back(i);
00734 //
00735 //                    //cout << "match! " << it->second << ", " << i << "       "  << it->first << endl;
00736 //                }
00737 //            }
00738 //        }
00739         //copy result in PositionList (vector)
00740         join_tids->first->insert(join_tids->first->end(),join_tids_table1,join_tids_table1+pos1);
00741         join_tids->second->insert(join_tids->second->end(),join_tids_table2,join_tids_table2+pos2);
00742         
00743         delete join_tids_table1;
00744         delete join_tids_table2;
00745         Timestamp prune_hashtable_end = getTimestamp();
00746 
00747         if(!quiet && verbose)
00748                 std::cout << "Hash Join: Build Phase: " << double(build_hashtable_end-build_hashtable_begin)/(1000*1000) << "ms"
00749                           << "Pruning Phase: "  << double(prune_hashtable_end-prune_hashtable_begin)/(1000*1000) << "ms" << std::endl; 
00750         
00751                 return join_tids;
00752         }
00753 
00754 
00755         
00756         
00757 //template <typename T> template< typename U, typename BinaryOperator>
00758 //std::pair<ColumnPtr,ColumnPtr> ColumnBaseTyped<T>::aggregate_by_keys(ColumnBaseTyped<U>* keys, BinaryOperator binary_op) const{
00759 
00760 //                              /*
00761 //                              if(keys->type()!=typeid(T)){
00762 //                                      std::cout << "Fatal Error!!! Typemismatch for columns " << this->name_  << " and " << keys->getName() << std::endl;
00763 //                                      std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl;
00764 //                                      exit(-1);
00765 //                              }*/
00766 // 
00767 //                              assert(keys->size()==this->size());
00768 
00769 // Column<T>* new_keys = new Column<T>(this->name_,this->getType());
00770 // Column<T>* aggregated_values = new Column<T>(this->name_,this->getType()); 
00771 
00772 // T val=0;
00773 // for(unsigned int i=0;i<this->size();i++){
00774 //   
00775 //  if(i==0){
00776 //   val=(*keys)[i];
00777 //   continue;
00778 //  }
00779 
00780 // if(  (*keys)[i-1]==(*keys)[i] ){
00781 //   val = binary_op(val,(*this)[i]);  
00782 // }else{
00783 //  new_keys->insert((*keys)[i]);
00784 //  aggregated_values->insert(val);
00785 //  val=(*keys)[i];
00786 // }
00787 
00788 // }
00789 
00790 // return std::pair<ColumnPtr,ColumnPtr>(ColumnPtr(new_keys), ColumnPtr(aggregated_values));
00792 //}
00793 
00794 
00795         template<class Type>
00796         const PositionListPairPtr ColumnBaseTyped<Type>::sort_merge_join(ColumnPtr join_column_){
00797 
00798                                 if(join_column_->type()!=typeid(Type)){
00799                                         std::cout << "Fatal Error!!! Typemismatch for columns " << this->name_  << " and " << join_column_->getName() << std::endl;
00800                                         std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl;
00801                                         exit(-1);
00802                                 }
00803                                 
00804                                 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > join_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(join_column_); //static_cast<IntColumnPtr>(column1);
00805 
00806                                 PositionListPairPtr join_tids( new PositionListPair());
00807                                 join_tids->first = PositionListPtr( new PositionList() );
00808                                 join_tids->second = PositionListPtr( new PositionList() );
00809 
00810 //                              if(comp_dev==GPU){
00811 //                                      std::cout << "No GPU Sort Merge Join implemented!" << std::endl;
00812 //                                      exit(-1);
00813 //                              }
00814 
00815                                 PositionListPtr tids_sorted_join_column1 = this->sort(ASCENDING); //doesnt change column itself, only creates lookup table
00816                                 PositionListPtr tids_sorted_join_column2 = join_column->sort(ASCENDING);
00817 
00818                                 assert(tids_sorted_join_column1!=NULL);
00819 //                              assert(tids_sorted_join_column1->first!=NULL);
00820 //                              assert(tids_sorted_join_column1->second!=NULL);
00821 
00822                                 assert(tids_sorted_join_column2!=NULL);
00823 
00824                                 assert(tids_sorted_join_column1->size()==this->size());
00825                                 assert(tids_sorted_join_column2->size()==join_column->size());
00826 
00827                                 TID i=0;
00828                                 TID j=0;
00829                                 bool end=false;
00830                                 while(!end){
00831                                                 if(!quiet && verbose && debug){
00832                                                         std::cout << "index i: " << i << "(max: " << tids_sorted_join_column1->size() 
00833                                                              << ") index j:" << j << "(max: " << tids_sorted_join_column2->size() << ")" << std::endl;
00834                                                 }
00835                                                 Type val = (*this)[(*tids_sorted_join_column1)[i]];
00836                                                 Type val2 = (*(join_column))[(*tids_sorted_join_column2)[j]]; //j);     
00837                                                 if(val>val2){
00838                                                         j++;
00839                                                         if(j>=join_column->size()-1){
00840                                                           end=true;
00841                                                                 
00842                                                         }
00843                                                 }else if (val<val2){
00844                                                         i++;
00845                                                         if(i>=this->size()-1){
00846                                                            end=true;
00847                                                         }
00848                                                 }else if(val==val2){
00849                                                                 Type tmp_val=val;
00850                                                                 unsigned int index=0;
00851                                                                 while(tmp_val==val){
00852                                                                         if(i+index==this->size()-1){ index++; break;} 
00853                                                                         index++;
00854                                                                         tmp_val = (*this)[(*tids_sorted_join_column1)[i+index]];
00855                                                                         //tmp_val = any_cast<T>(tmp_value);
00856                                                                 }
00857                                                                 tmp_val=val2;
00858                                                                 unsigned int index2=0;
00859                                                                 while(tmp_val==val2){
00860                                 
00861                                                                         if(j+index2==join_column->size()-1){ index2++; break;} 
00862                                                                         index2++;
00863                                                                         tmp_val = (*join_column)[(*tids_sorted_join_column2)[j+index2]];
00864                                                                         //tmp_val = any_cast<T>(tmp_value);
00865                                                                 }                                                               
00866                                                                 for(unsigned int k=i;k<i+index;k++){
00867                                                                         for(unsigned int l=j;l<j+index2;l++){
00868 //                                                                              join_tids.push_back(TID_Pair(tids_sorted_join_column1[k],tids_sorted_join_column2[l]));
00869                                                                                 join_tids->first->push_back((*tids_sorted_join_column1)[k]);
00870                                                                                 join_tids->second->push_back((*tids_sorted_join_column2)[l]);
00871                                                                         }
00872                                                                 }
00873 
00874                                                                 i=i+index;
00875                                                                 j=j+index2;
00876 
00877                                                                 if(i>=this->size()-1 || j>=join_column->size()-1){
00878                                                                         end=true;
00879                                                                 }
00880 
00881                                                 }
00882                                 }
00883                                 return join_tids;
00884         }
00885 
00886 
00887         template<class Type>
00888         const PositionListPairPtr ColumnBaseTyped<Type>::nested_loop_join(ColumnPtr join_column_){
00889                         assert(join_column_!=NULL);
00890                         if(join_column_->type()!=typeid(Type)){
00891                                 std::cout << "Fatal Error!!! Typemismatch for columns " << this->name_  << " and " << join_column_->getName() << std::endl;
00892                                 std::cout << "File: " << __FILE__ << " Line: " << __LINE__ << std::endl;
00893                                 exit(-1);
00894                         }
00895                         
00896                         shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > join_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(join_column_); //static_cast<IntColumnPtr>(column1);
00897 
00898                         PositionListPairPtr join_tids( new PositionListPair());
00899                         join_tids->first = PositionListPtr( new PositionList() );
00900                         join_tids->second = PositionListPtr( new PositionList() );
00901 
00902 
00903                         unsigned int join_column1_size=this->size();
00904                         unsigned int join_column2_size=join_column->size();     
00905 
00906                         for(unsigned int i=0;i<join_column1_size;i++){
00907                                 for(unsigned int j=0;j<join_column2_size;j++){
00908                                         if((*this)[i]==(*join_column)[j]){
00909                                                 if(debug) std::cout << "MATCH: (" << i << "," << j << ")" << std::endl;
00910                                                 join_tids->first->push_back(i);
00911                                                 join_tids->second->push_back(j);
00912                                         }
00913                                 }
00914                         }
00915 
00916                 
00917 
00918                 return join_tids;
00919         }
00920 
00921         template<class T>
00922         bool ColumnBaseTyped<T>::operator==(ColumnBaseTyped<T>& column){
00923                 if(this->size()!=column.size()) return false;
00924                 for(unsigned int i=0;i<this->size();i++){
00925                         if((*this)[i]!=column[i]){      
00926                                 return false;
00927                         }
00928                 }
00929                 return true;
00930         }
00931 
00932         template<class Type>
00933         bool ColumnBaseTyped<Type>::add(const boost::any& new_value){
00934                 if(new_value.empty()) return false;
00935                 if(typeid(Type)==new_value.type()){
00936                          Type value = boost::any_cast<Type>(new_value);
00937                          //std::transform(myvec.begin(), myvec.end(), myvec.begin(),
00938           //bind2nd(std::plus<double>(), 1.0));
00939                          for(unsigned int i=0;i<this->size();i++){
00940                                         this->operator[](i)+=value;
00941                          }
00942                          return true;
00943                 }
00944                 return false;
00945         }
00946         
00947 
00948                         
00949         template<class Type>
00950         bool ColumnBaseTyped<Type>::add(ColumnPtr column){
00951                 //std::transform ( first, first+5, second, results, std::plus<int>() );         
00952                 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(column);
00953                 if(!column) return false;
00954                 for(unsigned int i=0;i<this->size();i++){
00955                         this->operator[](i)+=typed_column->operator[](i);
00956                 }                       
00957                 return true;
00958         }
00959 /*
00960         template<class Type>
00961         bool ColumnBaseTyped<Type>::add(ColumnPtr column){
00962                 //std::transform ( first, first+5, second, results, std::plus<int>() );         
00963                 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(column);
00964                 if(!column) return false;
00965                 for(unsigned int i=0;i<this->size();i++){
00966                         this->operator[](i)+=typed_column->operator[](i);
00967                 }                       
00968                 return true;
00969         }*/
00970 
00971         template<class Type>
00972         bool ColumnBaseTyped<Type>::minus(const boost::any& new_value){
00973                 //shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(column);    
00974                 if(new_value.empty()) return false;
00975                 if(typeid(Type)==new_value.type()){
00976                          Type value = boost::any_cast<Type>(new_value);
00977                          for(unsigned int i=0;i<this->size();i++){
00978                                         this->operator[](i)-=value;
00979                          }
00980                          return true;
00981                 }
00982                 return false;
00983         }
00984         
00985         template<class Type>
00986         bool ColumnBaseTyped<Type>::minus(ColumnPtr column){
00987                 //std::transform ( first, first+5, second, results, std::plus<int>() );         
00988                 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(column);
00989                 if(!column) return false;
00990                 for(unsigned int i=0;i<this->size();i++){
00991                         this->operator[](i)-=typed_column->operator[](i);
00992                 }                       
00993                 return true;
00994         }       
00995 
00996 
00997         template<class Type>
00998         bool ColumnBaseTyped<Type>::multiply(const boost::any& new_value){
00999                 if(new_value.empty()) return false;
01000                 if(typeid(Type)==new_value.type()){
01001                          Type value = boost::any_cast<Type>(new_value);
01002                          for(unsigned int i=0;i<this->size();i++){
01003                                         this->operator[](i)*=value;
01004                          }
01005                          return true;
01006                 }
01007                 return false;
01008         }
01009         
01010         template<class Type>
01011         bool ColumnBaseTyped<Type>::multiply(ColumnPtr column){
01012                 //std::transform ( first, first+5, second, results, std::plus<int>() );         
01013                 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(column);
01014                 if(!column) return false;
01015                 for(unsigned int i=0;i<this->size();i++){
01016                         this->operator[](i)*=typed_column->operator[](i);
01017                 }                       
01018                 return true;
01019         }
01020 
01021 
01022 
01023         template<class Type>
01024         bool ColumnBaseTyped<Type>::division(const boost::any& new_value){
01025                 if(new_value.empty()) return false;
01026                 if(typeid(Type)==new_value.type()){
01027                          Type value = boost::any_cast<Type>(new_value);
01028                          //check that we do not devide by zero
01029                          if(value==0) return false;
01030                          for(unsigned int i=0;i<this->size();i++){
01031                                         this->operator[](i)/=value;
01032                          }
01033                          return true;
01034                 }
01035                 return false;
01036         }
01037         
01038         template<class Type>
01039         bool ColumnBaseTyped<Type>::division(ColumnPtr column){
01040                 //std::transform ( first, first+5, second, results, std::plus<int>() );         
01041                 shared_pointer_namespace::shared_ptr<ColumnBaseTyped<Type> > typed_column = shared_pointer_namespace::static_pointer_cast<ColumnBaseTyped<Type> >(column);
01042                 if(!column) return false;
01043                 for(unsigned int i=0;i<this->size();i++){
01044                         //if(typed_column->operator[](i)==0) return false;
01045                         if(typed_column->operator[](i)==0){
01046                             std::cerr << "Fatal Error! In ColumnBaseTyped<Type>::division: Division by Zero!" << std::endl;
01047                         }
01048                         assert(typed_column->operator[](i)!=0);
01049                         this->operator[](i)/=typed_column->operator[](i);
01050                 }                       
01051                 return true;
01052         }
01053 
01054         //total tempalte specializations, because numeric computations are undefined on strings 
01055         template<>
01056         inline bool ColumnBaseTyped<std::string>::add(const boost::any&){ return false; }
01057         template<>
01058         inline bool ColumnBaseTyped<std::string>::add(ColumnPtr){ return false; }
01059 
01060         template<>
01061         inline bool ColumnBaseTyped<std::string>::minus(const boost::any&){ return false;       }
01062         template<>
01063         inline bool ColumnBaseTyped<std::string>::minus(ColumnPtr){ return false;       }
01064 
01065 
01066         template<>
01067         inline bool ColumnBaseTyped<std::string>::multiply(const boost::any&){ return false;    }
01068         template<>
01069         inline bool ColumnBaseTyped<std::string>::multiply(ColumnPtr){ return false;    }
01070         
01071         template<>
01072         inline bool ColumnBaseTyped<std::string>::division(const boost::any&){ return false;    }
01073         template<>
01074         inline bool ColumnBaseTyped<std::string>::division(ColumnPtr){ return false;    }
01075 
01076 }; //end namespace CogaDB
01077 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines