小明思考

Just a software engineer
posts - 124, comments - 36, trackbacks - 0, articles - 0
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

leveldb研究3-数据库日志文件格式

Posted on 2012-03-09 16:00 小明 阅读(3605) 评论(1)  编辑  收藏 所属分类: 分布式计算
leveldb在每次数据库操作之前都会把操作记录下来。
主要实现在db\log_format.h,db\log_reader.h,db\log_reader.cc,db\log_write.h,db\log_write.cc中。我们来具体看看实现。

日志格式
db\log_format.h
log是分块的,每块为32K,每条记录的记录头为7个字节,前四个为CRC,然后是长度(2个字节),最后是记录类型(1个字节)
---------------------------------------
BLOCK1|BLOCK2|BLOCK3|...|BLOCKN
---------------------------------------

enum RecordType {
 
// Zero is reserved for preallocated files
  kZeroType = 0,

  kFullType 
= 1,

  
// For fragments
  kFirstType = 2,
  kMiddleType 
= 3,
  kLastType 
= 4
};
static const int kMaxRecordType = kLastType;

static const int kBlockSize = 32768;

// Header is checksum (4 bytes), type (1 byte), length (2 bytes).
static const int kHeaderSize = 4 + 1 + 2;

}  
// namespace log
}  // namespace leveldb

写日志操作
db\log_writer.cc
请注意这里的处理,由于1条记录可能超过一个BLOCK的大小,所以需要分成多个片段写入。
//增加一条记录
Status Writer::AddRecord(const Slice& slice) {
  
const char* ptr = slice.data();
  size_t left 
= slice.size();

  
// Fragment the record if necessary and emit it.  Note that if slice
  
// is empty, we still want to iterate once to emit a single
  
// zero-length record
  Status s;
  
bool begin = true;
  
do {
    
const int leftover = kBlockSize - block_offset_; //当前剩余多少字节
    assert(leftover >= 0);
    
if (leftover < kHeaderSize) { //不够文件头大小7bytes
      
// 转入新的block
      if (leftover > 0) {
        
//用0来填充空白
        assert(kHeaderSize == 7);
        dest_
->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ 
= 0;
    }

    
// Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    
//avail:除掉头还算多少字节
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    
//实际写入大小
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    
const bool end = (left == fragment_length); //记录是否结束
    if (begin && end) {
      type 
= kFullType; //完整记录
    } else if (begin) {
      type 
= kFirstType; //开头
    } else if (end) {
      type 
= kLastType; //结尾
    } else {
      type 
= kMiddleType; //中间
    }
    
//写入
    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr 
+= fragment_length;
    left 
-= fragment_length;
    begin 
= false;
  } 
while (s.ok() && left > 0);
  
return s;
}

//实际写入日志文件
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
  assert(n 
<= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  
// 记录头
  char buf[kHeaderSize];
  buf[
4= static_cast<char>(n & 0xff);
  buf[
5= static_cast<char>(n >> 8);
  buf[
6= static_cast<char>(t);

  
// 计算CRC
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
  crc 
= crc32c::Mask(crc);                 // Adjust for storage
  EncodeFixed32(buf, crc);

  
// 写入头部
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  
if (s.ok()) {
    
//写入记录片段
    s = dest_->Append(Slice(ptr, n));
    
if (s.ok()) {
      s 
= dest_->Flush();
    }
  }
  block_offset_ 
+= kHeaderSize + n;
  
return s;
}

读日志操作
这里可以看出使用BLOCK的好处,能够减少文件IO次数,读日志基本上就是写日志反向过程。

//读取记录,scratch为缓冲,record是结果
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  
if (last_record_offset_ < initial_offset_) { //需要跳过文件头部信息,目前未实现
    if (!SkipToInitialBlock()) {
      
return false;
    }
  }

  scratch
->clear();
  record
->clear();
  
bool in_fragmented_record = false//是否是碎片记录
// Record offset of the logical record that we're reading
  
// 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;
  Slice fragment;
  
while (true) {
    uint64_t physical_record_offset 
= end_of_buffer_offset_ - buffer_.size();
    
//从文件中读取一个BLOCK
    const unsigned int record_type = ReadPhysicalRecord(&fragment);
    
switch (record_type) {
      
case kFullType: //完整Record
        if (in_fragmented_record) {
          
// Handle bug in earlier versions of log::Writer where
          
// it could emit an empty kFirstType record at the tail end
          
// of a block followed by a kFullType or kFirstType record
          
// at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record 
= false;
          } 
else {
            ReportCorruption(scratch
->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset 
= physical_record_offset;
        scratch
->clear();
        
*record = fragment;
        last_record_offset_ 
= prospective_record_offset;
        
return true;

      
case kFirstType: //Record开始
        if (in_fragmented_record) {
          
// Handle bug in earlier versions of log::Writer where
          
// it could emit an empty kFirstType record at the tail end
          
// of a block followed by a kFullType or kFirstType record
          
// at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record 
= false;
          } 
else {
            ReportCorruption(scratch
->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset 
= physical_record_offset;
        scratch
->assign(fragment.data(), fragment.size());
        in_fragmented_record 
= true;
        
break;

      
case kMiddleType://Record中间
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           
"missing start of fragmented record(1)");
        } 
else {
          scratch
->append(fragment.data(), fragment.size());
        }
        
break;

      
case kLastType://Record结尾
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           
"missing start of fragmented record(2)");
        } 
else {
          scratch
->append(fragment.data(), fragment.size());
          
*record = Slice(*scratch);
          last_record_offset_ 
= prospective_record_offset;
          
return true;
        }
        
break;

      
case kEof://文件结束
        if (in_fragmented_record) {
          ReportCorruption(scratch
->size(), "partial record without end(3)");
          scratch
->clear();
        }
        
return false;

      
case kBadRecord://坏记录
        if (in_fragmented_record) {
          ReportCorruption(scratch
->size(), "error in middle of record");
          in_fragmented_record 
= false;
          scratch
->clear();
        }
        
break;

      
default: {//无法识别
        char buf[40];
        snprintf(buf, 
sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() 
+ (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record 
= false;
        scratch
->clear();
        
break;
      }
    }
  }
  
return false;
}

//从文件中读取
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
  
while (true) {
    
if (buffer_.size() < kHeaderSize) {
      
if (!eof_) {
        
// Last read was a full read, so this is a trailer to skip
        buffer_.clear();
        
//读入一个BLOCK
        Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
        end_of_buffer_offset_ 
+= buffer_.size();
        
if (!status.ok()) {
          buffer_.clear();
          ReportDrop(kBlockSize, status);
          eof_ 
= true;
          
return kEof;
        } 
else if (buffer_.size() < kBlockSize) {
          eof_ 
= true;
        }
        
continue;
      } 
else if (buffer_.size() == 0) {
        
// End of file
        return kEof;
      } 
else {
        size_t drop_size 
= buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, 
"truncated record at end of file");
        
return kEof;
      }
    }

    
// 解析record头
    const char* header = buffer_.data();
    
const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    
const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    
const unsigned int type = header[6];
    
const uint32_t length = a | (b << 8);
    
if (kHeaderSize + length > buffer_.size()) {
      size_t drop_size 
= buffer_.size();
      buffer_.clear();
      ReportCorruption(drop_size, 
"bad record length");
      
return kBadRecord;
    }

    
if (type == kZeroType && length == 0) {
      
// Skip zero length record without reporting any drops since
      
// such records are produced by the mmap based writing code in
      
// env_posix.cc that preallocates file regions.
      buffer_.clear();
      
return kBadRecord;
    }

    
// 检查CRC
    if (checksum_) {
      uint32_t expected_crc 
= crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc 
= crc32c::Value(header + 61 + length);
      
if (actual_crc != expected_crc) {
        
// Drop the rest of the buffer since "length" itself may have
        
// been corrupted and if we trust it, we could find some
        
// fragment of a real log record that just happens to look
        
// like a valid log record.
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, 
"checksum mismatch");
        
return kBadRecord;
      }
    }

    buffer_.remove_prefix(kHeaderSize 
+ length);

    
// Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
        initial_offset_) {
      result
->clear();
      
return kBadRecord;
    }

    
*result = Slice(header + kHeaderSize, length);
    
return type;
  }
}



评论

# re: leveldb研究3-数据库日志文件格式  回复  更多评论   

2012-03-12 09:23 by tb
呵呵 不错啊

只有注册用户登录后才能发表评论。


网站导航: