1.BlockIo状态迁移
RecordFile是很重要的一个类,几个重要的变量:

 final TransactionManager txnMgr;
 private final LinkedList free = new LinkedList();
 private final HashMap inUse = new HashMap();
 private final HashMap dirty = new HashMap();
 private final HashMap inTxn = new HashMap();
 private RandomAccessFile file;

free是一个LinkedList,FIFO.其他几个维护着BlockIo对象的状态,如果BlockIo对象
从free里面取出来了,那么它的状态就是inUse了,所以RecordFile类get方法最后
会将BlockIo对象放到inUse的Map中:

 BlockIo node;
 ......
 inUse.put(key, node);
 node.setClean(); //注意此时node被设置为dirty=false,也就是说BlockIo也有dirty这个指标
 return node;

inUse的BlockIo对象如果被修改了,那么它的状态就变成dirty了。由于从inUse中取出的对象是否发生
了改变RecordFile对象不知道,需要调用者调用一个方法release:

 /**
 *  Releases a block.
 *
 *  @param blockid The record number to release.
 *  @param isDirty If true, the block was modified since the get().
 */
 void release(long blockid, boolean isDirty)
 throws IOException {
  BlockIo node = (BlockIo) inUse.get(new Long(blockid));
  if (node == null)
      throw new IOException("bad blockid " + blockid + " on release");
  if (!node.isDirty() && isDirty)
      node.setDirty();
  release(node);
 }
 
 /**
 *  Releases a block.
 *
 *  @param block The block to release.
 */
 void release(BlockIo block) {
  Long key = new Long(block.getBlockId());
  inUse.remove(key);
  if (block.isDirty()) {
      // System.out.println( "Dirty: " + key + block );
      dirty.put(key, block);
  } else {
      if (!transactionsDisabled && block.isInTransaction()) {
          inTxn.put(key, block);
      } else {
          free.add(block);
      }
  }
 }

release方法输入两个参数,其中一个是isDirty,如果取出来的BlockIo对象修改了
就应该将isDirty置为true,否则修改不会被保存。也就是说修改后,BlockIo对象
被放入map dirty中。
release以后,BlockIo对象可能有三个状态,首先它会从inUse map里面删除。如果
BlockIo对象被修改,则被放入dirty map中。如果没有修改,就有可能放入free map中。

做完修改后,可以关闭这个RecordFile对象。

 RecordFile file = new RecordFile( testFileName );
        byte[] data = file.get( 0 ).getData();
        data[ 14 ] = (byte) 'b';
        file.release( 0, true );
        file.close();

close方法的工作如下:

 void close() throws IOException {
         if (!dirty.isEmpty()) {
             commit();
         }
         txnMgr.shutdown();
         ......
         file.close();  //此时RandomAccessFile对象close
         file = null;
     }

可以看到close方法里面主要就是commit这个事务:

 for (Iterator i = dirty.values().iterator(); i.hasNext(); ) {
            BlockIo node = (BlockIo) i.next();
            i.remove();
            // System.out.println("node " + node + " map size now " + dirty.size());
            if (transactionsDisabled) {
                long offset = node.getBlockId() * BLOCK_SIZE;
                file.seek(offset);
                file.write(node.getData());
                node.setClean();
                free.add(node);
            }
            else {
                txnMgr.add(node);
                inTxn.put(new Long(node.getBlockId()), node);
            }
        }

如果transaction被disable,那么每一个节点进行更新,如下:
 long offset = node.getBlockId() * BLOCK_SIZE;
        file.seek(offset);
        file.write(node.getData());
        node.setClean();
        free.add(node);
从最后一行,BlockIo对象node的状态重新变为free。以上状态的变化是如下一个循环:
free -> inUse -> dirty -> inTxn -> free

2.BlockIo的获取
RecordFile对应两个文件,一个是.db文件,另一个是.lg文件。缓存对象即BlockIo是操作
的最小单元,在get方法中,如果指定的blockid不在inTxn,dirty和free中,那么通过
getNewNode(blockid)得到一个新的BlockIo对象,如果blockid在有效范围内(这个判断是通过
计算offset得到的,offset=blockid*BLOCK_SIZE,offset小于.db文件的大小,那说明blockid对应
的数据块在文件内),那么就从.db文件中读取去blockid对应的数据库。如果不在有效范围内,那么
数据块就是cleanData的copy,即A block of clean data。

 // get a new node and read it from the file
         node = getNewNode(blockid);
         long offset = blockid * BLOCK_SIZE;
         if (file.length() > 0 && offset <= file.length()) {
             read(file, offset, node.getData(), BLOCK_SIZE);
         } else {
             System.arraycopy(cleanData, 0, node.getData(), 0, BLOCK_SIZE);
         }
         inUse.put(key, node);
         node.setClean();
         return node;

这里getNewNode()方法如下:
    private BlockIo getNewNode(long blockid)
    throws IOException
    {

        BlockIo retval = null;
        if (!free.isEmpty()) {
            retval = (BlockIo) free.removeFirst();
        }
        if (retval == null)
            retval = new BlockIo(0, new byte[BLOCK_SIZE]);

        retval.setBlockId(blockid);
        retval.setView(null);
        return retval;
    }
   
getNewNode()不是直接new BlockIo(),而是从free中取,free中的
BlockIo对象没有被使用,则直接利用,采取的方式是Least Recently Used策略。BlockIo实现了自定义的Externalizable
序列化:
    // implement externalizable interface
    public void readExternal(ObjectInput in)
    throws IOException, ClassNotFoundException {
        blockId = in.readLong();
        int length = in.readInt();
        data = new byte[length];
        in.readFully(data);
    }

    // implement externalizable interface
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeLong(blockId);
        out.writeInt(data.length);
        out.write(data);
    }


3.TransactionManager的事务处理

序列化用在事务这块,如果没有启动事务,RecordFile直接写到.db文件中,不会进行序列化操作。

RecordFile演示了事务的操作:
 final TransactionManager txnMgr;
 ......
 txnMgr = new TransactionManager(this);
 ......
 if (!transactionsDisabled) {
            txnMgr.start();
        }
        ......
        if (!transactionsDisabled)
     txnMgr.add(node);

 ......
 if (!transactionsDisabled) {
            txnMgr.commit();
        }
        ......
        txnMgr.shutdown();

以上是一个完整的事务过程,下面对以上过程发生的操作深入阐述:

TransactionManager(this)以RecordFile作为参数进行构造:
 TransactionManager(RecordFile owner) throws IOException {
     this.owner = owner;
     recover();
     open();
 }

TransactionManager持有RecordFile的引用,然后进行recover和open操作。recover主要是
对log file进行操作,如果有事务没有执行,那么执行事务将log file中的数据写到.db文件中
并且对RecordFile进行sync()操作,最后把log file删除。

    private void recover() throws IOException {
        String logName = makeLogName();
        File logFile = new File(logName);
        if (!logFile.exists())
            return;
        if (logFile.length() == 0) {
            logFile.delete();
            return;
        }

        FileInputStream fis = new FileInputStream(logFile);
        ObjectInputStream ois = new ObjectInputStream(fis);

        try {
            if (ois.readShort() != Magic.LOGFILE_HEADER)
                throw new Error("Bad magic on log file");
        } catch (IOException e) {
            // corrupted/empty logfile
            logFile.delete();
            return;
        }

        while (true) {
            ArrayList blocks = null;
            try {
                blocks = (ArrayList) ois.readObject();
            } catch (ClassNotFoundException e) {
                throw new Error("Unexcepted exception: " + e);
            } catch (IOException e) {
                // corrupted logfile, ignore rest of transactions
                break;
            }
            synchronizeBlocks(blocks.iterator(), false);

            // ObjectInputStream must match exactly each
            // ObjectOutputStream created during writes
            try {
                ois = new ObjectInputStream(fis);
            } catch (IOException e) {
                // corrupted logfile, ignore rest of transactions
                break;
            }
        }
        owner.sync();
        logFile.delete();
    }

open的操作相对简单很多,只是进行一些初始化赋值工作:
   
    /** Opens the log file */
    private void open() throws IOException {
        fos = new FileOutputStream(makeLogName());
        oos = new ObjectOutputStream(fos);
        oos.writeShort(Magic.LOGFILE_HEADER);
        oos.flush();
        curTxn = -1;
    }

下一步就是start这个txnMgr了:
    void start() throws IOException {
        curTxn++;
        if (curTxn == _maxTxns) {
            synchronizeLogFromMemory();
            curTxn = 0;
        }
        txns[curTxn] = new ArrayList();
    }
start的时候就将当前事务数增加1,如果当前事务数等于设置的最大事务数,就进行sync处理。
sync处理的代码如下:

 /** Synchs in-core transactions to data file and opens a fresh log */
    private void synchronizeLogFromMemory() throws IOException {
        close();

        TreeSet blockList = new TreeSet( new BlockIoComparator() );

        int numBlocks = 0;
        int writtenBlocks = 0;
        for (int i = 0; i < _maxTxns; i++) {
            if (txns[i] == null)
                continue;
            // Add each block to the blockList, replacing the old copy of this
            // block if necessary, thus avoiding writing the same block twice
            for (Iterator k = txns[i].iterator(); k.hasNext(); ) {
                BlockIo block = (BlockIo)k.next();
                if ( blockList.contains( block ) ) {
                    block.decrementTransactionCount();
                }
                else {
                    writtenBlocks++;
                    boolean result = blockList.add( block );
                }
                numBlocks++;
            }

            txns[i] = null;
        }
        // Write the blocks from the blockList to disk
        synchronizeBlocks(blockList.iterator(), true);

        owner.sync();
        open();
    }

sync的主要操作就是synchronizeBlocks操作:
    private void synchronizeBlocks(Iterator blockIterator, boolean fromCore)
    throws IOException
    {
        // write block vector elements to the data file.
        while ( blockIterator.hasNext() ) {
            BlockIo cur = (BlockIo)blockIterator.next();
            owner.synch(cur);
            if (fromCore) {
                cur.decrementTransactionCount();
                if (!cur.isInTransaction()) {
                    owner.releaseFromTransaction(cur, true);
                }
            }
        }
    }


接下来的操作txnMgr.add(node):
    /**
     *  Indicates the block is part of the transaction.
     */
    void add(BlockIo block) throws IOException {
        block.incrementTransactionCount();
        txns[curTxn].add(block);
    }
这个操作很简单,就是将BlockIo对象放入到当前事务的ArrayList当中。之后commit操作:

    /**
     *  Commits the transaction to the log file.
     */
    void commit() throws IOException {
        oos.writeObject(txns[curTxn]);
        sync();

        // set clean flag to indicate blocks have been written to log
        setClean(txns[curTxn]);

        // open a new ObjectOutputStream in order to store
        // newer states of BlockIo
        oos = new ObjectOutputStream(fos);
    } 
   
最后的操作就是txnMgr.shutdown():

    /**
     *  Shutdowns the transaction manager. Resynchronizes outstanding
     *  logs.
     */
    void shutdown() throws IOException {
        synchronizeLogFromMemory();
        close();
    }