Carl Hu

Benjamin Rousseau

Rick Tompkins

Gus Bjorklund



Several researchers have proposed cluster based replication solutions that provide fault tolerance, scalability, and data consistency. Although these solutions have the potential to address the scalability and availability needs of many enterprise applications, they often require the customization of databases, the use of non-standard database client library implementations, or the instrumentation of applications.

In this paper, we present a system that implements update anywhere replication of autonomous heterogeneous databases designed to operate with unmodified applications, standard database access library implementations, and off-the-shelf database software. The system, used in the commercial database federation product DataXtend, is fully implemented and has been tested with Oracle, SQL Server, ObjectStore, and a variety of JDBC and ODBC library implementations.

The system maintains the safety property in the face of any combination of process failures and network partitions, is available when quorum is attainable, uses persistent total order to incrementally recover failed nodes, implements the online ability to add and remove nodes to and from a replicating group, and provides transactional data consistency.  

The paper also discusses a spectrum of optimizations available when a deployment admits application instrumentation, database client library flexibility, or database customization.

1          Introduction

Substituting a cluster of replicated database nodes for a single database node remains an important approach for increasing the scalability and availability of enterprise databases.

Gray et al. [31] categorized solutions to the replication problem by two parameters: update propagation strategy and update location flexibility. They discuss two propagation strategies: eager, where updates are applied at all replicas as part of the originating transaction; and lazy, where updates are propagated to other replicas asynchronously after the originating transaction commits. They describe two types of location restrictions: update anywhere, where updates to an object can originate from any replica; and single master, where only an object’s designated master node can originate an update to that object. They conclude that eager solutions do not scale.

Subsequent research proposed new replication solutions that mitigate the severity of these limitations by using hybrid propagation strategies that combine the advantages of eager and lazy propagation [8,10,11,17,14,18,32,34,35,48,49]. These solutions are lazy in that the commit is returned to the client before the update has been committed at all replicas. They are eager in that they guarantee 1-copy data consistency by requiring some transaction coordination prior to commit. The overhead for this coordination, although significant, is considerably cheaper than two phase commit protocols usually used to implement eager replication. Performance studies demonstrated the scalability of these solutions for cluster based database replication [14,18,32,34]. Our system belongs in this class of solutions.

Beyond scalability, enterprise applications may impose a variety of stringent requirements on replication solutions. Simultaneously meeting all of these requirements in a single replication solution has proven challenging. Proposed solutions may not fully account for or tolerate process failures [10,11,35,39,49] and network partitions [10,11,14,32,39,48,49]. Some solutions require a potentially prohibitively expensive complete re-initialization of the recovering database [14]. Others require the instrumentation of applications [18], customization of database implementations [10,17,39], or the use of custom database access client libraries [14,18,22,24,30,34,41,43]. In many enterprise scenarios, entrenched legacy applications and database deployments make these changes difficult. Some solutions require that all database instances in the cluster use exactly identical implementations [8,10,11,14,17,18,22,24,25,32,34,35,39], thus disallowing incremental upgrades of clusters and heterogeneous database operation. In addition, few of these solutions have been validated by full implementations.

In this paper, we present the architecture and design of a fully implemented replication system that exploits the scalability advantages of a hybrid propagation scheme while simultaneously addressing all of the issues enumerated above. In our effort to build this system, we have leveraged a variety of prior research ideas. We share our experience in implementing, integrating, and augmenting these ideas to synthesize what we believe is an attractive replication system for a range of enterprise applications.

The remainder of the paper is organized as follows: Section 2 introduces our system and failure models; Section 3 defines the requirements that our replication solution was designed to meet; Section 4 presents, at an abstract level, our replication solution; Sections 5-10 present our implementation’s architecture and design; and Section 11 concludes the paper.

2          Preliminaries

2.1         System model

A cluster consists of a set of nodes, N = {N1,…,Nn} connected by a network. Each node Ni is executing an ACID compliant autonomous DBMS instance Di and has stable storage. Each Di holds a copy of the entire database. For each Di, there is a set of applications concurrently submitting transactions to Di. As well, each Di is associated with an instance Ri of the replication engine.

The set of Ri forms a persistent membership R = { R1,…,Rn}. R is initially fixed, known in advance, and stored at each Ri on stable storage. We later discuss in section X.X the algorithm for dynamically adding and removing nodes from R.

Each application submits transactions to a database instance using the client library of its choice (e.g. JDBC, ODBC, JDO, etc). Each database instance communicates with its associated replication engine instance via an exactly-once-in-order (EOIO) communications link. Each replication engine instance communicates to all other reachable replication engine instances.

2.2         Failure model

Any of the nodes may fail independently of any other nodes. In addition, any particular application process, its communications to its database process, or the database process itself may each independently fail. As well, the replication engine process or its link to the database process may also each independently fail. We use timeout as a last resort failure detector for these EOIO links. Our algorithms do not require symmetrical (eg. reliable) failure detection for correctness.  

For communications between replication engine instances, we employ the services of a group communications system (GCS) that provides view-synchronous atomic totally ordered message delivery (or abcast) [21,46].

The network may partition into a finite number of disconnected components. Nodes situated in different components cannot exchange messages, while those situated in the same component can continue communicating. Two or more components may subsequently merge to form a larger component.

The GCS provides a membership notification service, informing the replication engine about other replication engine instances that can be reached in the current component. The notification occurs each time a connectivity change or a replication engine crash or recovery occurs.

We assume that no message corruption and no Byzantine faults occur. A node that crashes may subsequently recover. Upon recovery, a node retains its stable storage. 

3          The requirements

We define the problem—transparent, fault tolerant, and consistent replication of arbitrary autonomous heterogeneous databasesin more detail to better characterize the strengths and limitations of our solution.

We are given an existing deployment that includes a set of applications A = {A1,…,An} submitting transactions against a database instance D1 running on node N1. To increase scalability and availability, we wish to substitute D1 with a cluster of database instances D = {D1,…,Dn} and replication engine instances R = {R1,…,Rn} running on a set of nodes N = {N1,…,Nn}.

3.1         Transparency

Our solution must achieve this substitution without modifying existing application code, substituting deployed database client libraries (e.g. ODBC or JDBC) with customized libraries, or modifying the DBMS source code.

We note that it may still be necessary to deploy new nodes {N2,…,Nn}; install and configure new database instances {D2,…,Dn} and replication engine instances {R1,…,Rn}; or install a router to direct database requests from the existing applications A to the new databases in D. These tasks, though onerous, are often more easily accomplished than the modifications of existing systems enumerated above.

3.2         Fault tolerance

In terms of correctness (e.g. safety), our solution must maintain all data consistency guarantees in the face of any combination of database, application, or middleware process failures and any combination of communications partitioning between the above components.

In terms of availability (e.g. liveness), our solution must enable database forward transaction processing without requiring the healing of all faults. When a partition or process heals, our system must be able to bring recovering databases online without requiring complete re-initialization.

3.3         Data consistency

For unmodified existing applications to operate correctly on a replicated database cluster, our replication solution must provide 1-copy data consistency [14,29,50].

3.4         Minimal requirements from databases

The maturation of the database industry has resulted in the widespread implementation of a variety of functionality. Most databases today feature ACID transaction guarantees, efficient data access libraries (e.g. JDBC, ODBC, JDO), query facilities (e.g. SQL, OQL), and trigger and stored procedure functionality (e.g. PL/SQL for Oracle; T-SQL for Microsoft SQL Server; and SQL 2003 for MySQL and IBM DB2) .

On the other hand, today’s widely deployed database and data access library implementations do not yet, for example, include facilities to defer executing a transaction’s updates until commit for distributed total ordering; a requirement of scalable replication solutions that use a replicated database state machine approach [49].

In order to enable support for widely deployed database and data access library implementations, our replication solution must require only commonly implemented functionality.

3.5         Database heterogeneity

Our replication solution was designed for a database federation product that requires supporting a variety of database implementations within the same cluster. Although heterogeneity is less fundamental to basic replication, we nevertheless observe that requiring completely identical software at all nodes makes replication fragile.

As deployments evolve and machines are retired or added, software may be installed or upgraded. Relatively minor changes from version to version and from machine to machine can alter behaviour such as deadlock victim choice or transaction serialization order.

Our replication solution must not be susceptible to these and similar variations.

3.6         Dynamic membership

Given the availability requirements of deployments that require replication, our replication solution must permit the addition and removal of databases to the cluster membership without needing to take the cluster offline.

4          The approach

For clarity and concision, we present our approach as an evolution and integration of prior work. We name and reference these previously published algorithms using provisional abbreviations.

4.1         Replicated state machines

Our solution belongs to a class of replication algorithms that use the replicated state machine approach (RSMA): if a cluster of identical deterministic state machines begin with identical states and are submitted an identical sequence of inputs, then the state machines will transition through the same states and are said to be consistently replicated [8].

Unlike a synchronous replication approach like two phase commit (2PC), RSMA permits the state machines to transition asynchronously with respect to each other. This relaxation reduces the coordination problem from 2PC to the calculation of a totally ordered sequence of messages. This calculation, not requiring end-to-end acknowledgements, is significantly more scalable than 2PC variants [21]. Group communications systems that leverage the properties of networking hardware to solve this problem efficiently are available [21,46]. We leverage these implementations.

4.2          Replicated database state machines

Deriving a simple replicated database state machine approach (RDSMAsymmetrical), from RSMA is straightforward if we substitute a database for each state machine and substitute a database query (which we define as a transaction scoped sequence of reads and writes) for each input. If we ensure that the databases begin with identical data and submitted the same sequence of queries, then the databases are replicated with 1-copy serializable data consistency [49].

Unfortunately, a solution directly based on this approach does not meet our scalability requirement: since all queries are executed by every database, RDSMAsymmetrical clearly offers no greater scalability than that of a single database.

4.3         RDSMAwriteset: improving the scalability of RDSMAsymmetrical

To mitigate this scalability limitation, RDSMAwriteset submits each query to only one originating database, the changes generated by the query are collected into a writeset, and only the writeset is submitted to non-originating databases.

If the cost of applying writesets is lower than the cost of executing queries, then an RDSMAwriteset cluster scales better than a single database. The greater this cost difference, the greater the scalability. We leverage this approach in our solution to gain scalability.

Unfortunately, a solution directly based on RDSMAwriteset does not meet our data consistency requirement: the introduction of asymmetrical inputs to the state machines results in a loss of 1-copy serializability: for example, it is clear that the lost update anomaly [50] occurs should two databases concurrently increment the same variable.

4.4         RDSMAcert: extending RDSMAwriteset to provide data consistency

[49] presents an approach, which we refer to as RDSMAcert, that submits each query to only one originating database, defers the commit of updates generated by that query until certification, collects all updates generated by that query into a writeset, and broadcasts the writeset to a GCS. The GCS delivers the writeset in total order to all nodes, and the writeset is tested against a certification criteria. The certification is designed such that if a transaction fails certification at one node, it fails certification at all nodes. If the certification succeeds, the transaction is locally committed.

A variety of certification schemes have been proposed with varying performance and consistency properties. The following scheme, which we use, guarantees snapshot isolation: Two operations are defined to conflict if they are issued by two different transactions and update the same data. A transaction ti fails certification if any concurrent tj has conflicting operations. Transaction concurrency detection is achieved without message overhead by maintaining Lamport time stamps to verify precedence criteria [49].

4.5         RDSMAmiddle: a practical extension of RDSMAwriteset to provide data consistency

 [14] presents a practical, performant, and scalable realization of this approach by implementing the update deferral, total ordering, and transaction certification in a customized database access library (JDBC). We call their scheme RDSMAmiddle.

Although addressing all of our scalability and consistency requirements, RDSMAmiddle fails to meet some of our fault tolerance and transparency requirements: RDSMAmiddle is intolerant of network partitions, requires complete re-initialization of recovering nodes, and requires applications to use a customized data access library.

4.6         Implementing fault tolerance using persistent membership and total persisted order

For each query execution in RDSMAmiddle, the custom data access library at the originating database collects the writeset and uses a GCS to broadcast the writeset to all databases in the cluster.

A replica that was part of the cluster previously but not currently unavailable will not receive the writeset. To address this problem, RDSMAmiddle requires that each recovering database is initialized with the complete contents of the database on recovery.  

An approach that enables recovering databases to be incrementally initialized involves defining a persistent membership that specifies all replicas in the cluster. Each replica (or member) is persistently aware of all other replicas.

While a virtual synchrony GCS guarantees the delivery of messages in total order with respect to a view ( the set of currently reachable members), a total persisted order message delivery service (TPOS) guarantees delivery of messages in total order to each member in the persistent membership (often implemented using store and forward semantics).

[17] presents a specification of the TPOS as well as an efficient and fault tolerant implementation of TPOS (TPOSimp). Although [17] discusses TPOS only in the context of making RDSMAsymmetrical tolerant of network partitions, the TPOS facility is independently and generally applicable to a wide variety of problems.

From an architectural point of view, we abstract TPOSimp to a component with one input and one output API:

-    SendMessage(Message) and

-    OnRcvMessage(Message).

One must call SendMessage from within the context of a transaction. TPOSimp guarantees that on return from the transaction commit that the message is persisted durably and that we are guaranteed, given eventual path connectivity [17], that the message eventually reaches all members in total persisted order.

4.7         RDSMAReliableMiddle: extending RDSMAmiddle for fault tolerance

To extend RDSMAmiddle to meet our fault tolerance requirements, we integrate TPOSimp into a custom implementation of a database access library (e.g. JDBC). TPOSimp incorporates an embedded ACID database system (we use PSEPro) and a GCS (we use Spread). We replace the GCS broadcasts in RDSMAmiddle with TPOS broadcasts and keep the certification logic unchanged.

In addition, we remove the fault recovery logic in RDSMAmiddle. Since TPOS guarantees that all nodes receive all broadcasted writesets in total persisted order when the node re-joins a quorum, RDSMAReliableMiddle implements incremental recovery.

Our tests demonstrate that the additional overhead associated with local transactional persistence is acceptable given the modern performance of disks and a write ahead logging implementation.

The resulting replication system guarantees snapshot isolation, is performant, and tolerates faults. However, the need to use a custom database client library makes this approach difficult to deploy in some scenarios. We present RDSMlock to address this limitation.

4.8         RDSMlock: extending RDSMAcert for transparency.

4.8.1      The specification style.

For conciseness, we follow the presentation of [10,39] and characterize our replication scheme by the invariants (or rules) maintained when applications read, write, and commit on the originating node and when the writeset is delivered.

The following four rules characterize Read-Once-Write-All [50] RDSMAcert-based implementations:

Read Rule: A read operation ri[x] of a transaction Ti is executed locally by acquiring a local read lock at the initiating site of transaction Ti. Ti remains blocked until the local read lock of x is granted at the initiating database.

Write Rule: A write operation wi[x] of a transaction Ti is executed by performing the update at the initiating site. The update should be made available to construct a writeset later.

Commit Rule: When the initiating client decides to commit the transaction Ti, the commit is blocked and the writeset is retrieved and broadcast using TPOS to all members Wi.

Reconciliation Rule: When a site receives Wi, Wi is tested against certification criteria. If it succeeds, the writeset is applied. Otherwise, the writeset is aborted. In addition, if the site is the initiator, the blocked commit request returns with the appropriate result.

In terms of how readily these rules are implemented by existing implementations: the Read Rule is satisfied by any 2PL database; the Write Rule can be implemented in several ways including installing triggers that capture the change to a table or scanning the transaction log.

The Commit Rule, however, requires a delay of the client’s transaction commit for the required and non-standard distributed total ordering and certification to occur. We have not yet discovered a way of satisfying this requirement when client libraries and databases cannot be customized.

4.8.2      Conflict classes.

We use a pessimistic concurrency scheme to construct a replication scheme with rules that are more easily satisfied by standard database and client library implementations.

We partition the data into a disjoint set of conflict classes C = {C1,…,Cn}. The granularity of a class may range from a database, a table, to a tuple. During design time, our system derives the schematic structure of the replicated databases our system and defaults to row-based granularity for relational databases and object-based granularity for object-oriented databases.  

To leverage these conflict classes, we build and integrate into the system a fault tolerant distributed lock management service (FTDLM). The FTDLM at the originating site is informed of all transaction begins, writes, and commits. With this information, the FTDLM is able to guarantee classic two phase locking semantics.

While two phase locking is usually associated with eager replication, we introduce a way of using 2PL in lazy replication while tolerating process and network failures.

4.8.3      Locking lazy replication.

The Read Rule:

(a)     A read operation ri[x] of a transaction Ti is executed locally by acquiring a local read lock at the initiating site of transaction Ti. Ti remains blocked until the local read lock of x is granted at the initiating database.

This requirement is satisfied by many DBMS implementations.

The Write Rule:

(a)     A write operation wi[x] of a transaction Ti on initiating site I is executed by first acquiring a distributed exclusive lock for x. If the lock is currently owned by another site, then wi[x] is delayed until ownership of the distributed exclusive lock for x is granted to site I.

(b)    When this is successfully acquired, the local write lock for x must be acquired. If this is currently held by another transaction, then wi[x] is delayed until this local lock is granted.

(c)     The write is then performed and the change is captured in a change log.

In terms of implementation, Part (a) of this rule is not immediately satisfied by some DBMS implementations. Satisfaction of this requirement may nevertheless be achieved by incorporating a “lock detector” associated with the database using available customization facilities exposed by DBMS implementations (like update triggers). This lock detector can then use the FTDLM to acquire the distributed exclusive lock before the DBMS performs the write.

Part (b) of this Write Rule is immediately satisfied by many DBMS implementations.

Part (c) of this Write Rule requires that a persistent record of wi[x], L[wi[x]] is atomically recorded, in order, along with wi[x] in such a way that L[wi[x]] is available for later read access as described in the Commit Rule. We discuss two ways this can achieved by typical DBMS implementations. One, many DBMS implementations use a log to maintain durability and record L[wi[x]] with the required atomicity. If this can be accessed as described in the Commit Rule, then this requirement is satisfied. Secondly, we can use available customization facilities provided by the DBMS implementation to capture each wi[x] into a transactionally persistent data structure (like a table) for subsequent access. We call any of these possibilities change capture mechanisms and the data structure that contains the captured changes the change log.

The Commit Rule:

(a)     When an initiating site I decides to commit a transaction Ti, the database transaction is immediately committed.

(b)    After the commit succeeds, a transaction tk is asynchronously begun and executes the following tasks: All changes associated with transaction Ti are retrieved from the committed change log to form a writeset. The writeset preserves ordering. We call this process change retrieval. Once the writeset is retrieved, the change set is decorated with an identifier specifying the initiating site I. This decorated change set is then given to TPOS, which uses a total order broadcast primitive to guarantee delivery of the change set to all sites (including I). The change log is then modified in such a way as to guarantee that changes that have been retrieved are not retrieved again.

(c)     Upon the successful commit of tk, all distributed locks associated with Ti are released. If messages need to be sent, the message is sent over the same GCS totally ordered view as the writeset broadcast.

Part (a) of this Commit Rule is immediately satisfied by most DBMS implementations.

Part (b) of this Commit Rule is not immediately satisfied by most DBMS implementations. However, it is straightforward to augment these implementations to satisfy part (b). In the Replicator module described later in this document, this is achieved by having an external process, woken by a database commit begin a transaction and query the database to retrieve the changes from the change log, form a change set and set the initiator identifier for the change set, and give it to the TPOS.

Part (c) involves the release of the distributed locks acquired in the course of executing transaction Ti. In the design of the Replicator described later in this document, we discuss how this is achieved by informing the FTDLM that the transaction has committed via the TransactionCompleted notification. The FTDLM may, if there are other sites requesting the distributed lock, broadcast messages to other sites using the total order broadcast primitive in order to transfer ownership of the distributed lock. This also guarantees that the lock transfer is delivered to all sites after the change set is delivered, a critical invariant for maintaining data consistency.

The Reconciliation Rule:

(a)     At each site, the TPOS delivers change sets and the GCS delivers lock management messages.

(b)    If the delivered message to site R is a writeset, then site R extracts from the change set the initiator site I. If I is different from R, R applies each change in the change set to the database in order. If R is the same as I, then the message is dropped. If the delivered message to site R is a lock ownership transfer, then this message is processed by the lock management system. Change sets and lock transfers must be processed in the order that they are delivered.

Augmenting typical DBMS implementations with this logic typically poses few problems. We use the TPOSM as the persistent total ordering mechanism and the FTDLM to implement 2PL. They both use the same broadcast primitive for communication with other sites. This ensures that the ordering between lock management and change set delivery is preserved (part 4 of this rule). We incorporate these elements in an external process that receives the delivered messages from the broadcast primitive. In addition, we implement logic needed to reconcile the received change sets to the database by using available database query mechanisms. 

4.9         The solution

Data locality and choice of replication scheme

4.10      RDSMhybrid: Using both RDSMlock  and RDSMAReliableMiddle in a single cluster

To summarize our solution, our solution fundamentally uses the RDSMhybrid protocol, which is a hybrid of two replication protocols: RDSMlock and RDSMAReliableMiddle. Both protocols use hybrid propagation schemes that broadcast updates after the originating transaction commits. RDSMlock uses a pessimistic locking scheme whereas RDSMAReliableMiddle uses an optimistic concurrency scheme.

mean by dach transaction accesses a set of conflict classes. Distributed mutual exclusion guarantees that each conflict class is associated with a single master node at any moment. Each node uses either RDSMlock or RDSMAReliableMiddle. Therefore, each conflict class is associated with either RDSMlock or RDSMAReliableMiddle.

The protocol observes that if all conflict classes accessed by a transaction are associated to RDSMAReliableMiddle, then commit of the transaction can be processed by RDSMAReliableMiddle. Otherwise, the transfer of the conflict class to RDSMAReliableMiddle is initiated and the in flight transaction is aborted. RDSMlock works without modification.

We illustrate with an example. Suppose a transaction begins on an RDSMAReliableMiddle node and that transaction updates object A. If the current ownership of A is owned by an RDSMAReliableMiddle node (not necessarily this one), then the transaction executes using the certification algorithm. Otherwise, the local FTDLM requests the lock and aborts this transaction.


5          Implementation architecture

5.1         Overview

In this section, we describe how we have built this functionality.

In our implementation, we encapsulate the majority of the logic in a module called the Replicator Engine (or just Replicator). Each instance of the Replicator executes identical logic. However, correct operation of the logic requires certain configuration information unique to each instance: each Replicator instance must be configured to be associated with its own database instance as well as its own unique identifier. A particular instance of the Replicator configured in this manner is termed a member.

Any member belongs to a coordination group. The Replicator replicates the data in the databases of all members that are in the same coordination group.

In addition to the associated unique identifier and database, each instance of the Replicator is configured to locally store a persistent membership list containing the unique identifiers of all of the members of the coordination group. The contents of the membership lists associated with each member belonging to the same coordination group are identical. We use a dynamic membership protocol to consistently maintain this list, described later.

5.2         External Interface

The basic Replicator Engine exposes a user callable interface via a concise API consisting of three functions.  An application update detector may be utilized which automatically makes calls these functions on behalf of an application.


5.2.1      TransactionID BeginTransaction()

This call does not involve network operations and therefore executes efficiently. This call is allowed to succeed even when the local member is not part of a quorum. This logic is convenient in use cases where clients acquire exclusive locks when updating data but not when reading data. In this case, this logic allows clients to begin and commit read-only transactions even when not connected to quorum.

5.2.2      Lock(TransactionID, NameOfResource)

The Replicator Engine guarantees that the lock is exclusively acquired and the local replica contains up to date data if this function returns successfully. The following pseudocode specifies the precise behavior of this function. The logic is designed such that: repeated acquisitions of the same lock do not require network messages; transactions that update data when the local instance does not have quorum are aborted; multiple transactions that require the same resource are allowed to execute concurrently locally (with correctness guaranteed by the local database lock management system); and fairness is guaranteed even when a greedy node never lets go of the resource.

Performance is improved by implementing a “quantum” whereby N transactions are allowed to execute for each lock transfer.

If the function throws, the user of this function should undo any actions (for example, call Rollback() on his database transaction).


Lock(TransactionID, Name):

if this server is not in quorum

  throw exception;

if this server owns Name and

  there are waiters for resource and

  number of txns > quantum then

    release Name

if this server does not own Name then

  request Name

  block until acquired

update transaction lists


5.2.3      TransactionCompleted(TransactionID)

Calling this function notifies the Replicator Engine that it can release all locks acquired in the transaction.  As discussed earlier, the Replicator Engine does not assist the user in executing or persisting operations but merely implements the 2-phase locking semantic. In other words, exclusive ownership of a resource is acquired and guaranteed to be maintained from the time the call to Lock() returns to the time that TransactionCompleted() is called.

5.3         Synopsis of components

In order to understand the roles of the various components, consider the replication of a single user transaction.  The user transaction informs the Replicator of the conflict classes it wishes to update, updates those classes when allowed to proceed, and commits.  The Fault Tolerant Distributed Lock Manager (FTDLM) processes the associated transaction notifications: begin, acquire lock, and commit.  The source of notifications can be either the application directly, or some detector monitoring application transactions.  Once the user transaction has committed, the FTDLM asks the Change Capture Retriever (CCR) to retrieve the committed changes from the database.  The CCR transforms changes in the local database format to a neutral format by abstracting away differences in database structure, schema and query language.  Once the CCR has retrieved changes, the Total Persisted Order State Machine (TPOSM) sends the changes to the other members of the coordination group according to a modified version of the method described in [17].  If locks can be released the lock request messages are sent after the changes are propagated.  Upon receipt of changes, the TPOSM calls the Reconciler to apply the changes to the local database in the local format.  Of course, changes originating with the local database are not applied.

Coordination between the TPOSM and the FTDLM is handled by the Replicator Coordinator State Machine (RCSM), which integrates their operation to implement the Read, Write, Commit, and Reconciliation Rules of the replication method.

Both the FTDLM and the TPOSM utilize the EVS transport, which provides totally ordered FIFO delivery.  Therefore, causal order between lock request, change set propagation, and lock release is preserved.  The Router multiplexes inputs and outputs between the FTDLM, the TPOSM, and the EVS transport, allowing the two components to share access to the transport.

Figure X depicts the events that the various state machines use to interact with each other to implement the required behaviour of the Replicator.

5.4         Change Capture Retriever implementation.

The CCR implementation is responsible for obtaining committed changes stored in the local database and presenting them to the RCSM.  The method of extraction and source of changes is dependent on the database implementation, but could be the accessing of the transaction log or reading of a table maintained by triggers.

5.4.1      Output

The CCR maintains a queue of retrieval requests from the RCSM.  As each request is fulfilled the CCR signals AllChangesReceived to the RCSM with the change sets and the corresponding request.  Change set retrieval is a significant performance cost, which can be mitigated if multiple requests can be fulfilled at the same time.  Because a CCR retrieval cycle will obtain all changes since the last retrieval cycle, a single cycle can be used to fulfil multiple requests, as long as at least one CCR retrieval cycle occurs between the latest request and signalling the RCSM.

5.4.2      Input

CCR requests occur after the RCSM realizes that a transaction has committed.  If the FTDLM must release locks upon commit of the transaction, then an ForceRetrieveChangeSets request is issued by the RCSM; otherwise a LazyRetrieveChangeSets request is issued.  A ForceRetrieveChangeSets request must be fulfilled as soon as possible since the lock release message cannot be sent until the changes are propagated by sending them to the TPOSM.  LazyRetrieveChangeSets requests can be fulfilled asynchronously, significantly improving throughput at the cost of latency.

5.5         The Total Persisted Order State Machine Implementation

The TPOSM in the Replicator Module is based on the method described in [17,15].  The salient feature of the method in [17] is that all updates are applied in the same total persisted order everywhere; thus all replicas contain identical data.  In [17] updates are applied upon receipt; local commits are delayed until updates are received.  In our system, local commits occur prior to propagating updates, but the application order of updates on conflict classes are the same everywhere. We also implemented the dynamic membership method found in [17].

5.5.1      Removal of historical data

Removal of historical data occurs in [17,15] by adding information identifying the last update applied with every update sent.  When an update has been applied on every member, it can be deleted.  If a quorum is long lasting and contains members which never send updates, no last applied update information is sent, and historical data can accumulate without bound.  We force members which do not have any updates to send an empty update occasionally so that historical data can be removed.

5.5.2      Flow control

The communication system cannot be considered an infinite sink and has limited resources.  Members that do not retrieve messages from the communication system quickly enough appear unresponsive and are dropped from the view, thereby forcing a GCS view change and a recovery cycle.  If some members are significantly out of date during the recovery cycle, many updates will be sent, which could take a significant amount of time to apply, thereby making the member appear unresponsive.  We avoid this behaviour by implementing flow control based on the memory size of the retransmission updates.

5.5.3      Input

SendMessage is used by the RCSM to send a change set to deliver to all other RCSM instances in the coordination group. The message must be delivered guaranteeing total persisted order. The call is made in the context of a transaction and is made in the same transaction as the change retrieval executed by the CCR.  ReceivedMessage is used by the Router to notify the TPOSM of reception of an action from the EVS Transport. 

5.5.4      Output

OnMessage notifies the RCSM of the next action in total persisted order to reconcile.  The message is delivered in the context of a transaction and provides both the action itself and the relative total order of the action.

5.6         The Replication Coordination State Machine Implementation

The RCSM coordinates among the CCR, the TPOS, the FTDLM, and the Reconciler.  When the FTDLM determines that a lock must be released, it notifies the RCSM with the ReleaseLockRequested event; the RCSM then submits a CCR ForceRetrieveChangeSets request.  When the CCR has fulfilled this request, the RCSM passes the resulting change sets to the TPOSM, which propagates them to all connected members.  Upon propagation, the RCSM notifies the FTDLM that it can release the lock by calling ProceedWithLockRelease, which allows the FTDLM to cause a lock release message to be sent. The TPOSM notifies the RCSM of actions that must be reconciled by calling OnMessage.

5.7         The Fault Tolerant Distributed Lock Manager Implementation

5.7.1      Requirements

A member M acquires locks on conflict classes C = {C1,…,Cn} on behalf of transactions = {T1,…,Tn}.  A transaction may use several conflict classes. Each member executes a replicated state machine [8] containing the global lock state.  In the absence of process and network failures, the lock state is identical at each member.  After process or network failures members with stale or missing lock state obtain up to date lock state from surviving members.

Mutual exclusion is preserved through a replicated queue of lock waiters: the head of the queue is the current owner   and the queue changes via receipt of lock request and lock release messages.  Members do not send lock release messages until applications have finished updating associated conflict classes [1].  Locks are initially acquired according to total reception order of lock request messages. 

Multiple resource mutual exclusion is enforced by the two phase locking protocol.  A member M serially acquires all the locks needed by an application A prior to allowing the A to proceed.  Locks are acquired utilizing the single resource mutual exclusion protocol.  Once A is finished, the locks are released. [2]

Without network and process failures the per server lock state is always up to date; therefore it is always correct and mutual exclusion is always preserved [8].  Network and process failures create stale or missing lock state which can lead to concurrent modification in violation of mutual exclusion.

The mutual exclusion methods outlined in 4.2 and 4.3 are vulnerable to network partitions.  Unowned locks may be concurrently acquired in disconnected network partitions.  Owned locks are safe because only the owner can release them.  Therefore, the process of granting initial locks must employ mutual exclusion among partitions.  Majority weighting is used to choose one component to be the quorum component [3,4].

Utilization of quorum components is not sufficient to avoid mutual exclusion violation in the presence of process failures.  In addition, the lock state in the quorum component must always reflect the latest lock state; otherwise locks held by crashed servers can be acquired.  Therefore, quorums must be intersecting [5] and lock state from the previous quorum must be present.

5.7.2      Overview

The FTDLM is composed of three component state machines: the Quorum State Machine (QSM), the Lock Manager State Machine (LMSM), and the Lock Queue State Manager (LQSM).  The LQSM manages lock state for a single conflict class: thus there are as many LQSM instances as conflict classes.  When not in quorum, the LMSM causes all new transactions and lock requests to abort.  Determination of quorum and maintenance of the latest lock state during process and network failures is handled by the QSM.

5.7.3      Lock Queue State Machine

The LQSM contains a queue of member unique identifiers where the head of the queue is the current owner; other members are FIFO ordered waiters for a lock on the conflict class [1].  The existence of process failures requires durable storage of current lock ownership so that upon recovery the latest lock ownership is available. 

The LQSM receives lock requests and transaction commit notifications from the LMSM via the OnLock method and OnTransactionCompleted.  Lock request and release messages are received via OnRcvEnqueue and OnRcvDequeue.  After a process or network failure, the QSM notifies each LQSM of the latest lock state by calling OnReset.



if this member is not queue then


wait for this member to reach head of queue

update transaction lists



update transaction lists

if no transaction is using this lock then




add member to queue

if the head of the queue changed then

  persist the new owner

if the head of the queue is this member then

  signal waiting transactions



remove member from queue

if the head of the queue changed then

  persist the new owner

if this member is now the head of the queue

  signal waiting transactions

5.7.4      Quorum State Machine Implementation

The QSM is responsible for ensuring that each quorum starts with the latest lock state.  The QSM is also responsible for preventing quorum when not enough members are present.

The EVS transport informs the QSM of view changes by calling OnRcvRegConf and OnRcvTransConf.  This causes the QSM to first call OnEnteringNonPrimary and then send lock state to members of the view by calling SendLockMgrState.  The lock state consists of the members, the version number, and the lock queues at the end of the last quorum the member has knowledge of.  Recovering members have no transient state and only send persistent lock and quorum state.  The QSM receives lock state message from all members of the view via OnRcvLockMgrState.  With the lock state messages from all members, the QSM can determine the latest quorum and lock state, which it then persists in order to propagate latest quorum and lock state information via eventual path [17]. With the latest quorum the QSM determines whether a new quorum can be formed.  If so, the QSM informs each LQSM of the current latest lock state by calling OnReset, and calls OnEnteringPrimary.

Quorum is attained when a weighted majority of the members in the latest quorum are present in the current GCS view [4].  If no member has transient lock state, the initial lock state for the quorum consists only of lock owners, satisfying the requirement that the quorum always begin with the latest lock state.

5.8         Critical performance enhancements.

The simple implementation suffers from poor performance due to excessive access of durable storage, unnecessary multiple lock requests, and starvation.  Because lock ownership is persisted upon every ownership change, potentially every transaction initiated at the local member will one or more durable storage requests.  Starvation can exist when several applications utilizing a single member continually use the same conflict class.  In the presence of significant contention the overhead of lock transferral can become significant.

5.8.1      Zero data persistence during forward processing.

Unless all members of a view fail simultaneously every quorum is survived by at least one member.  Because all members of the quorum received the same lock messages, they all contain the same lock queue state.  Instead of persisting every change in lock ownership, the ownership at the end of the quorum is persisted by surviving members, as it is equivalent to persisting the lock ownership upon every change.  This durable lock ownership is called the lock slice, and includes the quorum version number from which the lock ownership was determined: this is sufficient to determine the latest persistent lock state. 

When all members fail simultaneously, there are no surviving members to persist the lock slice.  Therefore, no latest lock ownership is available upon recovery.  However, some stale version of the lock slice may be available, and using this version violates the locking invariants  In order to detect this case, the QSM compares the latest quorum version of the lock state (transient or persistent) among the available members with that of the last quorum.  If the lock state version does not match the quorum version, then the QSM knows no member present in the current view survived the ending of the last quorum, and that no suitable lock slice is available to initialize lock state.  The QSM must force all members of the last quorum to be present before forming a new quorum in order to ensure that any unpropagated changes can be discovered.

5.8.2      Lock caching.

Data access patterns in many applications demonstrate a locality of data access: data accessed once is likely to be accessed again.  Rather than releasing locks as soon as possible, lock acquisition overhead is reduced by only releasing locks upon commit if there is a waiter for the conflict class; otherwise the lock is retained so that further updates on the conflict class can proceed without blocking.

5.8.3      Separating transaction ownership and member ownership

Locks are owned by members and used by transactions. A lock that is owned by a member may be simultaneously used by several transactions on that member.

5.8.4      Quantum.

Greedy members can cause unfair behaviour, where locks are never released because some local transaction is always using the lock.  In order to introduce fairness, a limit on the number of transactions that may run at a member is introduced, called the quantum.  If there are waiters for the conflict class when OnTransactionCompleted is executed, and there are local transactions utilizing the conflict class waiting to execute, then the LQSM only allows a quantum of local transactions to execute using the current lock ownership.  Local transactions above the quantum must wait until the lock is released and reacquired.  This allows other members to update the conflict class.

6          Conclusion

We have presented the architecture and design of a fully implemented replication system that exploits the scalability advantages of a hybrid propagation scheme while meeting transparency and fault tolerance requirements.

7          References

[1]  L. Lamport. Time, clocks and the ordering of events in a distributed system, Comm ACM 21, 7, July 1978, 558-565.

[2]  J. Gray.  Notes on Database Systems. IBM Research Report RJ2188 (Feb. 1978) San Jose, CA.

[3]  D. K. Gifford. Weighted voting for replicated data.  In Proceedings of the Seventh ACM Symposium on Operating Systems Principles (Dec 1979).

[4]  S. Jajodia, D. Mutchler.  Dynamic voting algorithms for maintaining the consistency of a replicated database.  ACM Transactions on Database Systems, 15(2):230-280, 1990.

[5]  D. Agrawal, A. El Abbadi. An efficient and Fault-Tolerant Solution for Distributed Mutual Exclusion.  ACM Transactions on Computer Systems 9(1):1-20,1991.

[6]  N. Kronenberg, H. Levy, W. Strecker. VAXClusters: A Closely-Coupled Distributed System. In ACM TOCS, May 1986.

[7]  M. Carey, M. Livny. Distributed Concurrency Control Performance: A Study of Algorithms, Distribution, and Replication. In VLDB 1988.

[8]  F. Schneider. Implementing fault-tolerant services using the state machine approach: a tutorial. In ACM Computing Surveys, December 1990.

[9]  M. Velazquez. A Survey of Distributed Mutual Exclusion Algorithms. In Colorado State University Technical Report CS-93-116, 1993.

[10]     I. Stanoi, D. Agrawal, A. Abbadi. Using Broadcast Primitives in Replicated Databases. In ICDCS 1998.

[11]     M. Wiessman, F. Pedone, A. Schiper, B. Kemme, G. Alonso. Understanding Replication in Databases and Distributed Systems. In ICDCS 2000.

[12]     X. Defago, A. Schiper, P. Urban. Total Order Broadcast and Multicast Algorithms: Taxonomy and Survey. In ACM Computing Surveys, December 2004.

[13]     S. Lin, Q. Lian, M. Chen, Z. Zhang. A Practical distributed mutual exclusion protocol in dynamic peer-to-peer systems. In IPTPS 2004.

[14]     Y. Lin, B. Kemme, M. Patino-Martinez, R. Jimenez-Peris. Middleware based data replication providing snapshot isolation. In SIGMOD 2005.

[15]     O. Amir, Y. Amir, D. Dolev.  A Highly Available Application in the Transis Environment.  1993.

[16]     A. Adya, B. Liskov, and P. E. O'Neil. Generalized isolation level definitions. In ICDE, 2000.

[17]     Y. Amir and C. Tutu. From Total Order to Database Replication. In Proc. of Int. Conf. on Distr. Comp. Systems (ICDCS), July 2002.

[18]     C. Amza, A. L. Cox, and W. Zwaenepoel. Consistent replication for scaling back-end databases of dynamic content web sites. In Middleware, 2003.

[19]     T. Anderson, Y. Breitbart, H. F. Korth, and A. Wool. Replication, consistency, and practicality: Are these mutually exclusive? In ACM SIGMOD, 1998.

[20]     H. Berenson, P. Bernstein, J. Gray, J. Melton, E. O'Neil, and P. O'Neil. A critique of ANSI SQL isolation levels. In ACM SIGMOD, 1995.

[21]     K.P. Birman. Building Secure and Reliable Network Applications. Prentice Hall, 1996.

[22]     K. Bohm, T. Grabs, U. Rohm, and H.-J. Schek.  Evaluating the coordination overhead of synchronous replica maintenance. In Euro-Par, 2000.

[23]     Y. Breitbart, R. Komondoor, R. Rastogi, S. Seshadri, and A. Silberschatz. Update propagation protocols for replicated databases. In ACM SIGMOD, 1999.

[24]     E. Cecchet, J. Marguerite, and W. Zwaenepoel.  C-JDBC: Flexible database clustering middleware. In USENIX Conference, 2004.

[25]     P. Chundi, D. J. Rosenkratz, and S. S. Ravi. Deferred updates and data placement in distributed databases.  In ICDE, 1996.

[26]     Transaction Processing Performance Council. TPC Benchmark W.

[27]     K. Daudjee and K. Salem. Lazy database replication with ordering guarantees. In ICDE, 2004.

[28]     S. Elnikety, F. Pedone, and W. Zwaenepoel.  Generalized snapshot isolation and a prefix-consistent implementation. Technical report, EPFL, 2004.

[29]     A. Fekete, D. Liarokapis, E. O'Neil, P. O'Neil, and D. Shasha. Making snapshot isolation serializable.

[30]     Foedero Technologies Inc. Foederoreplica 1.0, 2004.

[31]     J. Gray, P. Helland, P. O'Neil, and D. Shasha. The dangers of replication and a solution. In ACM SIGMOD, 1996.

[32]     J. Holliday, D. Agrawal, and A. El Abbadi. The performance of database replication with group communication. In IEEE FTCS, 1999.

[33]     J. Holliday, R. Steinke, D. Agrawal, and A. Abbadi. Epidemic algorithms for replicated databases. TKDE, 15(5), 2003.

[34]     R. Jimenez-Peris, M. Patino-Martinez, B. Kemme, and G. Alonso. Improving the scalability of fault-tolerant database clusters. In ICDCS, 2002.

[35]     B. Kemme and G. Alonso. A new approach to developing and implementing eager database replication protocols. ACM TODS, 25(3), 2000.

[36]     C. Liu, B. G. Lindsay, S. Bourbonnais, E. Hamel, T. C. Truong, and J. Stankiewitz. Capturing global transactions from multiple recovery log files in a partitioned database system. In VLDB, 2003.

[37]     E. Pacitti, P. Minet, and E. Simon. Replica consistency in lazy master replicated databases.  Distributed and Parallel Databases, 9(3), 2001.

[38]     E. Pacitti and E. Simon. Update propagation strategies to improve freshness in lazy master replicated databases. VLDB Journal, 8(3), 2000.

[39]     F. Pedone, R. Guerraoui, and A. Schiper. Exploiting Atomic Broadcast in Replicated DBs. In EuroPar'98.

[40]     K. Petersen, M. Spreitzer, D. B. Terry, M. Theimer, and A. J. Demers. Flexible update propagation for weakly consistent replication. In SOSP, 1997.

[41]     C. Plattner and G. Alonso. Ganymed: Scalable replication for transactional web applications. In Middleware, 2004.

[42]     M. Rabinovich, N. H. Gehani, and A. Kononov.  Scalable update propagation in epidemic replicated databases. In EDBT, 1996.

[43]     U. Rohm, K. Bohm, H-J. Schek, and H. Schuldt. FAS - a freshness-sensitive coordination middleware for a cluster of olap components. In VLDB, 2002.

[44]     R. Schenkel and G. Weikum. Integrating snapshot isolation into transactional federations. In CooPIS'00.

[45]     R. Schenkel, G. Weikum, N. Weissenberg, and X. Wu. Federated transaction management with snapshot isolation. In FMLDO, 1999.

[46]     Spread. Homepage:

[47]     J. Wieck. A replication system for PostgreSQL. White Paper:

[48]     S. Wu and B. Kemme. Postgres-R(SI): Combining replica control with concurrency control based on snapshot isolation. In ICDE, 2005.

[49]     V. Zuikeviciute and F. Pedone: Revisiting the Database State Machine Approach. In SRDS, 2004.

[50]     P. Bernstein, V. Hadzilacos, N. Goodman. Concurrency and Recovery in Database Systems. In Addison-Wesley book, 1987.

[51]     PSEPro. Homepage: