小明思考

Just a software engineer
posts - 124, comments - 36, trackbacks - 0, articles - 0
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

leveldb研究4- 数据文件的格式和生成

Posted on 2012-03-12 18:21 小明 阅读(3826) 评论(1)  编辑  收藏 所属分类: 分布式计算
leveldb使用SSTable格式来保存数据。

格式为:(当前没有META BLOCK)
SSTABLE = |DATA BLOCK1|DATA BLOCK2|...|DATA BLOCK N|META BLOCK1|...|META BLOCK N|META INDEX BLOCK|DATA INDEX BLOCK|Footer|
DATA BLOCK = |KeyValues|Restart arrays|array size|Compress Type|CRC
Footer(定长) = META INDEX BLOCK offset | DATA Index Block offset| Magic Numbers

比较细节的地方是数据块的压缩,针对key使用了前缀压缩法。

下面看看具体的实现。

//builder.cc
//dbname:数据库名称
//env:OS接口
//iter:指向MemTable的一个iterator
Status BuildTable(const std::string& dbname,
                  Env
* env,
                  
const Options& options,
                  TableCache
* table_cache,
                  Iterator
* iter,
                  FileMetaData
* meta) {
  Status s;
  meta
->file_size = 0;
  iter
->SeekToFirst();

  
//生成文件名:格式 "0000x.sst"
  std::string fname = TableFileName(dbname, meta->number);
  
if (iter->Valid()) {
    WritableFile
* file;
    
//创建一个可写文件
    s = env->NewWritableFile(fname, &file);
    
if (!s.ok()) {
      
return s;
    }

    
//TableBuilder负责table生成和写入
    TableBuilder* builder = new TableBuilder(options, file);
    
//META:最小key
    meta->smallest.DecodeFrom(iter->key());
    
for (; iter->Valid(); iter->Next()) {
      Slice key 
= iter->key();
      
//META:最大key
      meta->largest.DecodeFrom(key);
      
//增加数据到builder
      builder->Add(key, iter->value());
    }

    
// Finish and check for builder errors
    if (s.ok()) {
      
//完成写入
      s = builder->Finish();
      
if (s.ok()) {
        meta
->file_size = builder->FileSize();
        assert(meta
->file_size > 0);
      }
    } 
else {
      builder
->Abandon();
    }
    delete builder;

    
// Finish and check for file errors
    if (s.ok()) {
      s 
= file->Sync();
    }
    
if (s.ok()) {
      
//sync & close,写入磁盘
      s = file->Close();
    }
    delete file;
    file 
= NULL;

    
if (s.ok()) {
      
// Verify that the table is usable
      Iterator* it = table_cache->NewIterator(ReadOptions(),
                                              meta
->number,
                                              meta
->file_size);
      s 
= it->status();
      delete it;
    }
  }

  
// Check for input iterator errors
  if (!iter->status().ok()) {
    s 
= iter->status();
  }

  
if (s.ok() && meta->file_size > 0) {
    
// Keep it
  } else {
    env
->DeleteFile(fname);
  }
  
return s;
}

}  
// namespace leveldb

我们来看看TableBuilder类,主要的细节都在这个类中实现了

TableBuilder中含有一个Rep的数据结构的指针,主要是用于保存builder的一些状态和数据。为什么不在TableBuilder头文件中直接定义这些变量?主要是不想暴露过多的细节给使用者,真是一个很好的做法。

struct TableBuilder::Rep {
  Options options;
  Options index_block_options;
  WritableFile
* file; //sstable文件指针
  uint64_t offset;
  Status status;
  BlockBuilder data_block; 
//数据块
  BlockBuilder index_block; //索引块
  std::string last_key;//上一次的key,用于比较和建立索引
  int64_t num_entries; //
  bool closed;          // 是否结束
  bool pending_index_entry; //是否要新增索引块
  BlockHandle pending_handle;  // Handle to add to index block
  std::string compressed_output;

  Rep(
const Options& opt, WritableFile* f)
      : options(opt),
        index_block_options(opt),
        file(f),
        offset(
0),
        data_block(
&options),
        index_block(
&index_block_options),
        num_entries(
0),
        closed(
false),
        pending_index_entry(
false) {
    index_block_options.block_restart_interval 
= 1;
  }
};

新加一条记录:
//增加一条数据记录
void TableBuilder::Add(const Slice& key, const Slice& value) {
  Rep
* r = rep_;
  assert(
!r->closed);
  
if (!ok()) return;
  
if (r->num_entries > 0) {
    
//检查是不是顺序添加
    assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
  }

  
if (r->pending_index_entry) { //是否生成新的index block
    
//检查当前是否是一个新的BLOCK
    assert(r->data_block.empty());
    
//根据当前的key和上一个DATA BLOCK的最后一个主键生成最短的索引
    r->options.comparator->FindShortestSeparator(&r->last_key, key);
    std::
string handle_encoding;
    r
->pending_handle.EncodeTo(&handle_encoding);
    
//增加新的INDEX BLOCK,但不立即写入
    r->index_block.Add(r->last_key, Slice(handle_encoding));
    r
->pending_index_entry = false;
  }

  r
->last_key.assign(key.data(), key.size());
  r
->num_entries++;
  r
->data_block.Add(key, value);

  
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  
//检查是否已经达到BLOCK SIZE,默认4K
  if (estimated_block_size >= r->options.block_size) {
    Flush();
  }
}

//写一个DATA BLOCK
void TableBuilder::Flush() {
  Rep
* r = rep_;
  assert(
!r->closed);
  
if (!ok()) return;
  
if (r->data_block.empty()) return;
  assert(
!r->pending_index_entry);
  WriteBlock(
&r->data_block, &r->pending_handle);
  
if (ok()) {
    r
->pending_index_entry = true;
    r
->status = r->file->Flush();
  }
}

//写BLOCK
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
  
//文件格式: 数据+类型(1个字节)+ CRC(4个字节)
  assert(ok());
  Rep
* r = rep_;
//生成binary
  Slice raw = block->Finish();

//压缩数据
  Slice block_contents;
  CompressionType type 
= r->options.compression;
  
switch (type) {
    
case kNoCompression:
      block_contents 
= raw;
      
break;

    
case kSnappyCompression: {
      std::
string* compressed = &r->compressed_output;
      
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
          compressed
->size() < raw.size() - (raw.size() / 8u)) {
        block_contents 
= *compressed;
      } 
else {
        
// Snappy not supported, or compressed less than 12.5%, so just
        
// store uncompressed form
        block_contents = raw;
        type 
= kNoCompression;
      }
      
break;
    }
  }
  handle
->set_offset(r->offset);
  handle
->set_size(block_contents.size());
  r
->status = r->file->Append(block_contents);
  
if (r->status.ok()) {
    
char trailer[kBlockTrailerSize];
    trailer[
0= type;
    uint32_t crc 
= crc32c::Value(block_contents.data(), block_contents.size());
    crc 
= crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
    EncodeFixed32(trailer+1, crc32c::Mask(crc));
    r
->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
    
if (r->status.ok()) {
      r
->offset += block_contents.size() + kBlockTrailerSize;
    }
  }
  r
->compressed_output.clear();
  block
->Reset();
}

完成文件的写入:
Status TableBuilder::Finish() {
  Rep
* r = rep_;
  Flush();
  assert(
!r->closed);
  r
->closed = true;
  BlockHandle metaindex_block_handle;
  BlockHandle index_block_handle;
  
if (ok()) {
    
//写入META INDEX BLOCK
    BlockBuilder meta_index_block(&r->options);
    
// TODO(postrelease): Add stats and other meta blocks
    WriteBlock(&meta_index_block, &metaindex_block_handle);
  }
  
if (ok()) {
    
if (r->pending_index_entry) {
      r
->options.comparator->FindShortSuccessor(&r->last_key);
      std::
string handle_encoding;
      r
->pending_handle.EncodeTo(&handle_encoding);
      r
->index_block.Add(r->last_key, Slice(handle_encoding));
      r
->pending_index_entry = false;
    }
    
//写入索引块
    WriteBlock(&r->index_block, &index_block_handle);
  }
  
if (ok()) {
    
//写入Footer,包含META INDEX BLOCK和INDEX HANDLE的offset
    Footer footer;
    footer.set_metaindex_handle(metaindex_block_handle);
    footer.set_index_handle(index_block_handle);
    std::
string footer_encoding;
    footer.EncodeTo(
&footer_encoding);
    r
->status = r->file->Append(footer_encoding);
    
if (r->status.ok()) {
      r
->offset += footer_encoding.size();
    }
  }
  
return r->status;
}


这里面有两个类BlockBuilder和BlockHandle,BlockBuilder负责把数据按照一定格式进行序列化,而BlockHandle负责记录offset,size等,可以理解为BLOCK的文件中指针。

我们看看BlockBuilder的实现,这里leveldb实现了前缀压缩法,因为一个BLOCK的key很接近,所以前后两个key相差不会很大,所以采取了<shared_size><non_shared_size><value_size><non_shared_data><value_data>的格式,节省了空间。
其中size采用了变长格式,很有意思的格式,主要是针对小整形做的一个优化,用最多8个字节来表示4个字节的整形,每个byte的最高一个bit用来指示还有没有后续数据,如果最高位为0,则表示没有后续的bytes.这样小于7F的数据只需要一个字节来表示。
可以参考这篇文章具体看实现variant32格式。

//完成写入
Slice BlockBuilder::Finish() {
  
// 写入restart数组,每隔options_->block_restart_interval(default:16)生成一个restart offset
  for (size_t i = 0; i < restarts_.size(); i++) {
    PutFixed32(
&buffer_, restarts_[i]);
  }
  
//写入restart的大小
  PutFixed32(&buffer_, restarts_.size());
  finished_ 
= true;
  
return Slice(buffer_);
}

void BlockBuilder::Add(const Slice& key, const Slice& value) {
  Slice last_key_piece(last_key_);
  assert(
!finished_);
  assert(counter_ 
<= options_->block_restart_interval);
  assert(buffer_.empty() 
// No values yet?
         || options_->comparator->Compare(key, last_key_piece) > 0);
  size_t shared 
= 0;
  
//counter_内部计数器,用于记录当前restart后的个数
  if (counter_ < options_->block_restart_interval) {
    
//看看当前的key和上一个有多少相同的bytes
    const size_t min_length = std::min(last_key_piece.size(), key.size());
    
while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
      shared
++;
    }
  } 
else {
    
// Restart compression
    restarts_.push_back(buffer_.size());
    counter_ 
= 0;
  }
  
const size_t non_shared = key.size() - shared;

  
// 写入 "<shared><non_shared><value_size>" to 缓冲
  PutVarint32(&buffer_, shared);
  PutVarint32(
&buffer_, non_shared);
  PutVarint32(
&buffer_, value.size());

  
// 写入 non_shared data和value
  buffer_.append(key.data() + shared, non_shared);
  buffer_.append(value.data(), value.size());

  
// 设置 last_key_ 等于 当前的key
  last_key_.resize(shared);
  last_key_.append(key.data() 
+ shared, non_shared);
  assert(Slice(last_key_) 
== key);
  counter_
++;
}






评论

# re: leveldb研究4- 数据文件的格式和生成[未登录]  回复  更多评论   

2012-03-13 13:38 by tbw
研究得不出啊

只有注册用户登录后才能发表评论。


网站导航: