Column-oriented GPU-accelerated Database Management System
CoGaDB
|
00001 #pragma once 00002 00003 #include <query_processing/definitions.hpp> 00004 00005 00006 namespace CoGaDB { 00007 namespace query_processing { 00008 namespace physical_operator { 00009 00010 class CPU_NestedLoopJoin_Operator : public hype::queryprocessing::BinaryOperator<TablePtr, TablePtr, TablePtr> { 00011 public: 00012 typedef hype::queryprocessing::OperatorMapper_Helper_Template<TablePtr>::TypedOperatorPtr TypedOperatorPtr; 00013 00014 CPU_NestedLoopJoin_Operator(const hype::SchedulingDecision& sched_dec, 00015 TypedOperatorPtr left_child, 00016 TypedOperatorPtr right_child, 00017 const std::string& join_column1_name, 00018 const std::string& join_column2_name, 00019 MaterializationStatus mat_stat = MATERIALIZE) : BinaryOperator<TablePtr, TablePtr, TablePtr>(sched_dec, left_child, right_child), 00020 join_column1_name_(join_column1_name), 00021 join_column2_name_(join_column2_name), 00022 mat_stat_(mat_stat) { 00023 } 00024 00025 virtual bool execute() { 00026 if (!quiet && verbose && debug) std::cout << "Execute Join CPU" << std::endl; 00027 //const TablePtr sort(TablePtr table, const std::string& column_name, SortOrder order=ASCENDING, MaterializationStatus mat_stat=MATERIALIZE, ComputeDevice comp_dev=CPU); 00028 //this->result_=BaseTable::sort(this->getInputData(), column_name_,order_, mat_stat_,CPU); 00029 //JoinAlgorithm{SORT_MERGE_JOIN,NESTED_LOOP_JOIN,HASH_JOIN}; 00030 this->result_ = BaseTable::join(this->getInputDataLeftChild(), join_column1_name_, this->getInputDataRightChild(), join_column2_name_, NESTED_LOOP_JOIN, mat_stat_, CPU); 00031 if (this->result_) { 00032 setResultSize(((TablePtr) this->result_)->getNumberofRows()); 00033 return true; 00034 } else 00035 return false; 00036 } 00037 00038 virtual ~CPU_NestedLoopJoin_Operator() { 00039 } 00040 private: 00041 std::string join_column1_name_; 00042 std::string join_column2_name_; 00043 MaterializationStatus mat_stat_; 00044 }; 00045 00046 class CPU_SortMergeJoin_Operator : public hype::queryprocessing::BinaryOperator<TablePtr, TablePtr, TablePtr> { 00047 public: 00048 typedef hype::queryprocessing::OperatorMapper_Helper_Template<TablePtr>::TypedOperatorPtr TypedOperatorPtr; 00049 00050 CPU_SortMergeJoin_Operator(const hype::SchedulingDecision& sched_dec, 00051 TypedOperatorPtr left_child, 00052 TypedOperatorPtr right_child, 00053 const std::string& join_column1_name, 00054 const std::string& join_column2_name, 00055 MaterializationStatus mat_stat = MATERIALIZE) : BinaryOperator<TablePtr, TablePtr, TablePtr>(sched_dec, left_child, right_child), 00056 join_column1_name_(join_column1_name), 00057 join_column2_name_(join_column2_name), 00058 mat_stat_(mat_stat) { 00059 } 00060 00061 virtual bool execute() { 00062 if (!quiet && verbose && debug) std::cout << "Execute Join CPU" << std::endl; 00063 //const TablePtr sort(TablePtr table, const std::string& column_name, SortOrder order=ASCENDING, MaterializationStatus mat_stat=MATERIALIZE, ComputeDevice comp_dev=CPU); 00064 //this->result_=BaseTable::sort(this->getInputData(), column_name_,order_, mat_stat_,CPU); 00065 //JoinAlgorithm{SORT_MERGE_JOIN,NESTED_LOOP_JOIN,HASH_JOIN}; 00066 this->result_ = BaseTable::join(this->getInputDataLeftChild(), join_column1_name_, this->getInputDataRightChild(), join_column2_name_, SORT_MERGE_JOIN, mat_stat_, CPU); 00067 if (this->result_) { 00068 setResultSize(((TablePtr) this->result_)->getNumberofRows()); 00069 return true; 00070 } else 00071 return false; 00072 } 00073 00074 virtual ~CPU_SortMergeJoin_Operator() { 00075 } 00076 private: 00077 std::string join_column1_name_; 00078 std::string join_column2_name_; 00079 MaterializationStatus mat_stat_; 00080 }; 00081 00082 class CPU_HashJoin_Operator : public hype::queryprocessing::BinaryOperator<TablePtr, TablePtr, TablePtr> { 00083 public: 00084 typedef hype::queryprocessing::OperatorMapper_Helper_Template<TablePtr>::TypedOperatorPtr TypedOperatorPtr; 00085 00086 CPU_HashJoin_Operator(const hype::SchedulingDecision& sched_dec, 00087 TypedOperatorPtr left_child, 00088 TypedOperatorPtr right_child, 00089 const std::string& join_column1_name, 00090 const std::string& join_column2_name, 00091 MaterializationStatus mat_stat = MATERIALIZE) : BinaryOperator<TablePtr, TablePtr, TablePtr>(sched_dec, left_child, right_child), 00092 join_column1_name_(join_column1_name), 00093 join_column2_name_(join_column2_name), 00094 mat_stat_(mat_stat) { 00095 } 00096 00097 virtual bool execute() { 00098 if (!quiet && verbose && debug) std::cout << "Execute Join CPU" << std::endl; 00099 //const TablePtr sort(TablePtr table, const std::string& column_name, SortOrder order=ASCENDING, MaterializationStatus mat_stat=MATERIALIZE, ComputeDevice comp_dev=CPU); 00100 //this->result_=BaseTable::sort(this->getInputData(), column_name_,order_, mat_stat_,CPU); 00101 //JoinAlgorithm{SORT_MERGE_JOIN,NESTED_LOOP_JOIN,HASH_JOIN}; 00102 this->result_ = BaseTable::join(this->getInputDataLeftChild(), join_column1_name_, this->getInputDataRightChild(), join_column2_name_, HASH_JOIN, mat_stat_, CPU); 00103 if (this->result_) { 00104 setResultSize(((TablePtr) this->result_)->getNumberofRows()); 00105 return true; 00106 } else 00107 return false; 00108 } 00109 00110 virtual ~CPU_HashJoin_Operator() { 00111 } 00112 private: 00113 std::string join_column1_name_; 00114 std::string join_column2_name_; 00115 MaterializationStatus mat_stat_; 00116 }; 00117 00118 class GPU_Join_Operator : public hype::queryprocessing::BinaryOperator<TablePtr, TablePtr, TablePtr> { 00119 public: 00120 typedef hype::queryprocessing::OperatorMapper_Helper_Template<TablePtr>::TypedOperatorPtr TypedOperatorPtr; 00121 00122 GPU_Join_Operator(const hype::SchedulingDecision& sched_dec, 00123 TypedOperatorPtr left_child, 00124 TypedOperatorPtr right_child, 00125 const std::string& join_column1_name, 00126 const std::string& join_column2_name, 00127 MaterializationStatus mat_stat = MATERIALIZE) : BinaryOperator<TablePtr, TablePtr, TablePtr>(sched_dec, left_child, right_child), 00128 join_column1_name_(join_column1_name), 00129 join_column2_name_(join_column2_name), 00130 mat_stat_(mat_stat) { 00131 } 00132 00133 virtual bool execute() { 00134 if (!quiet && verbose && debug) std::cout << "Execute Sort GPU" << std::endl; 00135 //const TablePtr sort(TablePtr table, const std::string& column_name, SortOrder order=ASCENDING, MaterializationStatus mat_stat=MATERIALIZE, ComputeDevice comp_dev=CPU); 00136 //JoinAlgorithm{SORT_MERGE_JOIN,NESTED_LOOP_JOIN,HASH_JOIN}; 00137 this->result_=BaseTable::join(this->getInputDataLeftChild(), join_column1_name_, this->getInputDataRightChild(), join_column2_name_, SORT_MERGE_JOIN_2, mat_stat_, GPU); 00138 if (this->result_) { 00139 setResultSize(((TablePtr) this->result_)->getNumberofRows()); 00140 return true; 00141 } else 00142 return false; 00143 } 00144 00145 virtual ~GPU_Join_Operator() { 00146 } 00147 private: 00148 std::string join_column1_name_; 00149 std::string join_column2_name_; 00150 MaterializationStatus mat_stat_; 00151 }; 00152 00153 Physical_Operator_Map_Ptr map_init_function_join_operator(); 00154 TypedOperatorPtr create_CPU_NestedLoopJoin_Operator(TypedLogicalNode& logical_node, const hype::SchedulingDecision&, TypedOperatorPtr left_child, TypedOperatorPtr right_child); 00155 TypedOperatorPtr create_CPU_SortMergeJoin_Operator(TypedLogicalNode& logical_node, const hype::SchedulingDecision&, TypedOperatorPtr left_child, TypedOperatorPtr right_child); 00156 TypedOperatorPtr create_CPU_HashJoin_Operator(TypedLogicalNode& logical_node, const hype::SchedulingDecision&, TypedOperatorPtr left_child, TypedOperatorPtr right_child); 00157 TypedOperatorPtr create_GPU_Join_Operator(TypedLogicalNode& logical_node, const hype::SchedulingDecision&, TypedOperatorPtr left_child, TypedOperatorPtr right_child); 00158 00159 }//end namespace physical_operator 00160 00161 //extern Map_Init_Function init_function_Join_operator; 00162 00163 //Map_Init_Function init_function_Join_operator=physical_operator::map_init_function_Join_operator; //boost::bind(); 00164 00165 namespace logical_operator { 00166 00167 class Logical_Join : public hype::queryprocessing::TypedNode_Impl<TablePtr,physical_operator::map_init_function_join_operator> //init_function_Join_operator> 00168 { 00169 public: 00170 00171 Logical_Join(const std::string& join_column1_name, 00172 const std::string& join_column2_name, 00173 MaterializationStatus mat_stat = MATERIALIZE, 00174 hype::DeviceConstraint dev_constr = hype::DeviceConstraint() 00175 ) : TypedNode_Impl<TablePtr, physical_operator::map_init_function_join_operator>(false, dev_constr), 00176 join_column1_name_(join_column1_name), 00177 join_column2_name_(join_column2_name), 00178 mat_stat_(mat_stat) { 00179 } 00180 00181 virtual unsigned int getOutputResultSize() const { 00182 return 10; 00183 } 00184 00185 virtual double getCalculatedSelectivity() const { 00186 return 1; 00187 } 00188 00189 virtual std::string getOperationName() const { 00190 return "JOIN"; 00191 } 00192 std::string toString(bool verbose) const{ 00193 std::string result="JOIN"; 00194 if(verbose){ 00195 result+=" ("; 00196 result+=join_column1_name_; 00197 result+="="; 00198 result+=join_column2_name_; 00199 result+=")"; 00200 } 00201 return result; 00202 00203 } 00204 const std::string& getLeftJoinColumnName() { 00205 return join_column1_name_; 00206 } 00207 00208 const std::string& getRightJoinColumnName() { 00209 return join_column2_name_; 00210 } 00211 00212 const MaterializationStatus& getMaterializationStatus() const { 00213 return mat_stat_; 00214 } 00215 private: 00216 std::string join_column1_name_; 00217 std::string join_column2_name_; 00218 MaterializationStatus mat_stat_; 00219 }; 00220 00221 }//end namespace logical_operator 00222 00223 }//end namespace query_processing 00224 00225 }; //end namespace CogaDB