// Column-oriented GPU-accelerated Database Management System
// CoGaDB
00001 #pragma once 00002 #include <fstream> 00003 #include <core/compressed_column.hpp> 00004 #include <boost/archive/binary_iarchive.hpp> 00005 #include <boost/archive/binary_oarchive.hpp> 00006 #include <boost/serialization/map.hpp> 00007 00008 namespace CoGaDB { 00009 typedef unsigned int uint; 00010 typedef unsigned char uchar; 00011 00012 template<class T> 00013 class DeltaCompressedColumn : public CompressedColumn<T> { 00014 public: 00015 DeltaCompressedColumn(const std::string& name, AttributeType db_type); 00016 ~DeltaCompressedColumn(); 00017 00018 bool insert(const boost::any& new_Value); 00019 bool insert(const T& new_value); 00020 00021 template <typename InputIterator> 00022 bool insert(InputIterator first, InputIterator last); 00023 00024 bool update(TID tid, const boost::any& new_value); 00025 bool update(PositionListPtr tid, const boost::any& new_value); 00026 00027 bool remove(TID tid); 00028 //assumes tid list is sorted ascending 00029 bool remove(PositionListPtr tid); 00030 bool clearContent(); 00031 00032 const boost::any get(TID tid); 00033 void print() const throw(); 00034 size_t size() const throw(); 00035 unsigned int getSizeinBytes() const throw(); 00036 00037 const ColumnPtr copy() const; 00038 //const ColumnPtr gather(PositionListPtr tid_list); 00039 00040 bool store(const std::string& path); 00041 bool load(const std::string& path); 00042 //const ColumnPtr materialize() throw(); 00043 00044 T& operator[](const int index); 00045 00046 private: 00047 void compress_insert(uint new_value); 00048 void compress_update(TID tid, const uint& new_value_); 00049 void compress_update_part(uint& i, uint old_delta, uint new_delta); 00050 void compress_delete(TID tid); 00051 00052 uint uncompress(const uint index); 00053 uint decode(uint& index); 00054 const uint encoded_length(const uint& cvalue); 00055 00056 std::vector<uchar> cvalues_; 00057 uint size_; 00058 T hack_last_uncompressed; 00059 }; 00060 00061 template<> 00062 class 
DeltaCompressedColumn<std::string> : public CompressedColumn<std::string> { 00063 public: 00064 DeltaCompressedColumn(const std::string& name, AttributeType db_type); 00065 ~DeltaCompressedColumn(); 00066 00067 bool insert(const boost::any& new_Value); 00068 bool insert(const std::string& new_value); 00069 00070 template <typename InputIterator> 00071 bool insert(InputIterator first, InputIterator last); 00072 00073 bool update(TID tid, const boost::any& new_value); 00074 bool update(PositionListPtr tid, const boost::any& new_value); 00075 00076 bool remove(TID tid); 00077 //assumes tid list is sorted ascending 00078 bool remove(PositionListPtr tid); 00079 bool clearContent(); 00080 00081 const boost::any get(TID tid); 00082 void print() const throw(); 00083 size_t size() const throw(); 00084 unsigned int getSizeinBytes() const throw(); 00085 00086 const ColumnPtr copy() const; 00087 00088 bool store(const std::string& path); 00089 bool load(const std::string& path); 00090 00091 std::string& operator[](const int index); 00092 00093 private: 00094 std::string uncompress(TID tid); 00095 void compress_update(TID tid, std::string value); 00096 00097 std::vector<std::string> cvalues_; 00098 std::string hack_last_uncompressed; 00099 }; 00100 00101 00102 // ------------------------------------------------------ Implementation ------------------------------------------------------- 00103 00104 //-------------------------------------------------- String -------------------------------------------------------------------- 00105 00106 DeltaCompressedColumn<std::string>::DeltaCompressedColumn(const std::string& name, AttributeType db_type) 00107 : CompressedColumn<std::string>(name, db_type) {} 00108 00109 DeltaCompressedColumn<std::string>::~DeltaCompressedColumn() {} 00110 00111 bool DeltaCompressedColumn<std::string>::insert(const boost::any& new_value){ 00112 if (new_value.empty()) { 00113 return false; 00114 } 00115 if (typeid(std::string) == new_value.type()) { 00116 
std::string value = boost::any_cast<std::string>(new_value); 00117 return insert(value); 00118 } 00119 return false; 00120 } 00121 00122 bool DeltaCompressedColumn<std::string>::insert(const std::string& new_value) { 00123 compress_update(cvalues_.size(), new_value); 00124 return true; 00125 } 00126 00127 void DeltaCompressedColumn<std::string>::compress_update(TID tid, std::string value) { 00128 std::string n_string = value; 00129 uchar count = 0; 00130 if (tid > 0) { 00131 std::string prev_string = uncompress(tid - 1); 00132 00133 for (uint i = 0; i < value.size() && i < prev_string.size(); i++) 00134 { 00135 if (value[i] == prev_string[i]) { 00136 count++; 00137 } else { 00138 break; 00139 } 00140 } 00141 n_string.erase(0, count); 00142 } 00143 n_string.push_back(count); 00144 //n_string.shrink_to_fit(); 00145 if (tid == cvalues_.size()) { 00146 cvalues_.push_back(n_string); 00147 } else { 00148 cvalues_[tid] = n_string; 00149 } 00150 } 00151 00152 std::string DeltaCompressedColumn<std::string>::uncompress(TID tid) { 00153 std::string complete_string = cvalues_[tid]; 00154 //uchar last_pre_length = complete_string.back(); 00155 uchar last_pre_length = *complete_string.rbegin(); 00156 //complete_string.pop_back(); 00157 complete_string.erase(complete_string.end()-1); 00158 for (int i = tid - 1; i >= 0; i--) 00159 { 00160 std::string last_string = cvalues_[i]; 00161 //uchar pre_length = last_string.back(); 00162 uchar pre_length = *last_string.rbegin(); 00163 if (last_pre_length > pre_length) { 00164 uchar diff = last_pre_length - pre_length; 00165 complete_string.insert(0, last_string.substr(0, diff)); 00166 last_pre_length = pre_length; 00167 } 00168 if (pre_length == 0) { 00169 break; 00170 } 00171 } 00172 return complete_string; 00173 } 00174 00175 template <typename InputIterator> 00176 bool DeltaCompressedColumn<std::string>::insert(InputIterator first, InputIterator last) { 00177 for (; first != last; ++first) { 00178 if (!insert(*first)) { 00179 return 
false; 00180 } 00181 } 00182 return true; 00183 } 00184 00185 const boost::any DeltaCompressedColumn<std::string>::get(TID tid){ 00186 if (tid < cvalues_.size()) 00187 return boost::any((*this)[tid]); 00188 else{ 00189 std::cout << "fatal Error!!! Invalid TID!!! Attribute: " << this->name_ << " TID: " << tid << std::endl; 00190 return boost::any(); 00191 } 00192 } 00193 00194 void DeltaCompressedColumn<std::string>::print() const throw() { 00195 std::cout << "| " << this->name_ << " |" << std::endl; 00196 std::cout << "________________________" << std::endl; 00197 } 00198 00199 size_t DeltaCompressedColumn<std::string>::size() const throw() { 00200 return cvalues_.size(); 00201 } 00202 00203 const ColumnPtr DeltaCompressedColumn<std::string>::copy() const { 00204 return ColumnPtr(new DeltaCompressedColumn<std::string>(*this)); 00205 } 00206 00207 00208 00209 bool DeltaCompressedColumn<std::string>::update(TID tid, const boost::any& new_value) { 00210 if (new_value.empty()) { 00211 return false; 00212 } 00213 if (typeid(std::string) == new_value.type()) { 00214 std::string value = boost::any_cast<std::string>(new_value); 00215 if (tid < cvalues_.size() - 1) { 00216 std::string next_string = uncompress(tid + 1); 00217 compress_update(tid, value); 00218 compress_update(tid + 1, next_string); 00219 } else { 00220 compress_update(tid, value); 00221 } 00222 return true; 00223 } else { 00224 std::cout << "Fatal Error!!! Typemismatch for column " << this->name_ << std::endl; 00225 return false; 00226 } 00227 } 00228 00229 bool DeltaCompressedColumn<std::string>::update(PositionListPtr tids, const boost::any& new_value){ 00230 if (!tids || new_value.empty()) { 00231 return false; 00232 } 00233 if(typeid(std::string) == new_value.type()) { 00234 for(unsigned int i = 0; i < tids->size(); i++) { 00235 TID tid = (*tids)[i]; 00236 if (!update(tid, new_value)) { 00237 return false; 00238 } 00239 } 00240 return true; 00241 } else { 00242 std::cout << "Fatal Error!!! 
Typemismatch for column " << this->name_ << std::endl; 00243 } 00244 return false; 00245 } 00246 00247 bool DeltaCompressedColumn<std::string>::remove(TID tid){ 00248 if (tid + 1 < cvalues_.size()) { 00249 std::string next_string = uncompress(tid + 1); 00250 cvalues_.erase(cvalues_.begin() + tid); 00251 compress_update(tid, next_string); 00252 } else if (tid == cvalues_.size() - 1) { 00253 cvalues_.pop_back(); 00254 } 00255 return true; 00256 } 00257 00258 bool DeltaCompressedColumn<std::string>::remove(PositionListPtr tids){ 00259 if (!tids || tids->empty()) { 00260 return false; 00261 } 00262 00263 //std::sort(tids->begin(), tids->end()); 00264 00265 for (uint i = tids->size(); i > 0; --i) { 00266 if (!remove((*tids)[i - 1])) { 00267 return false; 00268 } 00269 } 00270 00271 return true; 00272 } 00273 00274 bool DeltaCompressedColumn<std::string>::clearContent(){ 00275 cvalues_.clear(); 00276 return true; 00277 } 00278 00279 00280 bool DeltaCompressedColumn<std::string>::store(const std::string& path_) { 00281 std::string path(path_); 00282 path += "/"; 00283 path += this->name_; 00284 00285 std::ofstream outfile (path.c_str(), std::ios_base::binary | std::ios_base::out); 00286 boost::archive::binary_oarchive oa(outfile); 00287 00288 oa << cvalues_; 00289 00290 outfile.flush(); 00291 outfile.close(); 00292 return true; 00293 } 00294 00295 bool DeltaCompressedColumn<std::string>::load(const std::string& path_) { 00296 std::string path(path_); 00297 path += "/"; 00298 path += this->name_; 00299 00300 std::ifstream infile (path.c_str(), std::ios_base::binary | std::ios_base::in); 00301 boost::archive::binary_iarchive ia(infile); 00302 00303 ia >> cvalues_; 00304 00305 infile.close(); 00306 return true; 00307 } 00308 00309 std::string& DeltaCompressedColumn<std::string>::operator[](const int index) { 00310 hack_last_uncompressed = uncompress(index); 00311 return hack_last_uncompressed; 00312 } 00313 00314 00315 00316 uint 
DeltaCompressedColumn<std::string>::getSizeinBytes() const throw(){ 00317 uint size_in_bytes = 0; 00318 for(uint i = 0; i < cvalues_.size(); ++i){ 00319 size_in_bytes += cvalues_[i].capacity(); 00320 } 00321 return size_in_bytes; 00322 } 00323 00324 //----------------------------------------------- INT, FLOAT ---------------------------------------------------------------- 00325 00326 template<class T> 00327 DeltaCompressedColumn<T>::DeltaCompressedColumn(const std::string& name, AttributeType db_type) 00328 : CompressedColumn<T>(name, db_type) { 00329 size_ = 0; 00330 } 00331 00332 template<class T> 00333 DeltaCompressedColumn<T>::~DeltaCompressedColumn() {} 00334 00335 template<class T> 00336 bool DeltaCompressedColumn<T>::insert(const boost::any& new_value){ 00337 if (new_value.empty()) { 00338 return false; 00339 } 00340 if (typeid(T) == new_value.type()) { 00341 T value = boost::any_cast<T>(new_value); 00342 00343 uint* x = reinterpret_cast<uint*>(&value); 00344 compress_insert(*x); 00345 00346 return true; 00347 } 00348 return false; 00349 } 00350 00351 template<class T> 00352 bool DeltaCompressedColumn<T>::insert(const T& new_value) { 00353 T value = new_value; 00354 00355 uint* x = reinterpret_cast<uint*>(&value); 00356 compress_insert(*x); 00357 00358 return true; 00359 } 00360 00361 template <typename T> 00362 template <typename InputIterator> 00363 bool DeltaCompressedColumn<T>::insert(InputIterator first, InputIterator last) { 00364 for (; first != last; ++first) { 00365 if (!insert(*first)) { 00366 return false; 00367 } 00368 } 00369 return true; 00370 } 00371 00372 template<class T> 00373 const boost::any DeltaCompressedColumn<T>::get(TID tid){ 00374 if (tid < size_) { 00375 uint uncompressed = uncompress(tid); 00376 T ret = *reinterpret_cast<T*>(&uncompressed); 00377 00378 return boost::any(ret); 00379 } else { 00380 std::cout << "fatal Error!!! Invalid TID!!! 
Attribute: " << this->name_ << " TID: " << tid << std::endl; 00381 return boost::any(); 00382 } 00383 } 00384 00385 template<class T> 00386 void DeltaCompressedColumn<T>::print() const throw() { 00387 std::cout << "| " << this->name_ << " |" << std::endl; 00388 std::cout << "________________________" << std::endl; 00389 } 00390 00391 template<class T> 00392 size_t DeltaCompressedColumn<T>::size() const throw() { 00393 return size_; 00394 } 00395 00396 00397 template<class T> 00398 const ColumnPtr DeltaCompressedColumn<T>::copy() const { 00399 return ColumnPtr(new DeltaCompressedColumn<T>(*this)); 00400 } 00401 00402 00403 00404 template<class T> 00405 bool DeltaCompressedColumn<T>::update(TID tid, const boost::any& new_value) { 00406 if (new_value.empty()) { 00407 return false; 00408 } 00409 if (typeid(T) == new_value.type()) { 00410 T value = boost::any_cast<T>(new_value); 00411 00412 unsigned int* x = reinterpret_cast<unsigned int*>(&value); 00413 compress_update(tid, *x); 00414 00415 return true; 00416 } else { 00417 std::cout << "Fatal Error!!! Typemismatch for column " << this->name_ << std::endl; 00418 return false; 00419 } 00420 } 00421 00422 template<class T> 00423 bool DeltaCompressedColumn<T>::update(PositionListPtr tids, const boost::any& new_value){ 00424 if (!tids || new_value.empty()) { 00425 return false; 00426 } 00427 if(typeid(T) == new_value.type()) { 00428 T value = boost::any_cast<T>(new_value); 00429 uint* x = reinterpret_cast<uint*>(&value); 00430 00431 for(uint i = 0; i < tids->size(); i++) { 00432 TID tid = (*tids)[i]; 00433 00434 compress_update(tid, *x); 00435 } 00436 return true; 00437 } else { 00438 std::cout << "Fatal Error!!! 
Typemismatch for column " << this->name_ << std::endl; 00439 } 00440 return false; 00441 } 00442 00443 template<class T> 00444 bool DeltaCompressedColumn<T>::remove(TID tid) { 00445 compress_delete(tid); 00446 return true; 00447 } 00448 00449 template<class T> 00450 bool DeltaCompressedColumn<T>::remove(PositionListPtr tids){ 00451 if (!tids || tids->empty()) { 00452 return false; 00453 } 00454 00455 //std::sort(tids->begin(), tids->end()); 00456 00457 for (uint i = tids->size(); i > 0; --i) { 00458 compress_delete((*tids)[i - 1]); 00459 } 00460 return true; 00461 } 00462 00463 template<class T> 00464 bool DeltaCompressedColumn<T>::clearContent(){ 00465 cvalues_.clear(); 00466 size_ = 0; 00467 return true; 00468 } 00469 00470 template<class T> 00471 bool DeltaCompressedColumn<T>::store(const std::string& path_) { 00472 std::string path(path_); 00473 path += "/"; 00474 path += this->name_; 00475 00476 std::ofstream outfile (path.c_str(),std::ios_base::binary | std::ios_base::out); 00477 boost::archive::binary_oarchive oa(outfile); 00478 00479 oa << cvalues_; 00480 00481 outfile.flush(); 00482 outfile.close(); 00483 return true; 00484 } 00485 00486 template<class T> 00487 bool DeltaCompressedColumn<T>::load(const std::string& path_) { 00488 std::string path(path_); 00489 path += "/"; 00490 path += this->name_; 00491 00492 std::ifstream infile (path.c_str(),std::ios_base::binary | std::ios_base::in); 00493 boost::archive::binary_iarchive ia(infile); 00494 00495 ia >> cvalues_; 00496 00497 size_ = 0; 00498 for (uint i = 0; i < cvalues_.size();) { 00499 decode(i); 00500 size_++; 00501 } 00502 00503 infile.close(); 00504 return true; 00505 } 00506 00507 template<class T> 00508 T& DeltaCompressedColumn<T>::operator[](const int index) { 00509 uint uncompressed = uncompress(index); 00510 hack_last_uncompressed = *reinterpret_cast<T*>(&uncompressed); 00511 return hack_last_uncompressed; 00512 } 00513 00514 00515 00516 template<class T> 00517 unsigned int 
DeltaCompressedColumn<T>::getSizeinBytes() const throw() { 00518 // vector content + size_ variable 00519 return (cvalues_.capacity() * sizeof(unsigned char)) + sizeof(uint); 00520 } 00521 00522 template<class T> 00523 void DeltaCompressedColumn<T>::compress_insert(uint new_value) { 00524 uint delta; 00525 if (cvalues_.empty()) { 00526 delta = new_value; 00527 } else { 00528 // determine delta 00529 delta = new_value ^ uncompress(size_ - 1); 00530 } 00531 // pass delta to char vector 00532 while (delta > 127) { 00533 cvalues_.push_back(128 | (delta & 127)); 00534 delta >>= 7; 00535 } 00536 cvalues_.push_back(255 & delta); 00537 size_++; 00538 } 00539 00540 template<class T> 00541 void DeltaCompressedColumn<T>::compress_update(TID tid, const uint& new_value_) { 00542 if (tid < 0 || tid >= size_) { 00543 throw std::out_of_range("error"); 00544 } 00545 uint i = 0; 00546 00547 uint old_value = decode(i); 00548 uint old_delta; 00549 uint new_delta; 00550 00551 if (tid > 0) { 00552 uint prev_value = old_value; 00553 for (uint j = 1; j < tid; j++) { 00554 old_delta = decode(i); 00555 prev_value ^= old_delta; 00556 } 00557 uint tempi = i; 00558 old_delta = decode(tempi); 00559 00560 old_value = prev_value ^ old_delta; 00561 new_delta = prev_value ^ new_value_; 00562 00563 compress_update_part(i, old_delta, new_delta); 00564 } else { 00565 uint tempi = 0; 00566 compress_update_part(tempi, old_value, new_value_); 00567 } 00568 00569 if (i < cvalues_.size()) { 00570 uint tempi = i; 00571 old_delta = decode(tempi); 00572 new_delta = new_value_ ^ (old_value ^ old_delta); 00573 00574 compress_update_part(i, old_delta, new_delta); 00575 } 00576 } 00577 00578 template<class T> 00579 void DeltaCompressedColumn<T>::compress_delete(TID tid) { 00580 if (tid < 0 || tid >= size_) { 00581 throw std::out_of_range("error"); 00582 } 00583 00584 if (tid == size_ - 1) { 00585 cvalues_.pop_back(); 00586 if (cvalues_.size() > 0) { 00587 while (cvalues_.back() >= 128) { 00588 
cvalues_.pop_back(); 00589 } 00590 } 00591 } else { 00592 uint i = 0; 00593 for (uint j = 0; j < tid; j++) { 00594 decode(i); 00595 } 00596 uint tempi = i; 00597 uint old_delta = decode(tempi); 00598 uint old_delta2 = decode(tempi); 00599 00600 uint new_delta = old_delta ^ old_delta2; 00601 00602 uint old_length = encoded_length(old_delta); 00603 cvalues_.erase(cvalues_.begin() + i, cvalues_.begin() + i + old_length); 00604 00605 if (i < cvalues_.size()) { 00606 compress_update_part(i, old_delta2, new_delta); 00607 } 00608 } 00609 size_--; 00610 } 00611 00612 template<class T> 00613 void DeltaCompressedColumn<T>::compress_update_part(uint& i, uint old_delta, uint new_delta) { 00614 uint old_length = encoded_length(old_delta); 00615 uint new_length = encoded_length(new_delta); 00616 00617 int delta_length = new_length - old_length; 00618 if (delta_length != 0) { 00619 if (delta_length < 0) { 00620 cvalues_.erase(cvalues_.begin() + i, cvalues_.begin() + i - delta_length); 00621 } else { 00622 cvalues_.insert(cvalues_.begin() + i, delta_length, 0); 00623 } 00624 } 00625 00626 while (new_delta > 127) { 00627 cvalues_[i++] = (128 | (new_delta & 127)); 00628 new_delta >>= 7; 00629 } 00630 cvalues_[i++] = (255 & new_delta); 00631 } 00632 00633 template<class T> 00634 uint DeltaCompressedColumn<T>::uncompress(const uint index) { 00635 if (index < 0 || index >= size_) { 00636 throw std::out_of_range("error"); 00637 } 00638 00639 uint i = 0; 00640 uint result = decode(i); 00641 for (uint j = 1; j <= index; j++) { 00642 uint delta = decode(i); 00643 result ^= delta; 00644 } 00645 00646 return result; 00647 } 00648 00649 template<class T> 00650 uint DeltaCompressedColumn<T>::decode(uint& index) { 00651 uint result = 0; 00652 int shift = 0; 00653 00654 uchar temp; 00655 do { 00656 temp = cvalues_[index++]; 00657 result += (temp & 127) << shift; 00658 shift +=7; 00659 } while (temp > 127); 00660 00661 return result; 00662 } 00663 00664 template<class T> 00665 const uint 
DeltaCompressedColumn<T>::encoded_length(const uint& cvalue) { 00666 uint val = cvalue; 00667 uint length = 0; 00668 do { 00669 val >>= 7; 00670 length++; 00671 } while (val > 0); 00672 return length; 00673 } 00674 00675 };