// CoGaDB - Column-oriented GPU-accelerated Database Management System
00001 #pragma once 00002 00003 #include <query_processing/definitions.hpp> 00004 #include <util/getname.hpp> 00005 00006 namespace CoGaDB { 00007 namespace query_processing { 00008 namespace physical_operator { 00009 00010 class CPU_Groupby_Operator : public hype::queryprocessing::UnaryOperator<TablePtr, TablePtr> { 00011 public: 00012 typedef hype::queryprocessing::OperatorMapper_Helper_Template<TablePtr>::TypedOperatorPtr TypedOperatorPtr; 00013 00014 CPU_Groupby_Operator(const hype::SchedulingDecision& sched_dec, 00015 TypedOperatorPtr child, 00016 const std::list<std::string>& grouping_columns, 00017 const std::list<ColumnAggregation>& aggregation_functions, 00018 MaterializationStatus mat_stat = MATERIALIZE) : UnaryOperator<TablePtr, TablePtr>(sched_dec, child), 00019 grouping_columns_(grouping_columns), 00020 aggregation_functions_(aggregation_functions), 00021 mat_stat_(mat_stat) { 00022 } 00023 00024 virtual bool execute() { 00025 //std::cout << "Execute Groupby CPU" << std::endl; 00026 //const TablePtr sort(TablePtr table, const std::string& column_name, SortOrder order=ASCENDING, MaterializationStatus mat_stat=MATERIALIZE, ComputeDevice comp_dev=CPU); 00027 //this->result_=BaseTable::sort(this->getInputData(), column_name_,order_, mat_stat_,CPU); 00028 this->result_ = BaseTable::groupby(this->getInputData(), grouping_columns_, aggregation_functions_, CPU); 00029 00030 00031 00032 // std::cout << "<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" << std::endl; 00033 // std::cout << "INPUT:" << std::endl; 00034 // this->getInputData()->print(); 00035 // std::cout << "<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" << std::endl; 00036 // std::cout << ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" << std::endl; 00037 // std::cout << "OUTPUT:" << std::endl; 00038 // this->result_->print(); 00039 // std::cout << ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" << std::endl; 00040 if (this->result_) { 00041 
setResultSize(((TablePtr) this->result_)->getNumberofRows()); 00042 return true; 00043 } else 00044 return false; 00045 } 00046 00047 virtual ~CPU_Groupby_Operator() { 00048 } 00049 private: 00050 std::list<std::string> grouping_columns_; 00051 std::list<ColumnAggregation> aggregation_functions_; 00052 MaterializationStatus mat_stat_; 00053 }; 00054 00055 class GPU_Groupby_Operator : public hype::queryprocessing::UnaryOperator<TablePtr, TablePtr> { 00056 public: 00057 typedef hype::queryprocessing::OperatorMapper_Helper_Template<TablePtr>::TypedOperatorPtr TypedOperatorPtr; 00058 00059 GPU_Groupby_Operator(const hype::SchedulingDecision& sched_dec, 00060 TypedOperatorPtr child, 00061 const std::list<std::string>& grouping_columns, 00062 const std::list<ColumnAggregation>& aggregation_functions, 00063 MaterializationStatus mat_stat = MATERIALIZE) : UnaryOperator<TablePtr, TablePtr>(sched_dec, child), 00064 grouping_columns_(grouping_columns), 00065 aggregation_functions_(aggregation_functions), 00066 mat_stat_(mat_stat) { 00067 } 00068 00069 virtual bool execute() { 00070 //std::cout << "Execute Groupby GPU" << std::endl; 00071 //const TablePtr sort(TablePtr table, const std::string& column_name, SortOrder order=ASCENDING, MaterializationStatus mat_stat=MATERIALIZE, ComputeDevice comp_dev=CPU); 00072 this->result_ = BaseTable::groupby(this->getInputData(), grouping_columns_, aggregation_functions_, GPU); 00073 00074 // std::cout << "<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" << std::endl; 00075 // std::cout << "INPUT:" << std::endl; 00076 // this->getInputData()->print(); 00077 // std::cout << "<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" << std::endl; 00078 // std::cout << ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" << std::endl; 00079 // std::cout << "OUTPUT:" << std::endl; 00080 // this->result_->print(); 00081 // std::cout << ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" << std::endl; 00082 if 
(this->result_) { 00083 setResultSize(((TablePtr) this->result_)->getNumberofRows()); 00084 return true; 00085 } else 00086 return false; 00087 } 00088 00089 virtual ~GPU_Groupby_Operator() { 00090 } 00091 private: 00092 std::list<std::string> grouping_columns_; 00093 std::list<ColumnAggregation> aggregation_functions_; 00094 MaterializationStatus mat_stat_; 00095 }; 00096 00097 Physical_Operator_Map_Ptr map_init_function_groupby_operator(); 00098 TypedOperatorPtr create_CPU_Groupby_Operator(TypedLogicalNode& logical_node, const hype::SchedulingDecision&, TypedOperatorPtr left_child, TypedOperatorPtr right_child); 00099 TypedOperatorPtr create_GPU_Groupby_Operator(TypedLogicalNode& logical_node, const hype::SchedulingDecision&, TypedOperatorPtr left_child, TypedOperatorPtr right_child); 00100 00101 }//end namespace physical_operator 00102 00103 //extern Map_Init_Function init_function_Groupby_operator; 00104 00105 //Map_Init_Function init_function_Groupby_operator=physical_operator::map_init_function_groupby_operator; //boost::bind(); 00106 00107 namespace logical_operator { 00108 00109 class Logical_Groupby : public hype::queryprocessing::TypedNode_Impl<TablePtr,physical_operator::map_init_function_groupby_operator> //init_function_Groupby_operator> 00110 { 00111 public: 00112 00113 Logical_Groupby(const std::list<std::string>& grouping_columns, 00114 const std::list<ColumnAggregation>& aggregation_functions, 00115 MaterializationStatus mat_stat = MATERIALIZE, 00116 hype::DeviceConstraint dev_constr = CoGaDB::RuntimeConfiguration::instance().getGlobalDeviceConstraint()) 00117 : TypedNode_Impl<TablePtr, physical_operator::map_init_function_groupby_operator>(false, dev_constr), 00118 grouping_columns_(grouping_columns), 00119 aggregation_functions_(aggregation_functions), 00120 mat_stat_(mat_stat) { 00121 } 00122 00123 virtual unsigned int getOutputResultSize() const { 00124 return 10; 00125 } 00126 00127 virtual double getCalculatedSelectivity() const { 00128 
return 0.3; 00129 } 00130 00131 virtual std::string getOperationName() const { 00132 return "GROUPBY"; 00133 } 00134 std::string toString(bool verbose) const{ 00135 std::string result="GROUPBY"; 00136 if(verbose){ 00137 result+=" ("; 00138 std::list<std::string>::const_iterator cit; 00139 for(cit=grouping_columns_.begin();cit!=grouping_columns_.end();++cit){ 00140 result+=*cit; 00141 if(cit!=--grouping_columns_.end()) 00142 result+=","; 00143 } 00144 result+=")"; 00145 result+=" USING ("; 00146 std::list<ColumnAggregation>::const_iterator agg_func_cit; 00147 for(agg_func_cit=aggregation_functions_.begin();agg_func_cit!=aggregation_functions_.end();++agg_func_cit){ 00148 result+=CoGaDB::util::getName(agg_func_cit->second); 00149 result+="("; 00150 result+=agg_func_cit->first; 00151 result+=")"; 00152 } 00153 result+=")"; 00154 } 00155 return result; 00156 00157 } 00158 const std::list<std::string>& getGroupingColumns() { 00159 return grouping_columns_; 00160 } 00161 00162 const std::list<ColumnAggregation>& getColumnAggregationFunctions() { 00163 return aggregation_functions_; 00164 } 00165 00166 const MaterializationStatus& getMaterializationStatus() const { 00167 return mat_stat_; 00168 } 00169 private: 00170 std::list<std::string> grouping_columns_; 00171 std::list<ColumnAggregation> aggregation_functions_; 00172 MaterializationStatus mat_stat_; 00173 }; 00174 00175 }//end namespace logical_operator 00176 00177 }//end namespace query_processing 00178 00179 }; //end namespace CogaDB