Column-oriented GPU-accelerated Database Management System
CoGaDB
/home/sebastian/gpudbms/trunk/cogadb/include/compression/delta_compressed_column.hpp
Go to the documentation of this file.
00001 #pragma once
00002 #include <fstream>
00003 #include <core/compressed_column.hpp>
00004 #include <boost/archive/binary_iarchive.hpp>
00005 #include <boost/archive/binary_oarchive.hpp>
00006 #include <boost/serialization/map.hpp>
00007 
00008 namespace CoGaDB {
00009         typedef unsigned int uint;
00010         typedef unsigned char uchar;
00011 
00012         template<class T>
00013         class DeltaCompressedColumn : public CompressedColumn<T> {
00014                 public:
00015                         DeltaCompressedColumn(const std::string& name, AttributeType db_type);
00016                         ~DeltaCompressedColumn();
00017 
00018                         bool insert(const boost::any& new_Value);
00019                         bool insert(const T& new_value);
00020                         
00021                         template <typename InputIterator>
00022                         bool insert(InputIterator first, InputIterator last);
00023 
00024                         bool update(TID tid, const boost::any& new_value);
00025                         bool update(PositionListPtr tid, const boost::any& new_value);  
00026         
00027                         bool remove(TID tid);
00028                         //assumes tid list is sorted ascending
00029                         bool remove(PositionListPtr tid);
00030                         bool clearContent();
00031 
00032                         const boost::any get(TID tid);
00033                         void print() const throw();
00034                         size_t size() const throw();
00035                         unsigned int getSizeinBytes() const throw();
00036 
00037                         const ColumnPtr copy() const;
00038                         //const ColumnPtr gather(PositionListPtr tid_list);
00039         
00040                         bool store(const std::string& path);
00041                         bool load(const std::string& path);
00042                         //const ColumnPtr materialize() throw();
00043         
00044                         T& operator[](const int index);
00045 
00046                 private:
00047                         void compress_insert(uint new_value);
00048                         void compress_update(TID tid, const uint& new_value_);
00049                         void compress_update_part(uint& i, uint old_delta, uint new_delta);
00050                         void compress_delete(TID tid);
00051 
00052                         uint uncompress(const uint index);
00053                         uint decode(uint& index);
00054                         const uint encoded_length(const uint& cvalue);
00055 
00056                         std::vector<uchar> cvalues_;
00057                         uint size_;
00058                         T hack_last_uncompressed;
00059         };
00060 
00061         template<>
00062         class DeltaCompressedColumn<std::string> : public CompressedColumn<std::string> {
00063                 public:
00064                         DeltaCompressedColumn(const std::string& name, AttributeType db_type);
00065                         ~DeltaCompressedColumn();
00066 
00067                         bool insert(const boost::any& new_Value);
00068                         bool insert(const std::string& new_value);
00069                         
00070                         template <typename InputIterator>
00071                         bool insert(InputIterator first, InputIterator last);
00072 
00073                         bool update(TID tid, const boost::any& new_value);
00074                         bool update(PositionListPtr tid, const boost::any& new_value);  
00075         
00076                         bool remove(TID tid);
00077                         //assumes tid list is sorted ascending
00078                         bool remove(PositionListPtr tid);
00079                         bool clearContent();
00080 
00081                         const boost::any get(TID tid);
00082                         void print() const throw();
00083                         size_t size() const throw();
00084                         unsigned int getSizeinBytes() const throw();
00085 
00086                         const ColumnPtr copy() const;
00087 
00088                         bool store(const std::string& path);
00089                         bool load(const std::string& path);
00090         
00091                         std::string& operator[](const int index);
00092 
00093                 private:
00094                         std::string uncompress(TID tid);
00095                         void compress_update(TID tid, std::string value);
00096 
00097                         std::vector<std::string> cvalues_;
00098                         std::string hack_last_uncompressed;
00099         };
00100 
00101 
00102         // ------------------------------------------------------ Implementation -------------------------------------------------------
00103         
00104         //-------------------------------------------------- String --------------------------------------------------------------------
00105                 
00106         DeltaCompressedColumn<std::string>::DeltaCompressedColumn(const std::string& name, AttributeType db_type) 
00107                 : CompressedColumn<std::string>(name, db_type) {}
00108 
00109         DeltaCompressedColumn<std::string>::~DeltaCompressedColumn() {}
00110 
00111         bool DeltaCompressedColumn<std::string>::insert(const boost::any& new_value){
00112                 if (new_value.empty()) {
00113                         return false;
00114                 }
00115                 if (typeid(std::string) == new_value.type()) {
00116                         std::string value = boost::any_cast<std::string>(new_value);
00117                         return insert(value);
00118                 }
00119                 return false;
00120         }
00121 
00122         bool DeltaCompressedColumn<std::string>::insert(const std::string& new_value) {
00123                 compress_update(cvalues_.size(), new_value);
00124                 return true;
00125         }
00126 
00127         void DeltaCompressedColumn<std::string>::compress_update(TID tid, std::string value) {
00128                 std::string n_string = value;
00129                 uchar count = 0;
00130                 if (tid > 0) {
00131                         std::string prev_string = uncompress(tid - 1);
00132                 
00133                         for (uint i = 0; i < value.size() && i < prev_string.size(); i++)
00134                         {
00135                                 if (value[i] == prev_string[i]) {
00136                                         count++;
00137                                 } else {
00138                                         break;
00139                                 }
00140                         }
00141                         n_string.erase(0, count);
00142                 }
00143                 n_string.push_back(count);
00144                 //n_string.shrink_to_fit();
00145                 if (tid == cvalues_.size()) {
00146                         cvalues_.push_back(n_string);
00147                 } else {
00148                         cvalues_[tid] = n_string;
00149                 }
00150         }
00151 
00152         std::string DeltaCompressedColumn<std::string>::uncompress(TID tid) {
00153                 std::string complete_string = cvalues_[tid];
00154                 //uchar last_pre_length = complete_string.back();
00155                 uchar last_pre_length = *complete_string.rbegin();
00156                 //complete_string.pop_back();
00157                 complete_string.erase(complete_string.end()-1);
00158                 for (int i = tid - 1; i >= 0; i--)
00159                 {
00160                         std::string last_string = cvalues_[i];
00161                         //uchar pre_length = last_string.back();
00162                         uchar pre_length = *last_string.rbegin();
00163                         if (last_pre_length > pre_length) {
00164                                 uchar diff = last_pre_length - pre_length;
00165                                 complete_string.insert(0, last_string.substr(0, diff));
00166                                 last_pre_length = pre_length;           
00167                         }
00168                         if (pre_length == 0) { 
00169                                 break;
00170                         }
00171                 }
00172                 return complete_string;
00173         }
00174 
00175         template <typename InputIterator>
00176         bool DeltaCompressedColumn<std::string>::insert(InputIterator first, InputIterator last) {
00177                 for (; first != last; ++first) {
00178                         if (!insert(*first)) {
00179                                 return false;
00180                         }
00181                 }
00182                 return true;
00183         }
00184 
00185         const boost::any DeltaCompressedColumn<std::string>::get(TID tid){
00186                 if (tid < cvalues_.size())
00187                         return boost::any((*this)[tid]);
00188                 else{
00189                         std::cout << "fatal Error!!! Invalid TID!!! Attribute: " << this->name_ << " TID: " << tid  << std::endl;
00190                         return boost::any();
00191                 }
00192         }
00193 
00194         void DeltaCompressedColumn<std::string>::print() const throw() {
00195                 std::cout << "| " << this->name_ << " |" << std::endl;
00196                 std::cout << "________________________" << std::endl;
00197         }
00198 
00199         size_t DeltaCompressedColumn<std::string>::size() const throw() {
00200                 return cvalues_.size();
00201         }
00202 
00203         const ColumnPtr DeltaCompressedColumn<std::string>::copy() const {
00204                 return ColumnPtr(new DeltaCompressedColumn<std::string>(*this));
00205         }
00206 
00207 
00208         
00209         bool DeltaCompressedColumn<std::string>::update(TID tid, const boost::any& new_value) {
00210                 if (new_value.empty()) {
00211                         return false;
00212                 }
00213                 if (typeid(std::string) == new_value.type()) {
00214                          std::string value = boost::any_cast<std::string>(new_value);
00215                          if (tid < cvalues_.size() - 1) {
00216                                 std::string next_string = uncompress(tid + 1);
00217                                 compress_update(tid, value);
00218                                 compress_update(tid + 1, next_string);
00219                          } else {
00220                                 compress_update(tid, value);
00221                          }
00222                          return true;
00223                 } else {
00224                         std::cout << "Fatal Error!!! Typemismatch for column " << this->name_ << std::endl; 
00225                         return false;
00226                 }
00227         }
00228 
00229         bool DeltaCompressedColumn<std::string>::update(PositionListPtr tids, const boost::any& new_value){
00230                 if (!tids || new_value.empty()) {
00231                         return false;
00232                 }
00233                 if(typeid(std::string) == new_value.type()) {
00234                         for(unsigned int i = 0; i < tids->size(); i++) {
00235                                 TID tid = (*tids)[i];
00236                                 if (!update(tid, new_value)) {
00237                                         return false;
00238                                 }
00239                         }
00240                         return true;
00241                 } else {
00242                         std::cout << "Fatal Error!!! Typemismatch for column " << this->name_ << std::endl; 
00243                 }
00244                 return false;   
00245         }
00246         
00247         bool DeltaCompressedColumn<std::string>::remove(TID tid){
00248                 if (tid + 1 < cvalues_.size()) {
00249                         std::string next_string = uncompress(tid + 1);
00250                         cvalues_.erase(cvalues_.begin() + tid);
00251                         compress_update(tid, next_string);
00252                 } else if (tid == cvalues_.size() - 1) {
00253                         cvalues_.pop_back();
00254                 }
00255                 return true;
00256         }
00257         
00258         bool DeltaCompressedColumn<std::string>::remove(PositionListPtr tids){
00259                 if (!tids || tids->empty()) {
00260                         return false;
00261                 }
00262 
00263                 //std::sort(tids->begin(), tids->end());
00264 
00265                 for (uint i = tids->size(); i > 0; --i) {
00266                         if (!remove((*tids)[i - 1])) {
00267                                 return false;
00268                         }
00269                 }
00270 
00271                 return true;                    
00272         }
00273 
00274         bool DeltaCompressedColumn<std::string>::clearContent(){
00275                 cvalues_.clear();
00276                 return true;
00277         }
00278 
00279 
00280         bool DeltaCompressedColumn<std::string>::store(const std::string& path_) {
00281                 std::string path(path_);
00282                 path += "/";
00283                 path += this->name_;
00284                 
00285                 std::ofstream outfile (path.c_str(), std::ios_base::binary | std::ios_base::out);
00286                 boost::archive::binary_oarchive oa(outfile);
00287 
00288                 oa << cvalues_;
00289                 
00290                 outfile.flush();
00291                 outfile.close();
00292                 return true;
00293         }
00294 
00295         bool DeltaCompressedColumn<std::string>::load(const std::string& path_) {
00296                 std::string path(path_);
00297                 path += "/";
00298                 path += this->name_;
00299                 
00300                 std::ifstream infile (path.c_str(), std::ios_base::binary | std::ios_base::in);
00301                 boost::archive::binary_iarchive ia(infile);
00302                 
00303                 ia >> cvalues_;
00304                 
00305                 infile.close();
00306                 return true;
00307         }
00308 
00309         std::string& DeltaCompressedColumn<std::string>::operator[](const int index) {
00310                 hack_last_uncompressed = uncompress(index);
00311                 return hack_last_uncompressed;
00312         }
00313         
00314 
00315 
00316         uint DeltaCompressedColumn<std::string>::getSizeinBytes() const throw(){
00317                 uint size_in_bytes = 0;
00318                 for(uint i = 0; i < cvalues_.size(); ++i){
00319                         size_in_bytes += cvalues_[i].capacity();
00320                 }
00321                 return size_in_bytes;
00322         }
00323 
00324         //----------------------------------------------- INT, FLOAT ----------------------------------------------------------------
00325 
00326         template<class T>
00327         DeltaCompressedColumn<T>::DeltaCompressedColumn(const std::string& name, AttributeType db_type) 
00328                 : CompressedColumn<T>(name, db_type) {
00329                 size_ = 0;
00330         }
00331 
00332         template<class T>
00333         DeltaCompressedColumn<T>::~DeltaCompressedColumn() {}
00334 
00335         template<class T>
00336         bool DeltaCompressedColumn<T>::insert(const boost::any& new_value){
00337                 if (new_value.empty()) {
00338                         return false;
00339                 }
00340                 if (typeid(T) == new_value.type()) {
00341                         T value = boost::any_cast<T>(new_value);
00342 
00343                         uint* x = reinterpret_cast<uint*>(&value);
00344                         compress_insert(*x);
00345 
00346                         return true;
00347                 }
00348                 return false;
00349         }
00350 
00351         template<class T>
00352         bool DeltaCompressedColumn<T>::insert(const T& new_value) {
00353                 T value = new_value;
00354                 
00355                 uint* x = reinterpret_cast<uint*>(&value);
00356                 compress_insert(*x);
00357 
00358                 return true;
00359         }
00360 
00361         template <typename T> 
00362         template <typename InputIterator>
00363         bool DeltaCompressedColumn<T>::insert(InputIterator first, InputIterator last) {
00364                 for (; first != last; ++first) {
00365                         if (!insert(*first)) {
00366                                 return false;
00367                         }
00368                 }
00369                 return true;
00370         }
00371 
00372         template<class T>
00373         const boost::any DeltaCompressedColumn<T>::get(TID tid){
00374                 if (tid < size_) {
00375                         uint uncompressed = uncompress(tid);
00376                         T ret = *reinterpret_cast<T*>(&uncompressed);
00377 
00378                         return boost::any(ret);
00379                 } else {
00380                         std::cout << "fatal Error!!! Invalid TID!!! Attribute: " << this->name_ << " TID: " << tid << std::endl;
00381                         return boost::any();
00382                 }
00383         }
00384 
00385         template<class T>
00386         void DeltaCompressedColumn<T>::print() const throw() {
00387                 std::cout << "| " << this->name_ << " |" << std::endl;
00388                 std::cout << "________________________" << std::endl;
00389         }
00390 
00391         template<class T>
00392         size_t DeltaCompressedColumn<T>::size() const throw() {
00393                 return size_;
00394         }
00395 
00396 
00397         template<class T>
00398         const ColumnPtr DeltaCompressedColumn<T>::copy() const {
00399                 return ColumnPtr(new DeltaCompressedColumn<T>(*this));
00400         }
00401 
00402 
00403         
00404         template<class T>
00405         bool DeltaCompressedColumn<T>::update(TID tid, const boost::any& new_value) {
00406                 if (new_value.empty()) {
00407                         return false;
00408                 }
00409                 if (typeid(T) == new_value.type()) {
00410                         T value = boost::any_cast<T>(new_value);
00411 
00412                         unsigned int* x = reinterpret_cast<unsigned int*>(&value);
00413                         compress_update(tid, *x);
00414                         
00415                         return true;
00416                 } else {
00417                         std::cout << "Fatal Error!!! Typemismatch for column " << this->name_ << std::endl; 
00418                         return false;
00419                 }
00420         }
00421 
00422         template<class T>
00423         bool DeltaCompressedColumn<T>::update(PositionListPtr tids, const boost::any& new_value){
00424                 if (!tids || new_value.empty()) {
00425                         return false;
00426                 }
00427                 if(typeid(T) == new_value.type()) {
00428                         T value = boost::any_cast<T>(new_value);
00429                         uint* x = reinterpret_cast<uint*>(&value);
00430 
00431                         for(uint i = 0; i < tids->size(); i++) {
00432                                 TID tid = (*tids)[i];
00433 
00434                                 compress_update(tid, *x);
00435                         }
00436                         return true;
00437                 } else {
00438                         std::cout << "Fatal Error!!! Typemismatch for column " << this->name_ << std::endl; 
00439                 }
00440                 return false;   
00441         }
00442 
00443         template<class T>
00444         bool DeltaCompressedColumn<T>::remove(TID tid) {
00445                 compress_delete(tid);
00446                 return true;
00447         }
00448 
00449         template<class T>
00450         bool DeltaCompressedColumn<T>::remove(PositionListPtr tids){
00451                 if (!tids || tids->empty()) {
00452                         return false;
00453                 }
00454 
00455                 //std::sort(tids->begin(), tids->end());        
00456                 
00457                 for (uint i = tids->size(); i > 0; --i) {
00458                         compress_delete((*tids)[i - 1]);
00459                 }
00460                 return true;                    
00461         }
00462 
00463         template<class T>
00464         bool DeltaCompressedColumn<T>::clearContent(){
00465                 cvalues_.clear();
00466                 size_ = 0;
00467                 return true;
00468         }
00469 
00470         template<class T>
00471         bool DeltaCompressedColumn<T>::store(const std::string& path_) {
00472                 std::string path(path_);
00473                 path += "/";
00474                 path += this->name_;
00475 
00476                 std::ofstream outfile (path.c_str(),std::ios_base::binary | std::ios_base::out);
00477                 boost::archive::binary_oarchive oa(outfile);
00478 
00479                 oa << cvalues_;
00480 
00481                 outfile.flush();
00482                 outfile.close();
00483                 return true;
00484         }
00485 
00486         template<class T>
00487         bool DeltaCompressedColumn<T>::load(const std::string& path_) {
00488                 std::string path(path_);
00489                 path += "/";
00490                 path += this->name_;
00491 
00492                 std::ifstream infile (path.c_str(),std::ios_base::binary | std::ios_base::in);
00493                 boost::archive::binary_iarchive ia(infile);
00494 
00495                 ia >> cvalues_;
00496 
00497                 size_ = 0;
00498                 for (uint i = 0; i < cvalues_.size();) {
00499                         decode(i);
00500                         size_++;
00501                 }
00502 
00503                 infile.close();
00504                 return true;
00505         }
00506 
00507         template<class T>
00508         T& DeltaCompressedColumn<T>::operator[](const int index) {
00509                 uint uncompressed = uncompress(index);
00510                 hack_last_uncompressed = *reinterpret_cast<T*>(&uncompressed);
00511                 return hack_last_uncompressed;
00512         }
00513         
00514 
00515 
00516         template<class T>
00517         unsigned int DeltaCompressedColumn<T>::getSizeinBytes() const throw() {
00518                 // vector content + size_ variable
00519                 return (cvalues_.capacity() * sizeof(unsigned char)) + sizeof(uint);
00520         }
00521 
00522         template<class T>
00523         void DeltaCompressedColumn<T>::compress_insert(uint new_value) {
00524                 uint delta;
00525                 if (cvalues_.empty()) {
00526                         delta = new_value;
00527                 } else {
00528                         // determine delta
00529                         delta =  new_value ^ uncompress(size_ - 1);
00530                 }
00531                 // pass delta to char vector
00532                 while (delta > 127) {
00533                         cvalues_.push_back(128 | (delta & 127));
00534                         delta >>= 7;
00535                 }
00536                 cvalues_.push_back(255 & delta);
00537                 size_++;
00538         }
00539 
00540         template<class T>
00541         void DeltaCompressedColumn<T>::compress_update(TID tid, const uint& new_value_) {
00542                 if (tid < 0 || tid >= size_) {
00543                         throw std::out_of_range("error");
00544                 }
00545                 uint i = 0;
00546                                 
00547                 uint old_value = decode(i);
00548                 uint old_delta;
00549                 uint new_delta;
00550 
00551                 if (tid > 0) {
00552                         uint prev_value = old_value;
00553                         for (uint j = 1; j < tid; j++) {
00554                                 old_delta = decode(i);
00555                                 prev_value ^= old_delta;
00556                         }       
00557                         uint tempi = i;
00558                         old_delta = decode(tempi);
00559 
00560                         old_value = prev_value ^ old_delta;
00561                         new_delta = prev_value ^ new_value_;    
00562                         
00563                         compress_update_part(i, old_delta, new_delta);
00564                 } else {
00565                         uint tempi = 0;
00566                         compress_update_part(tempi, old_value, new_value_);
00567                 }
00568 
00569                 if (i < cvalues_.size()) {
00570                         uint tempi = i;
00571                         old_delta = decode(tempi);
00572                         new_delta = new_value_ ^ (old_value ^ old_delta);
00573                         
00574                         compress_update_part(i, old_delta, new_delta);
00575                 }
00576         }
00577 
00578         template<class T>
00579         void DeltaCompressedColumn<T>::compress_delete(TID tid) {
00580                 if (tid < 0 || tid >= size_) {
00581                         throw std::out_of_range("error");
00582                 }
00583 
00584                 if (tid == size_ - 1) {
00585                         cvalues_.pop_back();
00586                         if (cvalues_.size() > 0) {
00587                                 while (cvalues_.back() >= 128) {
00588                                         cvalues_.pop_back();
00589                                 }
00590                         }
00591                 } else {
00592                         uint i = 0;
00593                         for (uint j = 0; j < tid; j++) {
00594                                 decode(i);
00595                         }
00596                         uint tempi = i;
00597                         uint old_delta = decode(tempi);
00598                         uint old_delta2 = decode(tempi);
00599 
00600                         uint new_delta = old_delta ^ old_delta2;
00601 
00602                         uint old_length = encoded_length(old_delta);
00603                         cvalues_.erase(cvalues_.begin() + i, cvalues_.begin() + i + old_length);
00604 
00605                         if (i < cvalues_.size()) {
00606                                 compress_update_part(i, old_delta2, new_delta);
00607                         }
00608                 }
00609                 size_--;
00610         }
00611 
00612         template<class T>
00613         void DeltaCompressedColumn<T>::compress_update_part(uint& i, uint old_delta, uint new_delta) {
00614                 uint old_length = encoded_length(old_delta);
00615                 uint new_length = encoded_length(new_delta);
00616 
00617                 int delta_length = new_length - old_length;
00618                 if (delta_length != 0) {
00619                         if (delta_length < 0) {
00620                                 cvalues_.erase(cvalues_.begin() + i, cvalues_.begin() + i - delta_length);
00621                         } else {
00622                                 cvalues_.insert(cvalues_.begin() + i, delta_length, 0);
00623                         }
00624                 }
00625 
00626                 while (new_delta > 127) {
00627                         cvalues_[i++] = (128 | (new_delta & 127));
00628                         new_delta >>= 7;
00629                 }
00630                 cvalues_[i++] = (255 & new_delta);
00631         }
00632 
00633         template<class T>
00634         uint DeltaCompressedColumn<T>::uncompress(const uint index) {
00635                 if (index < 0 || index >= size_) {
00636                         throw std::out_of_range("error");
00637                 }
00638 
00639                 uint i = 0;
00640                 uint result = decode(i);
00641                 for (uint j = 1; j <= index; j++) {
00642                         uint delta = decode(i);                         
00643                         result ^= delta;
00644                 }
00645 
00646                 return result;
00647         }
00648 
00649         template<class T>
00650         uint DeltaCompressedColumn<T>::decode(uint& index) {
00651                 uint result = 0;
00652                 int shift = 0;
00653 
00654                 uchar temp;
00655                 do {
00656                         temp = cvalues_[index++];
00657                         result += (temp & 127) << shift;
00658                         shift +=7;
00659                 } while (temp > 127);
00660 
00661                 return result;
00662         }
00663 
00664         template<class T>
00665         const uint DeltaCompressedColumn<T>::encoded_length(const uint& cvalue) {
00666                 uint val = cvalue;
00667                 uint length = 0;
00668                 do {
00669                         val >>= 7;
00670                         length++;
00671                 } while (val > 0);
00672                 return length;
00673         }
00674 
00675 };
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines