Column-oriented GPU-accelerated Database Management System
CoGaDB
|
00001 #pragma once 00002 #include <fstream> 00003 #include <core\compressed_column.hpp> 00004 #include <boost\archive\binary_iarchive.hpp> 00005 #include <boost\archive\binary_oarchive.hpp> 00006 #include <boost\serialization\map.hpp> 00007 00008 namespace CoGaDB { 00009 typedef unsigned int uint; 00010 typedef unsigned char uchar; 00011 00012 template<class T> 00013 class DeltaCompressedColumn : public CompressedColumn<T> { 00014 public: 00015 DeltaCompressedColumn(const std::string& name, AttributeType db_type); 00016 ~DeltaCompressedColumn(); 00017 00018 bool insert(const boost::any& new_Value); 00019 bool insert(const T& new_value); 00020 00021 template <typename InputIterator> 00022 bool insert(InputIterator first, InputIterator last); 00023 00024 bool update(TID tid, const boost::any& new_value); 00025 bool update(PositionListPtr tid, const boost::any& new_value); 00026 00027 bool remove(TID tid); 00028 //assumes tid list is sorted ascending 00029 bool remove(PositionListPtr tid); 00030 bool clearContent(); 00031 00032 const boost::any get(TID tid); 00033 void print() const throw(); 00034 size_t size() const throw(); 00035 unsigned int getSizeinBytes() const throw(); 00036 00037 const ColumnPtr copy() const; 00038 00039 bool store(const std::string& path); 00040 bool load(const std::string& path); 00041 00042 T& operator[](const int index); 00043 00044 private: 00045 void compress_insert(uint new_value); 00046 void compress_update(TID tid, const uint& new_value_); 00047 void compress_update_part(uint& i, uint old_delta, uint new_delta); 00048 void compress_delete(TID tid); 00049 00050 uint uncompress(const uint index); 00051 uint decode(uint& index); 00052 const uint encoded_length(const uint& cvalue); 00053 00054 std::vector<uchar> cvalues_; 00055 uint size_; 00056 }; 00057 00058 template<> 00059 class DeltaCompressedColumn<std::string> : public CompressedColumn<std::string> { 00060 public: 00061 DeltaCompressedColumn(const std::string& name, AttributeType db_type); 00062 ~DeltaCompressedColumn(); 00063 00064 bool insert(const boost::any& new_Value); 00065 bool insert(const std::string& new_value); 00066 00067 template <typename InputIterator> 00068 bool insert(InputIterator first, InputIterator last); 00069 00070 bool update(TID tid, const boost::any& new_value); 00071 bool update(PositionListPtr tid, const boost::any& new_value); 00072 00073 bool remove(TID tid); 00074 //assumes tid list is sorted ascending 00075 bool remove(PositionListPtr tid); 00076 bool clearContent(); 00077 00078 const boost::any get(TID tid); 00079 void print() const throw(); 00080 size_t size() const throw(); 00081 unsigned int getSizeinBytes() const throw(); 00082 00083 const ColumnPtr copy() const; 00084 00085 bool store(const std::string& path); 00086 bool load(const std::string& path); 00087 00088 std::string& operator[](const int index); 00089 00090 private: 00091 std::string uncompress(TID tid); 00092 void compress_update(TID tid, std::string value); 00093 00094 std::vector<std::string> cvalues_; 00095 std::string hack_last_uncompressed; 00096 }; 00097 00098 00099 // ------------------------------------------------------ Implementation ------------------------------------------------------- 00100 00101 //-------------------------------------------------- String -------------------------------------------------------------------- 00102 00103 DeltaCompressedColumn<std::string>::DeltaCompressedColumn(const std::string& name, AttributeType db_type) 00104 : CompressedColumn<std::string>(name, db_type) {} 00105 00106 DeltaCompressedColumn<std::string>::~DeltaCompressedColumn() {} 00107 00108 bool DeltaCompressedColumn<std::string>::insert(const boost::any& new_value){ 00109 if (new_value.empty()) { 00110 return false; 00111 } 00112 if (typeid(std::string) == new_value.type()) { 00113 std::string value = boost::any_cast<std::string>(new_value); 00114 return insert(value); 00115 } 00116 return false; 00117 } 00118 00119 bool DeltaCompressedColumn<std::string>::insert(const std::string& new_value) { 00120 compress_update(cvalues_.size(), new_value); 00121 return true; 00122 } 00123 00124 void DeltaCompressedColumn<std::string>::compress_update(TID tid, std::string value) { 00125 std::string n_string = value; 00126 uchar count = 0; 00127 if (tid > 0) { 00128 std::string prev_string = uncompress(tid - 1); 00129 00130 for (uint i = 0; i < value.size() && i < prev_string.size(); i++) 00131 { 00132 if (value[i] == prev_string[i]) { 00133 count++; 00134 } else { 00135 break; 00136 } 00137 } 00138 n_string.erase(0, count); 00139 } 00140 n_string.push_back(count); 00141 n_string.shrink_to_fit(); 00142 if (tid == cvalues_.size()) { 00143 cvalues_.push_back(n_string); 00144 } else { 00145 cvalues_[tid] = n_string; 00146 } 00147 } 00148 00149 std::string DeltaCompressedColumn<std::string>::uncompress(TID tid) { 00150 std::string complete_string = cvalues_[tid]; 00151 uchar last_pre_length = complete_string.back(); 00152 complete_string.pop_back(); 00153 for (int i = tid - 1; i >= 0; i--) 00154 { 00155 std::string last_string = cvalues_[i]; 00156 uchar pre_length = last_string.back(); 00157 if (last_pre_length > pre_length) { 00158 uchar diff = last_pre_length - pre_length; 00159 complete_string.insert(0, last_string.substr(0, diff)); 00160 last_pre_length = pre_length; 00161 } 00162 if (pre_length == 0) { 00163 break; 00164 } 00165 } 00166 return complete_string; 00167 } 00168 00169 template <typename InputIterator> 00170 bool DeltaCompressedColumn<std::string>::insert(InputIterator first, InputIterator last) { 00171 for (; first != last; ++first) { 00172 if (!insert(*first)) { 00173 return false; 00174 } 00175 } 00176 return true; 00177 } 00178 00179 const boost::any DeltaCompressedColumn<std::string>::get(TID tid){ 00180 if (tid < cvalues_.size()) 00181 return boost::any((*this)[tid]); 00182 else{ 00183 std::cout << "fatal Error!!! Invalid TID!!! Attribute: " << this->name_ << " TID: " << tid << std::endl; 00184 return boost::any(); 00185 } 00186 } 00187 00188 void DeltaCompressedColumn<std::string>::print() const throw() { 00189 std::cout << "| " << this->name_ << " |" << std::endl; 00190 std::cout << "________________________" << std::endl; 00191 } 00192 00193 size_t DeltaCompressedColumn<std::string>::size() const throw() { 00194 return cvalues_.size(); 00195 } 00196 00197 const ColumnPtr DeltaCompressedColumn<std::string>::copy() const { 00198 return ColumnPtr(new DeltaCompressedColumn<std::string>(*this)); 00199 } 00200 00201 bool DeltaCompressedColumn<std::string>::update(TID tid, const boost::any& new_value) { 00202 if (new_value.empty()) { 00203 return false; 00204 } 00205 if (typeid(std::string) == new_value.type()) { 00206 std::string value = boost::any_cast<std::string>(new_value); 00207 if (tid < cvalues_.size() - 1) { 00208 std::string next_string = uncompress(tid + 1); 00209 compress_update(tid, value); 00210 compress_update(tid + 1, next_string); 00211 } else { 00212 compress_update(tid, value); 00213 } 00214 return true; 00215 } else { 00216 std::cout << "Fatal Error!!! Typemismatch for column " << this->name_ << std::endl; 00217 return false; 00218 } 00219 } 00220 00221 bool DeltaCompressedColumn<std::string>::update(PositionListPtr tids, const boost::any& new_value){ 00222 if (!tids || new_value.empty()) { 00223 return false; 00224 } 00225 if(typeid(std::string) == new_value.type()) { 00226 for(unsigned int i = 0; i < tids->size(); i++) { 00227 TID tid = (*tids)[i]; 00228 if (!update(tid, new_value)) { 00229 return false; 00230 } 00231 } 00232 return true; 00233 } else { 00234 std::cout << "Fatal Error!!! Typemismatch for column " << this->name_ << std::endl; 00235 } 00236 return false; 00237 } 00238 00239 bool DeltaCompressedColumn<std::string>::remove(TID tid){ 00240 if (tid + 1 < cvalues_.size()) { 00241 std::string next_string = uncompress(tid + 1); 00242 cvalues_.erase(cvalues_.begin() + tid); 00243 compress_update(tid, next_string); 00244 } else if (tid == cvalues_.size() - 1) { 00245 cvalues_.pop_back(); 00246 } 00247 return true; 00248 } 00249 00250 bool DeltaCompressedColumn<std::string>::remove(PositionListPtr tids){ 00251 if (!tids || tids->empty()) { 00252 return false; 00253 } 00254 00255 //std::sort(tids->begin(), tids->end()); 00256 00257 for (uint i = tids->size(); i > 0; --i) { 00258 if (!remove((*tids)[i - 1])) { 00259 return false; 00260 } 00261 } 00262 00263 return true; 00264 } 00265 00266 bool DeltaCompressedColumn<std::string>::clearContent(){ 00267 cvalues_.clear(); 00268 return true; 00269 } 00270 00271 00272 bool DeltaCompressedColumn<std::string>::store(const std::string& path_) { 00273 std::string path(path_); 00274 path += "/"; 00275 path += this->name_; 00276 00277 std::ofstream outfile (path.c_str(), std::ios_base::binary | std::ios_base::out); 00278 boost::archive::binary_oarchive oa(outfile); 00279 00280 oa << cvalues_; 00281 00282 outfile.flush(); 00283 outfile.close(); 00284 return true; 00285 } 00286 00287 bool DeltaCompressedColumn<std::string>::load(const std::string& path_) { 00288 std::string path(path_); 00289 path += "/"; 00290 path += this->name_; 00291 00292 std::ifstream infile (path.c_str(), std::ios_base::binary | std::ios_base::in); 00293 boost::archive::binary_iarchive ia(infile); 00294 00295 ia >> cvalues_; 00296 00297 infile.close(); 00298 return true; 00299 } 00300 00301 std::string& DeltaCompressedColumn<std::string>::operator[](const int index) { 00302 hack_last_uncompressed = uncompress(index); 00303 return hack_last_uncompressed; 00304 } 00305 00306 uint DeltaCompressedColumn<std::string>::getSizeinBytes() const throw(){ 00307 uint size_in_bytes = 0; 00308 for(uint i = 0; i < cvalues_.size(); ++i){ 00309 size_in_bytes += cvalues_[i].capacity(); 00310 } 00311 return size_in_bytes; 00312 } 00313 00314 //----------------------------------------------- INT, FLOAT ---------------------------------------------------------------- 00315 00316 template<class T> 00317 DeltaCompressedColumn<T>::DeltaCompressedColumn(const std::string& name, AttributeType db_type) 00318 : CompressedColumn<T>(name, db_type) { 00319 size_ = 0; 00320 } 00321 00322 template<class T> 00323 DeltaCompressedColumn<T>::~DeltaCompressedColumn() {} 00324 00325 template<class T> 00326 bool DeltaCompressedColumn<T>::insert(const boost::any& new_value){ 00327 if (new_value.empty()) { 00328 return false; 00329 } 00330 if (typeid(T) == new_value.type()) { 00331 T value = boost::any_cast<T>(new_value); 00332 00333 uint* x = reinterpret_cast<uint*>(&value); 00334 compress_insert(*x); 00335 00336 return true; 00337 } 00338 return false; 00339 } 00340 00341 template<class T> 00342 bool DeltaCompressedColumn<T>::insert(const T& new_value) { 00343 T value = new_value; 00344 00345 uint* x = reinterpret_cast<uint*>(&value); 00346 compress_insert(*x); 00347 00348 return true; 00349 } 00350 00351 template <typename T> 00352 template <typename InputIterator> 00353 bool DeltaCompressedColumn<T>::insert(InputIterator first, InputIterator last) { 00354 for (; first != last; ++first) { 00355 if (!insert(*first)) { 00356 return false; 00357 } 00358 } 00359 return true; 00360 } 00361 00362 template<class T> 00363 const boost::any DeltaCompressedColumn<T>::get(TID tid){ 00364 if (tid < size_) { 00365 uint uncompressed = uncompress(tid); 00366 T ret = *reinterpret_cast<T*>(&uncompressed); 00367 00368 return boost::any(ret); 00369 } else { 00370 std::cout << "fatal Error!!! Invalid TID!!! Attribute: " << this->name_ << " TID: " << tid << std::endl; 00371 return boost::any(); 00372 } 00373 } 00374 00375 template<class T> 00376 void DeltaCompressedColumn<T>::print() const throw() { 00377 std::cout << "| " << this->name_ << " |" << std::endl; 00378 std::cout << "________________________" << std::endl; 00379 } 00380 00381 template<class T> 00382 size_t DeltaCompressedColumn<T>::size() const throw() { 00383 return size_; 00384 } 00385 00386 00387 template<class T> 00388 const ColumnPtr DeltaCompressedColumn<T>::copy() const { 00389 return ColumnPtr(new DeltaCompressedColumn<T>(*this)); 00390 } 00391 00392 template<class T> 00393 bool DeltaCompressedColumn<T>::update(TID tid, const boost::any& new_value) { 00394 if (new_value.empty()) { 00395 return false; 00396 } 00397 if (typeid(T) == new_value.type()) { 00398 T value = boost::any_cast<T>(new_value); 00399 00400 unsigned int* x = reinterpret_cast<unsigned int*>(&value); 00401 compress_update(tid, *x); 00402 00403 return true; 00404 } else { 00405 std::cout << "Fatal Error!!! Typemismatch for column " << this->name_ << std::endl; 00406 return false; 00407 } 00408 } 00409 00410 template<class T> 00411 bool DeltaCompressedColumn<T>::update(PositionListPtr tids, const boost::any& new_value){ 00412 if (!tids || new_value.empty()) { 00413 return false; 00414 } 00415 if(typeid(T) == new_value.type()) { 00416 T value = boost::any_cast<T>(new_value); 00417 uint* x = reinterpret_cast<uint*>(&value); 00418 00419 for(uint i = 0; i < tids->size(); i++) { 00420 TID tid = (*tids)[i]; 00421 00422 compress_update(tid, *x); 00423 } 00424 return true; 00425 } else { 00426 std::cout << "Fatal Error!!! Typemismatch for column " << this->name_ << std::endl; 00427 } 00428 return false; 00429 } 00430 00431 template<class T> 00432 bool DeltaCompressedColumn<T>::remove(TID tid) { 00433 compress_delete(tid); 00434 return true; 00435 } 00436 00437 template<class T> 00438 bool DeltaCompressedColumn<T>::remove(PositionListPtr tids){ 00439 if (!tids || tids->empty()) { 00440 return false; 00441 } 00442 00443 //std::sort(tids->begin(), tids->end()); 00444 00445 for (uint i = tids->size(); i > 0; --i) { 00446 compress_delete((*tids)[i - 1]); 00447 } 00448 return true; 00449 } 00450 00451 template<class T> 00452 bool DeltaCompressedColumn<T>::clearContent(){ 00453 cvalues_.clear(); 00454 size_ = 0; 00455 return true; 00456 } 00457 00458 template<class T> 00459 bool DeltaCompressedColumn<T>::store(const std::string& path_) { 00460 std::string path(path_); 00461 path += "/"; 00462 path += this->name_; 00463 00464 std::ofstream outfile (path.c_str(),std::ios_base::binary | std::ios_base::out); 00465 boost::archive::binary_oarchive oa(outfile); 00466 00467 oa << cvalues_; 00468 00469 outfile.flush(); 00470 outfile.close(); 00471 return true; 00472 } 00473 00474 template<class T> 00475 bool DeltaCompressedColumn<T>::load(const std::string& path_) { 00476 std::string path(path_); 00477 path += "/"; 00478 path += this->name_; 00479 00480 std::ifstream infile (path.c_str(),std::ios_base::binary | std::ios_base::in); 00481 boost::archive::binary_iarchive ia(infile); 00482 00483 ia >> cvalues_; 00484 00485 size_ = 0; 00486 for (uint i = 0; i < cvalues_.size();) { 00487 decode(i); 00488 size_++; 00489 } 00490 00491 infile.close(); 00492 return true; 00493 } 00494 00495 template<class T> 00496 T& DeltaCompressedColumn<T>::operator[](const int index) { 00497 uint uncompressed = uncompress(index); 00498 return *reinterpret_cast<T*>(&uncompressed); 00499 } 00500 00501 template<class T> 00502 unsigned int DeltaCompressedColumn<T>::getSizeinBytes() const throw() { 00503 // vector content + size_ variable 00504 return (cvalues_.capacity() * sizeof(unsigned char)) + sizeof(uint); 00505 } 00506 00507 template<class T> 00508 void DeltaCompressedColumn<T>::compress_insert(uint new_value) { 00509 uint delta; 00510 if (cvalues_.empty()) { 00511 delta = new_value; 00512 } else { 00513 // determine delta 00514 delta = new_value ^ uncompress(size_ - 1); 00515 } 00516 // pass delta to char vector 00517 while (delta > 127) { 00518 cvalues_.push_back(128 | (delta & 127)); 00519 delta >>= 7; 00520 } 00521 cvalues_.push_back(255 & delta); 00522 size_++; 00523 } 00524 00525 template<class T> 00526 void DeltaCompressedColumn<T>::compress_update(TID tid, const uint& new_value_) { 00527 if (tid < 0 || tid >= size_) { 00528 throw std::out_of_range("error"); 00529 } 00530 uint i = 0; 00531 00532 uint old_value = decode(i); 00533 uint old_delta; 00534 uint new_delta; 00535 00536 if (tid > 0) { 00537 uint prev_value = old_value; 00538 for (uint j = 1; j < tid; j++) { 00539 old_delta = decode(i); 00540 prev_value ^= old_delta; 00541 } 00542 uint tempi = i; 00543 old_delta = decode(tempi); 00544 00545 old_value = prev_value ^ old_delta; 00546 new_delta = prev_value ^ new_value_; 00547 00548 compress_update_part(i, old_delta, new_delta); 00549 } else { 00550 uint tempi = 0; 00551 compress_update_part(tempi, old_value, new_value_); 00552 } 00553 00554 if (i < cvalues_.size()) { 00555 uint tempi = i; 00556 old_delta = decode(tempi); 00557 new_delta = new_value_ ^ (old_value ^ old_delta); 00558 00559 compress_update_part(i, old_delta, new_delta); 00560 } 00561 } 00562 00563 template<class T> 00564 void DeltaCompressedColumn<T>::compress_delete(TID tid) { 00565 if (tid < 0 || tid >= size_) { 00566 throw std::out_of_range("error"); 00567 } 00568 00569 if (tid == size_ - 1) { 00570 cvalues_.pop_back(); 00571 if (cvalues_.size() > 0) { 00572 while (cvalues_.back() >= 128) { 00573 cvalues_.pop_back(); 00574 } 00575 } 00576 } else { 00577 uint i = 0; 00578 for (uint j = 0; j < tid; j++) { 00579 decode(i); 00580 } 00581 uint tempi = i; 00582 uint old_delta = decode(tempi); 00583 uint old_delta2 = decode(tempi); 00584 00585 uint new_delta = old_delta ^ old_delta2; 00586 00587 uint old_length = encoded_length(old_delta); 00588 cvalues_.erase(cvalues_.begin() + i, cvalues_.begin() + i + old_length); 00589 00590 if (i < cvalues_.size()) { 00591 compress_update_part(i, old_delta2, new_delta); 00592 } 00593 } 00594 size_--; 00595 } 00596 00597 template<class T> 00598 void DeltaCompressedColumn<T>::compress_update_part(uint& i, uint old_delta, uint new_delta) { 00599 uint old_length = encoded_length(old_delta); 00600 uint new_length = encoded_length(new_delta); 00601 00602 int delta_length = new_length - old_length; 00603 if (delta_length != 0) { 00604 if (delta_length < 0) { 00605 cvalues_.erase(cvalues_.begin() + i, cvalues_.begin() + i - delta_length); 00606 } else { 00607 cvalues_.insert(cvalues_.begin() + i, delta_length, 0); 00608 } 00609 } 00610 00611 while (new_delta > 127) { 00612 cvalues_[i++] = (128 | (new_delta & 127)); 00613 new_delta >>= 7; 00614 } 00615 cvalues_[i++] = (255 & new_delta); 00616 } 00617 00618 template<class T> 00619 uint DeltaCompressedColumn<T>::uncompress(const uint index) { 00620 if (index < 0 || index >= size_) { 00621 throw std::out_of_range("error"); 00622 } 00623 00624 uint i = 0; 00625 uint result = decode(i); 00626 for (uint j = 1; j <= index; j++) { 00627 uint delta = decode(i); 00628 result ^= delta; 00629 } 00630 00631 return result; 00632 } 00633 00634 template<class T> 00635 uint DeltaCompressedColumn<T>::decode(uint& index) { 00636 uint result = 0; 00637 int shift = 0; 00638 00639 uchar temp; 00640 do { 00641 temp = cvalues_[index++]; 00642 result += (temp & 127) << shift; 00643 shift +=7; 00644 } while (temp > 127); 00645 00646 return result; 00647 } 00648 00649 template<class T> 00650 const uint DeltaCompressedColumn<T>::encoded_length(const uint& cvalue) { 00651 uint val = cvalue; 00652 uint length = 0; 00653 do { 00654 val >>= 7; 00655 length++; 00656 } while (val > 0); 00657 return length; 00658 } 00659 00660 };