随笔 - 16  文章 - 1  trackbacks - 0
<2009年12月>
293012345
6789101112
13141516171819
20212223242526
272829303112
3456789

常用链接

留言簿

随笔档案

搜索

  •  

最新评论

阅读排行榜

评论排行榜

Two-phase commit(http://en.wikipedia.org/wiki/Two-phase_commit_protocol)是分布式事务最基础的协议,Three-phase commit(http://en.wikipedia.org/wiki/Three-phase_commit_protocol)主要解决Two-phase commit中协调者宕机问题。

Two-phase commit的算法实现 (from <<Distributed System: Principles and Paradigms>>):

协调者(Coordinator):

    write START_2PC to local log;

    multicast VOTE_REQUEST to all participants;

    while not all votes have been collected {

        wait for any incoming vote;

        if timeout {

            write GLOBAL_ABORT to local log;

            multicast GLOBAL_ABORT to all participants;

            exit;

        }

        record vote;

    }

    if all participants sent VOTE_COMMIT and coordinator votes COMMIT {

        write GLOBAL_COMMIT to local log;

        multicast GLOBAL_COMMIT to all participants;

    } else {

        write GLOBAL_ABORT to local log;

        multicast GLOBAL_ABORT to all participants;

    }

参与者(Participants)

    write INIT to local log;

    wait for VOTE_REQUEST from coordinator;

    if timeout {

        write VOTE_ABORT to local log;

        exit;

    }

    if participant votes COMMIT {

        write VOTE_COMMIT to local log;

        send VOTE_COMMIT to coordinator;

        wait for DECISION from coordinator;

        if timeout {

            multicast DECISION_REQUEST to other participants;

            wait until DECISION is received;  /* remain blocked*/

            write DECISION to local log;

        }

        if DECISION == GLOBAL_COMMIT

            write GLOBAL_COMMIT to local log;

        else if DECISION == GLOBAL_ABORT

            write GLOBAL_ABORT to local log;

    } else {

        write VOTE_ABORT to local log;

        send VOTE_ABORT to coordinator;

    }

另外,每个参与者维护一个线程专门处理其它参与者的DECISION_REQUEST请求,处理线程流程如下:

    while true {

        wait until any incoming DECISION_REQUEST is received;

        read most recently recorded STATE from the local log;

        if STATE == GLOBAL_COMMIT

            send GLOBAL_COMMIT to requesting participant;

        else if STATE == INIT or STATE == GLOBAL_ABORT;

            send GLOBAL_ABORT to requesting participant;

        else

            skip;  /* participant remains blocked */

    }

从上述的协调者与参与者的流程可以看出,如果所有参与者VOTE_COMMIT后协调者宕机,这个时候每个参与者都无法单独决定全局事务的最终结果(GLOBAL_COMMIT还是GLOBAL_ABORT),也无法从其它参与者获取,整个事务一直阻塞到协调者恢复;如果协调者出现类似磁盘坏这种永久性错误,该事务将成为被永久遗弃的孤儿。问题的解决有如下思路:

1. 协调者持久化数据定期备份。为了防止协调者出现永久性错误,这是一种代价最小的解决方法,不容易引入bug,但是事务被阻塞的时间可能特别长,比较适合银行这种正确性高于一切的系统。

2. Three-phase Commit。这是理论上的一种方法,实现起来复杂且效率低。思路如下:假设参与者机器不可能出现超过一半同时宕机的情况,如果协调者宕机,我们需要从活着的超过一半的参与者中得出事务的全局结果。由于不可能知道已经宕机的参与者的状态,所以引入一个新的参与者状态PRECOMMIT,参与者成功执行一个事务需要经过INIT, READY, PRECOMMIT,最后到COMMIT状态;如果至少有一个参与者处于PRECOMMIT或者COMMIT,事务成功;如果至少一个参与者处于INIT或者ABORT,事务失败;如果所有的参与者都处于READY(至少一半参与者活着),事务失败,即使原先宕机的参与者恢复后处于PRECOMMIT状态,也会因为有其它参与者处于ABORT状态而回滚。PRECOMMIT状态的引入给了宕机的参与者回滚机会,所以Three-phase commit在超过一半的参与者活着的时候是不阻塞的。不过,Three-phase Commit只能算是是理论上的探索,效率低并且没有解决网络分区问题。

3. Paxos解决协调者单点问题。Jim Gray和Lamport合作了一篇论文讲这个方法,很适合互联网公司的超大规模集群,Google的Megastore事务就是这样实现的,不过问题在于Paxos和Two-phase Commit都不简单,需要有比较靠谱(代码质量高)的小团队设计和编码才行。后续的blog将详细阐述该方法。

总之,分布式事务只能是系统开发者的乌托邦式理想,Two-phase commit的介入将导致涉及多台机器的事务之间完全串行,没有代价的分布式事务是不存在的。

 

posted @ 2009-12-22 23:01 Programmers 阅读(838) | 评论 (0)编辑 收藏

前面我的一篇文章http://hi.baidu.com/knuthocean/blog/item/12bb9f3dea0e400abba1673c.html引用了对Google App Engine工程师关于Bigtable/Megastore replication的文章。当时留下了很多疑问,比如:为什么Google Bigtable 是按照column family级别而不是按行执行replication的?今天重新思考了Bigtable replication问题,有如下体会:

1. Bigtable/GFS的设计属于分层设计,和文件系统/数据库分层设计原理一致,通过系统隔离解决工程上的问题。这种分层设计带来了两个问题,一个是性能问题,另外一个就是Replication问题。由于存储节点和服务节点可能不在一台机器,理论上总是存在性能问题,这就要求我们在加载/迁移Bigtable子表(Bigtable tablet)的时候考虑本地化因素;另外,GFS有自己的replication机制保证存储的可靠性,Bigtable通过分离服务节点和存储节点获得了很大的灵活性,且Bigtable的宕机恢复时间可以做到很短。对于很多对实时性要求不是特别高的应用Bigtable由于服务节点同时只有一个,既节约资源又避免了单点问题。然后,Bigtable tablet服务过于灵活导致replication做起来极其困难。比如,tablet的分裂和合并机制导致多个tablet(一个只写,其它只读)服务同一段范围的数据变得几乎不可能。

2. Google replication分为两种机制,基于客户端和基于Tablet Server。分述如下:

2-1). 基于客户端的replication。这种机制比较简单,实现如下:客户端读/写操作均为异步操作,每个写操作都尝试写两个Bigtable集群,任何一个写成功就返回用户,客户端维护一个retry list,不断重试失败的写操作。读操作发到两个集群,任何一个集群读取成功均可。然后,这样做有两个问题:

    a. 客户端不可靠,可能因为各种问题,包括程序问题退出,retry list丢失导致两个集群的数据不一致;

    b. 多个客户端并发操作时无法保证顺序性。集群A收到的写操作可能是"DEL item; PUT item";集群B的可能是"PUT item; DEL item"。

2-2). 基于Tablet Server的replication。这种机制实现较为复杂,目的是为了保证读服务,写操作的延时仍然可能比较长。两个集群,一个为主集群,提供读/写服务;一个为slave集群,提供只读服务,两个集群维持最终一致性。对于一般的读操作,尽量读取主集群,如果主集群不可以访问则读取slave集群;对于写操作,首先将写操作提交到主集群的Tablet Server,主集群的Tablet Server维护slave集群的元数据信息,并维护一个后台线程不断地将积攒的用户表格写操作提交到slave集群进行日志回放(group commit)。对于一般的tablet迁移,操作逻辑和Bigtable论文中的完全一致;主集群如果发生了机器宕机,则除了回放commit log外,还需要完成宕机的Tablet Server遗留的后台备份任务。之所以要按照column family级别而不是按行复制,是为了提高压缩率从而提高备份效率。如果主集群写操作日志的压缩率大于备份数据的压缩率,则可能出现备份不及时,待备份数据越来越多的问题。

假设集群A为主集群,集群B是集群A的备份,集群切换时先停止集群A的写服务,将集群A余下的备份任务备份到集群B后切换到集群B;如果集群A不可访问的时间不可预知,可以选择直接切换到集群B,这样会带来一致性问题。且由于Bigtable是按列复制的,最后写入的一些行的事务性无法保证。不过由于写操作数据还是保存在集群A的,所以用户可以知道丢了哪些数据,很多应用可以通过重新执行A集群遗留的写操作进行灾难恢复。Google的App Engine也提供了这种查询及重做丢失的写操作的工具。

想法不成熟,有问题联系:knuthocean@163.com

posted @ 2009-12-18 22:05 Programmers 阅读(368) | 评论 (0)编辑 收藏

负载平衡策略

Dynamo的负载平衡取决于如何给每台机器分配虚拟节点号。由于集群环境的异构性,每台物理机器包含多个虚拟节点。一般有如下两种分配节点号的方法:

1. 随机分配。每台物理节点加入时根据其配置情况随机分配S个Token(节点号)。这种方法的负载平衡效果还是不错的,因为自然界的数据大致是比较随机的,虽然可能出现某段范围的数据特别多的情况(如baidu, sina等域名下的网页特别多),但是只要切分足够细,即S足够大,负载还是比较均衡的。这个方法的问题是可控性较差,新节点加入/离开系统时,集群中的原有节点都需要扫描所有的数据从而找出属于新节点的数据,Merkle Tree也需要全部更新;另外,增量归档/备份变得几乎不可能。

2. 数据范围等分+随机分配。为了解决方法1的问题,首先将数据的Hash空间等分为Q = N * S份 (N=机器个数,S=每台机器的虚拟节点数),然后每台机器随机选择S个分割点作为Token。和方法1一样,这种方法的负载也比较均衡,且每台机器都可以对属于每个范围的数据维护一个逻辑上的Merkle Tree,新节点加入/离开时只需扫描部分数据进行同步,并更新这部分数据对应的逻辑Merkle Tree,增量归档也变得简单。该方法的一个问题是对机器规模需要做出比较合适的预估,随着业务量的增长,可能需要重新对数据进行划分。

不管采用哪种方法,Dynamo的负载平衡效果还是值得担心的。

客户端缓存及前后台任务资源分配

客户端缓存机器信息可以减少一次在DHT中定位目标机器的网络交互。由于客户端数量不可控,这里缓存采用客户端pull的方式更新,Dynamo中每隔10s或者读/写操作发现缓存信息不一致时客户端更新一次缓存信息。

Dynamo中同步操作、写操作重试等后台任务较多,为了不影响正常的读写服务,需要对后台任务能够使用的资源做出限制。Dynamo中维护一个资源授权系统。该系统将整个机器的资源切分成多个片,监控60s内的磁盘读写响应时间,事务超时时间及锁冲突情况,根据监控信息算出机器负载从而动态调整分配给后台任务的资源片个数。

Dynamo的优点

1. 设计简单,组合利用P2P的各种成熟技术,模块划分好,代码复用程度高。

2. 分布式逻辑与单机存储引擎逻辑基本隔离。很多公司有自己的单机存储引擎,可以借鉴Dynamo的思想加入分布式功能。

3. NWR策略可以根据应用自由调整,这个思想已经被Google借鉴到其下一代存储基础设施中。

4. 设计上天然没有单点,且基本没有对系统时钟一致性的依赖。而在Google的单Master设计中,Master是单点,需要引入复杂的分布式锁机制来解决,且Lease机制需要对机器间时钟同步做出假设。

Dynamo的缺陷

1. 负载平衡相比单Master设计较不可控;负载平衡策略一般需要预估机器规模,不能无缝地适应业务动态增长。

2. 系统的扩展性较差。由于增加机器需要给机器分配DHT算法所需的编号,操作复杂度较高,且每台机器存储了整个集群的机器信息及数据文件的Merkle Tree信息,机器最大规模只能到几千台。

3. 数据一致性问题。多个客户端的写操作有顺序问题,而在GFS中可以通过只允许Append操作得到一个比较好的一致性模型。

4. 数据存储不是有序,无法执行Mapreduce;Mapreduce是目前允许机器故障,具有强扩展性的最好的并行计算模型,且有开源的Hadoop可以直接使用,Dynamo由于数据存储依赖Hash无法直接执行Mapreduce任务。

 

posted @ 2009-12-05 15:19 Programmers 阅读(1698) | 评论 (0)编辑 收藏

异常处理

Dynamo中把异常分为两种类型,临时性的异常和永久性异常。服务器程序运行时一般通过类似supervise的监控daemon启动,出现core dump等异常情况时自动重启。这种异常是临时性的,其它异常如硬盘报修或机器报废等由于其持续时间太长,称之为永久性的。回顾Dynamo的设计,一份数据被写到N, N+1, ... N+K-1这K台机器上,如果机器N+i (0 <= i <= K-1)宕机,原本写入该机器的数据转移到机器N+K,机器N+K定时ping机器N+i,如果在指定的时间T内N+i重新提供服务,机器N+K将启动传输任务将暂存的数据发送给机器N+i;如果超过了时间T机器N+i还是处于宕机状态,这种异常被认为是永久性的,这时需要借助Merkle Tree机制进行数据同步。这里的问题在于时间T的选择,所以Dynamo的开发人员后来干脆把所有程序检测出来的异常认为是临时性的,并提供给管理员一个utility工具,用来显示指定一台机器永久性下线。由于数据被存储了K份,一台机器下线将导致后续的K台机器出现数据不一致的情况。这是因为原本属于机器N的数据由于机器下线可能被临时写入机器N+1, ... N+K。如果机器N出现永久性异常,后续的K台机器都需要服务它的部分数据,这时它们都需要选择冗余机器中较为空闲的一台进行同步。Merkle Tree同步的原理很简单,每个非叶子节点对应多个文件,为其所有子节点值组合以后的Hash值,叶子节点对应单个数据文件,为文件内容的Hash值。这样,任何一个数据文件不匹配都将导致从该文件对应的叶子节点到根节点的所有节点值不同。每台机器维护K棵Merkle Tree,机器同步时首先传输Merkle Tree信息,并且只需要同步从根到叶子的所有节点值均不相同的文件。

读/写流程

客户端的读/写请求首先传输到缓存的一台机器,根据预先配置的K、W和R值,对于写请求,根据DHT算法计算出数据所属的节点后直接写入后续的K个节点,等到W个节点返回成功时返回客户端,如果写请求失败将加入retry_list不断重试。如果某台机器发生了临时性异常,将数据写入后续的备用机器并在备用机器中记录临时异常的机器信息。对于读请求,根据DHT算法计算出数据所属节点后根据负载策略选择R个节点,从中读取R份数据,如果数据一致,直接返回客户端;如果数据不一致,采用vector clock的方法解决冲突。Dynamo系统默认的策略是选择最新的数据,当然用户也可以自定义冲突处理方法。每个写入系统的<key, value>对都记录一个vector lock信息,vector lock就是一系列<机器节点号, 版本号/时间戳>对,记录每台机器对该数据的最新更新版本信息。如下图:


读取时进行冲突解决,如果一台机器读到的数据的vector lock记录的所有版本信息都小于另一台机器,直接返回vector lock较大的数据;如果二者是平行版本,根据时间戳选择最新的数据或者通过用户自定义策略解决冲突。读请求除了返回数据<key, value>值以外还返回vector lock信息,后续的写操作需要带上该信息。

问题1:垃圾数据如何回收?

Dynamo的垃圾回收机制主要依赖每个节点上的存储引擎,如Berkely db存储引擎,merge-dump存储引擎等。其它操作,如Merkle Tree同步产生的垃圾文件回收可以和底层存储引擎配合完成。

问题2:Dynamo有没有可能丢数据?

关键在于K, W, R的设置。假设一个读敏感应用设置K=3, W=3, R=1,待处理的数据原本属于节点A, B, C,节点B出现临时性故障的过程中由节点D代替。在节点B出现故障到节点B同步完成节点D暂存的修改这段时间内,如果读请求落入节点B或者D都将出现丢数据的问题。这里需要适当处理下,对于B节点下线的情况,由于其它机器要么缓存了B节点已下线信息,要么读取时将发现B节点处于下线状态,这是只需要将请求转发其它节点即可;对于B节点上线情况,可以等到B节点完全同步以后才开始提供读服务。对于设置W<K的应用,Dynamo读取时需要解决冲突,可能丢数据。总之,Dynamo中可以保证读取的机器都是有效的(处于正常服务状态),但W != K时不保证所有的有效机器均同步了所有更新操作。

问题3:Dynamo的写入数据有没有顺序问题?

假设要写入两条数据"add item"和"delete item",如果写入的顺序不同,将导致完全不同的结果。如果设置W=K,对于同一个客户端,由于写入所有的机器以后才返回,可以保证顺序;而多个客户端的写操作可能被不同的节点处理,不能保证顺序性。如果设置W < K,Dynamo不保证顺序性。

问题4:冲突解决后是否需要将结果值更新存储节点?

读操作解决冲突后不需要将结果值更新存储节点。产生冲突的情况一般有机器下线或者多个客户端导致的顺序问题。机器下线时retry_list中的操作将丢失,某些节点不能获取所有的更新操作。对于机器暂时性或者永久性的异常,Dynamo中内部都有同步机制进行处理,但是对于retry_list中的操作丢失或者多个客户端引发的顺序问题,Dynamo内部根本无法分辨数据是否正确。唯一的冲突解决机器在读操作,Dynamo可以设计成读操作将冲突解决结果值更新存储节点,但是这样会使读操作变得复杂和不高效。所以,比较好的做法是每个写操作都带上读操作返回的多个版本数据,写操作将冲突处理的结果更新存储节点。

posted @ 2009-12-04 23:05 Programmers 阅读(1017) | 评论 (0)编辑 收藏