Transparent,
Fault Tolerant, and Consistent Replication of Arbitrary Autonomous Heterogeneous
Databases
Carl Hu
Benjamin Rousseau
Rick Tompkins
Gus Bjorklund
Abstract
Several researchers have
proposed cluster based replication solutions that provide fault tolerance,
scalability, and data consistency. Although these solutions have the potential
to address the scalability and availability needs of many enterprise
applications, they often require the customization of databases, the use of non-standard
database client library implementations, or the instrumentation of applications.
In
this paper, we present a system that implements update anywhere replication of
autonomous heterogeneous databases designed to operate with unmodified applications,
standard database access library implementations, and off-the-shelf database software.
The system, used in the commercial database federation product DataXtend, is fully implemented and has been tested with
Oracle, SQL Server, ObjectStore, and a variety of JDBC
and ODBC library implementations.
The
system maintains the safety property in the face of any combination of process
failures and network partitions, is available when quorum is attainable, uses persistent
total order to incrementally recover failed nodes, implements the online ability
to add and remove nodes to and from a replicating group, and provides transactional
data consistency.
The
paper also discusses a spectrum of optimizations available when a deployment
admits application instrumentation, database client library flexibility, or database
customization.
1 Introduction
Substituting a cluster of replicated database nodes for a single database
node remains an important approach for increasing the scalability and
availability of enterprise databases.
Gray et al. [31] categorized solutions to the replication problem by
two parameters: update propagation strategy and update location flexibility. They
discuss two propagation strategies: eager, where updates are applied at all
replicas as part of the originating transaction; and lazy, where updates are propagated to other replicas asynchronously
after the originating transaction commits. They describe two types of location
restrictions: update anywhere, where
updates to an object can originate from any replica; and single master, where only an object’s designated master node can originate
an update to that object. They conclude that eager solutions do not scale.
Subsequent research proposed new replication solutions that mitigate the
severity of these limitations by using hybrid propagation strategies that
combine the advantages of eager and lazy propagation [8,10,11,17,14,18,32,34,35,48,49].
These solutions are lazy in that the commit is returned to the client before
the update has been committed at all replicas. They are eager in that they
guarantee 1-copy data consistency by requiring some transaction coordination prior
to commit. The overhead for this coordination, although significant, is
considerably cheaper than two phase commit protocols usually used to implement eager
replication. Performance studies demonstrated the scalability of these
solutions for cluster based database replication [14,18,32,34].
Our system belongs in this class of solutions.
Beyond scalability, enterprise applications may impose a variety of stringent
requirements on replication solutions. Simultaneously meeting all of these requirements
in a single replication solution has proven challenging. Proposed solutions may
not fully account for or tolerate process failures [10,11,35,39,49] and network
partitions [10,11,14,32,39,48,49]. Some solutions require a potentially prohibitively
expensive complete re-initialization of the recovering database [14]. Others require
the instrumentation of applications [18], customization of database implementations
[10,17,39], or the use of custom database access client
libraries [14,18,22,24,30,34,41,43]. In many enterprise scenarios, entrenched legacy
applications and database deployments make these changes difficult. Some solutions
require that all database instances in the cluster use exactly identical implementations
[8,10,11,14,17,18,22,24,25,32,34,35,39], thus disallowing incremental upgrades of
clusters and heterogeneous database operation. In addition, few of these
solutions have been validated by full implementations.
In this paper, we present the architecture and design of a fully
implemented replication system that exploits the scalability advantages of a
hybrid propagation scheme while simultaneously addressing all of the issues enumerated
above. In our effort to build this system, we have leveraged a variety of prior
research ideas. We share our experience in implementing, integrating, and
augmenting these ideas to synthesize what we believe is an attractive replication
system for a range of enterprise applications.
The remainder of the paper is organized as follows: Section 2
introduces our system and failure models; Section 3 defines the requirements
that our replication solution was designed to meet; Section 4 presents, at an
abstract level, our replication solution; Section 5 presents our
implementation’s architecture and design; and Section 6 concludes the paper.
2 Preliminaries
A cluster consists of a set of nodes, N = {N1,…,Nn},
connected by a network. Each node Ni is executing an ACID compliant autonomous
DBMS instance Di and has stable storage. Each Di holds a
copy of the entire database. For each Di, there is a set of applications
concurrently submitting transactions to Di. As well, each Di
is associated with an instance Ri
of the replication engine.
The set of Ri forms a persistent
membership R = { R1,…,Rn}. R
is initially fixed, known in advance, and stored at each Ri on stable storage. We later discuss in
section X.X the algorithm for dynamically adding and removing nodes from R.
Each application submits transactions to a database instance using the
client library of its choice (e.g. JDBC, ODBC, JDO, etc.). Each database instance
communicates with its associated replication engine instance via an
exactly-once-in-order (EOIO) communications link. Each replication engine instance
communicates to all other reachable replication engine instances.
Any of the nodes may fail independently of any other nodes. In
addition, any particular application process, its communications to its database
process, or the database process itself may each independently fail. As well, the
replication engine process or its link to the database process may also each
independently fail. We use timeout as a last resort failure detector for these
EOIO links. Our algorithms do not require symmetrical (e.g.
reliable) failure detection for correctness.
For communications between replication engine instances, we employ the
services of a group communications system (GCS) that provides view-synchronous atomic
totally ordered message delivery (or abcast) [21,46].
The network may partition into a finite number of disconnected
components. Nodes situated in different components cannot exchange messages,
while those situated in the same component can continue communicating. Two or
more components may subsequently merge to form a larger component.
The GCS provides a membership notification service, informing the
replication engine about other replication engine instances that can be reached in
the current component. The notification occurs each time a connectivity change
or a replication engine crash or recovery occurs.
We assume that no message corruption and no Byzantine faults occur. A node
that crashes may subsequently recover. Upon recovery, a node retains its stable
storage.
3 The requirements
We define the problem—transparent, fault tolerant, and consistent replication of arbitrary
autonomous heterogeneous databases—in more detail to better characterize
the strengths and limitations of our solution.
We are given an existing deployment that includes a set of applications
A = {A1,…,An} submitting transactions against a database instance
D1 running on node N1. To increase scalability and
availability, we wish to substitute D1 with a cluster of database
instances D = {D1,…,Dn} and replication
engine instances R = {R1,…,Rn} running on a set of nodes N = {N1,…,Nn}.
Our solution must achieve this substitution without modifying existing
application code, substituting deployed database client libraries (e.g. ODBC or
JDBC) with customized libraries, or modifying the DBMS source code.
We note that it may still be necessary to deploy new nodes {N2,…,Nn}; install and configure new database
instances {D2,…,Dn} and replication
engine instances {R1,…,Rn}; or install
a router to direct database requests from the existing applications A to the new databases in D. These tasks, though onerous, are often
more easily accomplished than the modifications of existing systems enumerated
above.
In terms of correctness (i.e. safety), our
solution must maintain all data consistency guarantees in the face of any combination
of database, application, or middleware process failures and any combination of
communications partitioning between the above components.
In terms of availability (i.e. liveness), our
solution must enable database forward transaction processing without requiring
the healing of all faults. When a partition heals or a failed process recovers, our system must be
able to bring recovering databases online without requiring complete re-initialization.
For unmodified existing applications to operate correctly on a
replicated database cluster, our replication solution must provide 1-copy data
consistency [14,29,50].
The maturation of the database industry has resulted in the widespread implementation
of a variety of functionality. Most databases today feature ACID transaction guarantees,
efficient data access libraries (e.g. JDBC, ODBC, JDO), query facilities (e.g.
SQL, OQL), and trigger and stored procedure functionality (e.g. PL/SQL for
Oracle, T-SQL for Microsoft SQL Server, and SQL 2003 for MySQL and IBM DB2).
On the other hand, today’s widely deployed database and data access
library implementations do not yet include, for example, facilities to defer executing
a transaction’s updates until commit for distributed total ordering, a
requirement of scalable replication solutions that use a replicated database
state machine approach [49].
In order to enable support for widely deployed database and data access
library implementations, our replication solution must require only commonly implemented
functionality.
Our replication solution was designed for a
database federation product that requires supporting a variety of database implementations
within the same cluster. Although heterogeneity is less fundamental to basic replication,
we nevertheless observe that requiring completely identical software at all
nodes makes replication fragile.
As deployments evolve and machines are retired or added, software may be
installed or upgraded. Relatively minor changes from version to version and
from machine to machine can alter behaviour such as deadlock victim choice or transaction
serialization order.
Our replication solution must not be susceptible to these and similar variations.
Given the availability requirements of deployments that require
replication, our replication solution must permit the addition and removal of databases
to the cluster membership without needing to take the cluster offline.
4 The approach
For clarity and concision, we present our approach as an evolution and
integration of prior work. We name and reference these previously published
algorithms using provisional abbreviations.
Our solution belongs to a class of
replication algorithms that use the replicated state machine approach (RSMA): if
a cluster of identical deterministic state machines begin with identical states
and are submitted an identical sequence of inputs, then the state machines will
transition through the same states and are said to be consistently replicated [8].
Unlike a synchronous replication approach such as two phase commit (2PC), RSMA
permits the state machines to transition asynchronously with respect to each
other. This relaxation reduces the coordination problem from 2PC to the calculation
of a totally ordered sequence of messages. This calculation, not requiring
end-to-end acknowledgements, is significantly more scalable than 2PC variants [21].
Group communications systems that leverage the properties of networking hardware
to solve this problem efficiently are available [21,46].
We leverage these implementations.
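To make the structure concrete, the following sketch (in Java, with interface names of our own invention; this is not any particular GCS API) shows the essential discipline: a replica applies inputs only upon totally ordered delivery, never upon send.

    // Hypothetical interfaces illustrating the replicated state machine approach.
    // Identical deterministic state machines fed the same totally ordered inputs
    // pass through the same states at every replica.
    interface StateMachine {
        void apply(byte[] input);            // deterministic transition
    }

    interface DeliveryHandler {
        void onDeliver(byte[] input);        // invoked in the same total order at every member
    }

    interface TotalOrderChannel {
        void broadcast(byte[] input);        // abcast: totally ordered delivery to all members
        void setDeliveryHandler(DeliveryHandler handler);
    }

    final class Replica {
        Replica(StateMachine sm, TotalOrderChannel channel) {
            // Inputs are applied only on delivery, so local and remote inputs
            // are processed in one agreed order at every replica.
            channel.setDeliveryHandler(sm::apply);
        }
    }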
Deriving a simple replicated database state
machine approach (RDSMAsymmetrical) from
RSMA is straightforward: we substitute a database for each state machine and a
database query (which we define as a transaction scoped sequence of reads and
writes) for each input. If we ensure that the databases begin with identical
data and are submitted the same sequence of queries, then the databases are
replicated with 1-copy serializable data consistency [49].
Unfortunately, a solution directly based on this approach does not meet
our scalability requirement: since all queries are executed by every database, RDSMAsymmetrical clearly offers no greater scalability
than that of a single database.
To mitigate this scalability limitation, RDSMAwriteset
submits each query to only one originating database, collects the changes generated by
the query into a writeset,
and submits only the writeset to the non-originating databases.
If the cost of applying writesets is lower than the cost of executing queries,
then an RDSMAwriteset cluster scales
better than a single database. The greater this cost difference, the greater
the scalability. We leverage this approach in our solution to gain scalability.
Unfortunately, a solution directly based on RDSMAwriteset
does not meet our data consistency requirement: the introduction of asymmetrical
inputs to the state machines results in a loss of 1-copy serializability.
For example, the lost update anomaly [50] occurs if two databases
concurrently increment the same variable.
[49] presents an approach, which we refer to
as RDSMAcert, that submits each query to only
one originating database, defers the commit of updates generated by that query
until certification, collects all updates generated by that query into a
writeset, and broadcasts the writeset via a GCS. The GCS delivers the writeset
in total order to all nodes, and the writeset is tested against a certification
criterion. The certification is designed such that if a transaction fails
certification at one node, it fails certification at all nodes. If
certification succeeds, the transaction is committed locally.
A variety of certification schemes have been proposed with varying
performance and consistency properties. The following scheme, which we use,
guarantees snapshot isolation: Two operations are defined to conflict if they
are issued by two different transactions and update the same data. A
transaction ti fails certification if any
concurrent tj has conflicting operations. Transaction
concurrency detection is achieved without message overhead by maintaining Lamport time stamps to verify precedence criteria [49].
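The following is a minimal sketch of such a certification test, under our own assumptions: each writeset carries the set of items it updated and a begin timestamp, and commit timestamps are assigned in delivery order. The class and field names are ours, not those of [49].

    // Hypothetical certification for snapshot isolation: a delivered writeset
    // fails if any concurrent, already-certified transaction wrote an
    // overlapping set of items.
    final class Writeset {
        final java.util.Set<String> updatedKeys;  // items written by the transaction
        final long beginTs;                       // logical timestamp at transaction begin
        long commitTs;                            // assigned in total delivery order
        Writeset(java.util.Set<String> updatedKeys, long beginTs) {
            this.updatedKeys = updatedKeys;
            this.beginTs = beginTs;
        }
    }

    final class Certifier {
        private final java.util.List<Writeset> certified = new java.util.ArrayList<>();
        private long logicalClock = 0;

        // Runs at every member in the same total order, so every member reaches
        // the same decision without exchanging additional messages.
        // (Garbage collection of old certified writesets is omitted.)
        boolean certify(Writeset ws) {
            for (Writeset other : certified) {
                boolean concurrent = other.commitTs > ws.beginTs;  // committed after ws began
                if (concurrent && !java.util.Collections.disjoint(
                        other.updatedKeys, ws.updatedKeys)) {
                    return false;  // write-write conflict with a concurrent transaction
                }
            }
            ws.commitTs = ++logicalClock;
            certified.add(ws);
            return true;
        }
    }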
[14] presents
a practical, performant, and scalable realization of
this approach by implementing the update deferral, total ordering, and transaction
certification in a customized database access library (JDBC). We call their
scheme RDSMAmiddle.
Although addressing all of our scalability and consistency requirements,
RDSMAmiddle fails to meet some of
our fault tolerance and transparency requirements: RDSMAmiddle
is intolerant of network partitions, requires complete re-initialization of
recovering nodes, and requires applications to use a customized data access
library.
For each query execution in RDSMAmiddle,
the custom data access library at the originating database collects the
writeset and uses a GCS to broadcast the writeset to all databases in the
cluster.
A replica that was previously part of the cluster but is currently
unreachable will not receive the writeset. To address this problem, RDSMAmiddle requires that each
recovering database be initialized with the complete contents of the database
on recovery.
An approach that enables recovering databases to be incrementally
initialized involves defining a persistent
membership that specifies all replicas in the cluster. Each replica (or
member) is persistently aware of all other replicas.
While a virtual synchrony GCS guarantees the delivery of messages in
total order with respect to a view (
the set of currently reachable members), a total persisted order message delivery service (TPOS) guarantees delivery of messages in total order to each
member in the persistent membership (often implemented using store and forward
semantics).
[17] presents a specification of the TPOS as
well as an efficient and fault tolerant implementation of TPOS (TPOSimp). Although [17] discusses TPOS only in
the context of making RDSMAsymmetrical tolerant
of network partitions, the TPOS facility is independently and generally applicable
to a wide variety of problems.
From an architectural point of view, we abstract TPOSimp
to a component with one input and one output API:
- SendMessage(Message), and
- OnRcvMessage(Message).
One must call SendMessage from within the
context of a transaction. TPOSimp guarantees
that, on return from the transaction commit, the message is durably persisted
and that, given eventual path connectivity [17],
it eventually reaches all members in total persisted order.
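In code, the component boundary might look like the sketch below. Only the SendMessage and OnRcvMessage names come from the text; the types and the order number parameter are our assumptions.

    // Sketch of the TPOS boundary. SendMessage must be called inside a local
    // ACID transaction; once that transaction commits, the message is durable
    // and, given eventual path connectivity, is delivered to every member of
    // the persistent membership in the same total persisted order.
    interface TotalPersistedOrderService {
        void sendMessage(byte[] message);  // call within an open transaction

        interface Listener {
            // Delivered once per member, in total persisted order, together
            // with the message's position in that order.
            void onRcvMessage(long orderNumber, byte[] message);
        }

        void setListener(Listener listener);
    }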
To extend RDSMAmiddle to
meet our fault tolerance requirements, we integrate TPOSimp
into a custom implementation of a database access library (e.g. JDBC). TPOSimp incorporates an embedded ACID database
system (we use PSEPro) and a GCS (we use Spread). We
replace the GCS broadcasts in RDSMAmiddle with
TPOS broadcasts and keep the certification logic unchanged.
In addition, we remove the fault recovery logic of RDSMAmiddle:
since TPOS guarantees that a node receives all broadcast writesets in total
persisted order when it re-joins a quorum, the resulting protocol, which we
call RDSMAReliableMiddle, implements incremental recovery.
Our tests demonstrate that the additional overhead associated with
local transactional persistence is acceptable given the modern performance of
disks and a write ahead logging implementation.
The resulting replication system guarantees snapshot isolation, performs well, and tolerates faults. However, the need for
a custom database client library makes this approach difficult to deploy in some
scenarios. We present RDSMAlock to address this limitation.
For conciseness, we follow the presentation of [10,39]
and characterize our replication scheme by the invariants (or rules) maintained
when applications read, write, and commit on the originating node and when
the writeset is delivered.
The following four rules characterize Read-One-Write-All [50] RDSMAcert-based implementations:
Read
Rule: A read operation ri[x] of a transaction Ti is executed
locally by acquiring a local read lock at the initiating site of transaction Ti. Ti remains blocked
until the local read lock of x is granted at the initiating database.
Write
Rule: A write operation wi[x] of a transaction Ti is executed by
performing the update at the initiating site. The update should be made
available to construct a writeset later.
Commit Rule: When the initiating client decides to commit the transaction Ti, the commit is
blocked and the writeset is retrieved and broadcast using TPOS to all members.
Reconciliation Rule: When a site receives a writeset, the writeset is tested against the
certification criterion; if certification succeeds, the transaction is committed
locally, otherwise it is aborted. Because certification is deterministic, every
site reaches the same decision.
In terms of how readily existing implementations satisfy these rules:
the Read Rule is satisfied by any 2PL database, and the Write Rule
can be implemented in several ways, including installing triggers that capture
the change to a table or scanning the transaction log.
The Commit Rule, however, requires delaying the client’s transaction commit
while the required, non-standard distributed total ordering and certification
occur. We have not yet discovered a way of satisfying this requirement when client
libraries and databases cannot be customized.
We use a pessimistic concurrency scheme to construct a replication
scheme with rules that are more easily satisfied by standard database and
client library implementations.
We partition the data into a disjoint set of conflict classes C =
{C1,…,Cn}. The granularity of a class may
range from an entire database to a table to a single tuple. At design time, our system
derives the schematic structure of the replicated databases and defaults
to row-based granularity for relational databases and object-based granularity
for object-oriented databases.
To leverage these conflict classes, we build and integrate into the
system a fault tolerant distributed lock management service (FTDLM). The FTDLM
at the originating site is informed of all transaction begins, writes, and
commits. With this information, the FTDLM is able to guarantee classic two
phase locking semantics.
While two phase locking is usually associated with eager replication, we
introduce a way of using 2PL in lazy replication while tolerating process and
network failures.
The
Read Rule:
(a)
A read operation ri[x] of a transaction Ti is executed
locally by acquiring a local read lock at the initiating site of transaction Ti. Ti remains blocked
until the local read lock of x is granted at the initiating database.
This requirement is satisfied by many DBMS implementations.
The
Write Rule:
(a)
A write operation wi[x] of a transaction Ti on initiating
site I is executed by first acquiring a distributed
exclusive lock for x. If the lock is currently owned by another site, then wi[x] is delayed until ownership of the distributed exclusive
lock for x is granted to site I.
(b)
When this is successfully acquired, the local write lock for x must be acquired.
If this is currently held by another transaction, then wi[x] is delayed until this local lock is granted.
(c)
The write is then performed and the change is
captured in a change log.
In terms of implementation, Part (a) of this rule is not immediately
satisfied by some DBMS implementations. Satisfaction of this requirement may
nevertheless be achieved by incorporating a “lock detector” associated with the
database using available customization facilities exposed by DBMS
implementations (like update triggers). This lock detector can then use the
FTDLM to acquire the distributed exclusive lock before the DBMS performs the
write.
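As an illustration, the detector reduces to a callout of the following shape (hypothetical names; the hook that invokes it, such as an update trigger, is database specific):

    // Hypothetical lock detector: invoked by a database customization hook
    // before the DBMS performs a write, so that the distributed exclusive
    // lock is held before wi[x] executes.
    final class LockDetector {
        interface Ftdlm {
            // Blocks until the distributed exclusive lock is granted.
            void acquireExclusive(String transactionId, String conflictClass);
        }

        private final Ftdlm ftdlm;

        LockDetector(Ftdlm ftdlm) {
            this.ftdlm = ftdlm;
        }

        void beforeWrite(String transactionId, String conflictClass) {
            ftdlm.acquireExclusive(transactionId, conflictClass);  // delays the write until granted
        }
    }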
Part (b) of this Write Rule is immediately satisfied by many DBMS
implementations.
Part (c) of this Write Rule requires that a persistent record of wi[x],
L[wi[x]]
is atomically recorded, in order, along with wi[x] in such a way that
L[wi[x]]
is available for later read access as described in the Commit Rule. We discuss
two ways this can be achieved by typical DBMS
implementations. First, many DBMS implementations use a log to maintain
durability and record L[wi[x]] with the required
atomicity; if this log can be accessed as described in the Commit Rule, then this
requirement is satisfied. Second, we can use the customization facilities
provided by the DBMS implementation to capture each wi[x] into a transactionally persistent data structure (such as a
table) for subsequent access. We call any of these possibilities change capture
mechanisms, and the data structure that contains the captured changes the change
log.
The
Commit Rule:
(a)
When an initiating site I decides to commit a
transaction Ti, the database transaction is immediately committed.
(b)
After the commit succeeds, a transaction tk
is asynchronously begun and executes the following tasks: All changes
associated with transaction Ti are retrieved from the committed
change log to form a writeset. The writeset
preserves ordering. We call this process change
retrieval. Once the writeset is retrieved, the change set is decorated with
an identifier specifying the initiating site I. This decorated change set is
then given to TPOS, which uses a total order broadcast primitive to guarantee
delivery of the change set to all sites (including I). The change log is then modified
in such a way as to guarantee that changes that have been retrieved are not
retrieved again.
(c)
Upon the successful commit of tk, all distributed locks
associated with Ti are released. If lock transfer messages need to be sent,
they are sent over the same totally ordered GCS view as the writeset
broadcast.
Part (a) of this Commit Rule is immediately satisfied by most DBMS
implementations.
Part (b) of this Commit Rule is not immediately satisfied by most DBMS
implementations. However, it is straightforward to augment these implementations
to satisfy it. In the Replicator module described later in this document,
this is achieved by having an external process, woken by a database commit,
begin a transaction, query the database to retrieve the changes from the change
log, form a change set, set the initiator identifier for the change set, and
give it to the TPOS.
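A sketch of that external step follows, assuming changes are captured into a change-log table we call CHANGE_LOG and reusing the TotalPersistedOrderService interface sketched earlier; the schema and SQL are illustrative, not the product’s.

    // Hypothetical change retrieval: an external process, woken by a commit,
    // drains the change log in order within one transaction and hands the
    // change set to the TPOS inside that same transaction.
    import java.sql.*;

    final class ChangeRetriever {
        void retrieveAndBroadcast(Connection db, TotalPersistedOrderService tpos)
                throws SQLException {
            db.setAutoCommit(false);
            try {
                java.io.ByteArrayOutputStream changeSet = new java.io.ByteArrayOutputStream();
                long lastSeq = -1;
                try (Statement st = db.createStatement();
                     ResultSet rs = st.executeQuery(
                         "SELECT seq, payload FROM CHANGE_LOG ORDER BY seq")) {
                    while (rs.next()) {
                        changeSet.writeBytes(rs.getBytes("payload"));  // preserves capture order
                        lastSeq = rs.getLong("seq");
                    }
                }
                if (lastSeq >= 0) {
                    // In the real system the change set is also decorated with
                    // the initiating site's identifier before being handed off.
                    tpos.sendMessage(changeSet.toByteArray());
                    try (PreparedStatement del = db.prepareStatement(
                            "DELETE FROM CHANGE_LOG WHERE seq <= ?")) {
                        del.setLong(1, lastSeq);
                        del.executeUpdate();  // retrieved changes are never retrieved again
                    }
                }
                db.commit();  // SendMessage's durability rides on this commit
            } catch (SQLException e) {
                db.rollback();
                throw e;
            }
        }
    }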
Part (c) involves the release of the distributed locks acquired in the
course of executing transaction Ti. In the design of the
Replicator described later in this document, we discuss how this is achieved by
informing the FTDLM that the
transaction has committed via the TransactionCompleted notification. The FTDLM may, if there are other sites requesting the distributed lock, broadcast messages to
other sites using the total order broadcast primitive in order to transfer
ownership of the distributed lock. This also guarantees that the lock transfer
is delivered to all sites after the change set is delivered, a critical
invariant for maintaining data consistency.
The
Reconciliation Rule:
(a)
At each site, the TPOS delivers change sets and the GCS
delivers lock management messages.
(b)
If the delivered message to site R is a writeset,
then site R extracts from the change set the initiator site I. If I is different from R, R applies each change in the change set
to the database in order. If R is the same as I, then the message is dropped. If
the delivered message to site R is a lock ownership transfer, then this message
is processed by the lock management system. Change sets and lock transfers must
be processed in the order that they are delivered.
Augmenting typical DBMS implementations with this logic typically poses
few problems. We use the TPOSM (the TPOS module) as the
persistent total ordering mechanism and the FTDLM
to implement 2PL. Both use the same broadcast primitive for communication
with other sites, which ensures that the ordering between lock management and
change set delivery is preserved (part (b) of this rule). We incorporate these
elements in an external process that receives the delivered messages from the
broadcast primitive. In addition, we implement logic needed to reconcile the
received change sets to the database by using available database query
mechanisms.
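The receiving side can be sketched as follows, reusing the Listener interface from the TPOS sketch; the names and framing are assumed, and the point is only that a site drops its own writesets, applies others’ changes in delivery order, and routes lock messages through the same ordered stream.

    // Hypothetical reconciliation handler. Change sets and lock transfers are
    // processed strictly in delivery order.
    final class Reconciler implements TotalPersistedOrderService.Listener {
        interface DatabaseApplier { void applyInOrder(java.util.List<byte[]> changes); }
        interface LockManager { void process(byte[] lockMessage); }

        private final String localSiteId;
        private final DatabaseApplier db;
        private final LockManager locks;

        Reconciler(String localSiteId, DatabaseApplier db, LockManager locks) {
            this.localSiteId = localSiteId;
            this.db = db;
            this.locks = locks;
        }

        @Override
        public void onRcvMessage(long orderNumber, byte[] message) {
            DeliveredMessage m = DeliveredMessage.decode(message);
            if (m.isWriteset) {
                if (!m.initiatorId.equals(localSiteId)) {
                    db.applyInOrder(m.changes);  // apply each change, in order
                }                                // own writesets were already committed locally
            } else {
                locks.process(message);          // lock ownership transfer
            }
        }

        static final class DeliveredMessage {
            final boolean isWriteset;
            final String initiatorId;
            final java.util.List<byte[]> changes;
            DeliveredMessage(boolean isWriteset, String initiatorId,
                             java.util.List<byte[]> changes) {
                this.isWriteset = isWriteset;
                this.initiatorId = initiatorId;
                this.changes = changes;
            }
            static DeliveredMessage decode(byte[] raw) {
                throw new UnsupportedOperationException("wire format omitted for brevity");
            }
        }
    }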
Data locality and choice of replication scheme
To summarize, our solution fundamentally uses the RDSMAhybrid
protocol, a hybrid of two replication protocols: RDSMAlock and RDSMAReliableMiddle. Both protocols use hybrid
propagation schemes that broadcast updates after the originating transaction
commits. RDSMAlock
uses a pessimistic locking scheme, whereas RDSMAReliableMiddle
uses an optimistic concurrency scheme.
Each transaction accesses a set of conflict classes. Distributed mutual exclusion
guarantees that each conflict class is associated with a single master node at
any moment. Each node uses either RDSMAlock or RDSMAReliableMiddle; therefore, each conflict
class is associated with either RDSMAlock or RDSMAReliableMiddle.
The protocol observes that if all conflict classes accessed by a
transaction are associated with RDSMAReliableMiddle,
then the commit of the transaction can be processed by RDSMAReliableMiddle.
Otherwise, the transfer of the conflict class to RDSMAReliableMiddle
is initiated and the in-flight transaction is aborted. RDSMAlock
works without modification.
We illustrate with an example. Suppose a transaction begins on an RDSMAReliableMiddle node and
updates object A. If A is currently
owned by an RDSMAReliableMiddle node
(not necessarily this one), then the transaction executes using the
certification algorithm. Otherwise, the local FTDLM requests the lock
and aborts this transaction.
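The commit-path decision amounts to the following sketch (hypothetical names):

    // Hypothetical router for the hybrid protocol: a transaction may commit via
    // certification only if every conflict class it accessed is currently
    // associated with the certification-based protocol.
    final class HybridCommitRouter {
        enum Protocol { LOCK, RELIABLE_MIDDLE }

        interface Ownership { Protocol protocolOf(String conflictClass); }
        interface Ftdlm { void requestTransfer(String conflictClass); }

        private final Ownership ownership;
        private final Ftdlm ftdlm;

        HybridCommitRouter(Ownership ownership, Ftdlm ftdlm) {
            this.ownership = ownership;
            this.ftdlm = ftdlm;
        }

        // Returns true if the certification path may be used; otherwise a
        // transfer is initiated and the caller must abort the in-flight
        // transaction (to be retried once ownership arrives).
        boolean tryCertifiedCommit(java.util.Set<String> accessedClasses) {
            for (String c : accessedClasses) {
                if (ownership.protocolOf(c) != Protocol.RELIABLE_MIDDLE) {
                    ftdlm.requestTransfer(c);
                    return false;
                }
            }
            return true;
        }
    }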
5 Implementation architecture
In this section, we describe how we have built this functionality.
In our implementation, we encapsulate the majority of the logic in a
module called the Replicator Engine (or
just Replicator). Each instance of
the Replicator executes identical logic.
However, correct operation of the logic requires certain configuration
information unique to each instance: each Replicator
instance must be configured to be associated with its own database instance as well as its own unique identifier. A particular instance
of the Replicator configured in this
manner is termed a member.
Any member belongs to a coordination
group. The Replicator replicates
the data in the databases of all members that are in the same coordination
group.
In addition to the associated unique identifier and database, each
instance of the Replicator is
configured to locally store a persistent membership list containing the unique
identifiers of all of the members of the coordination group. The contents of
the membership lists associated with each member belonging to the same
coordination group are identical. We use a dynamic membership protocol to
consistently maintain this list, described later.
The basic Replicator Engine exposes a concise user callable interface: a
transaction begin notification, Lock(), and TransactionCompleted(). The
transaction begin notification does not involve network operations and therefore executes efficiently.
It is allowed to succeed even when the local member is not part of a
quorum. This behavior is convenient in use cases where clients acquire exclusive
locks when updating data but not when reading data: it allows
clients to begin and commit read-only transactions even when not connected to
quorum.
If Lock() returns successfully, the Replicator Engine guarantees that the lock
is exclusively acquired and that the local replica contains up-to-date data.
The following pseudocode specifies the precise behavior of this function. The logic is designed such that:
repeated acquisitions of the same lock do not require network messages;
transactions that update data when the local instance does not have quorum are
aborted; multiple transactions that require the same resource are allowed to
execute concurrently locally (with correctness guaranteed by the local database
lock management system); and fairness is guaranteed even when a greedy node
never lets go of the resource.
Performance is improved by
implementing a “quantum” whereby N transactions are allowed to execute for each
lock transfer.
If the function throws, the
user should undo any actions taken (for example, call Rollback() on the database transaction).
Lock(TransactionID, Name):
  if this server is not in quorum then
    throw exception
  if this server owns Name and
      there are waiters for Name and
      number of txns > quantum then
    release Name
  if this server does not own Name then
    request Name
    block until acquired
  update transaction lists
Calling TransactionCompleted()
notifies the Replicator Engine that it can release all locks acquired in the
transaction. As discussed
earlier, the Replicator Engine does not assist the user in executing or
persisting operations; it merely implements the two phase locking semantics. In
other words, exclusive ownership of a resource is acquired and guaranteed to be
maintained from the time the call to Lock()
returns
to the time that TransactionCompleted() is called.
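For example, an application-side wrapper might bracket a JDBC transaction as follows; this is our illustration under assumed method names (lock, transactionCompleted) and a hypothetical accounts table, not the product’s actual API.

    import java.sql.*;

    // Hypothetical usage: acquire the distributed lock before writing, commit
    // locally, then notify the Replicator so locks may eventually be released.
    final class ReplicatedUpdate {
        interface ReplicatorEngine {
            void lock(String transactionId, String resourceName);  // throws if not in quorum
            void transactionCompleted(String transactionId);
        }

        void credit(Connection db, ReplicatorEngine replicator,
                    String txnId, String account, int delta) throws SQLException {
            db.setAutoCommit(false);
            try {
                replicator.lock(txnId, account);  // may block until ownership is granted
                try (PreparedStatement up = db.prepareStatement(
                        "UPDATE accounts SET balance = balance + ? WHERE id = ?")) {
                    up.setInt(1, delta);
                    up.setString(2, account);
                    up.executeUpdate();
                }
                db.commit();
                replicator.transactionCompleted(txnId);  // 2PL release point
            } catch (Exception e) {
                db.rollback();  // undo on failure, as the text requires
                throw e instanceof SQLException ? (SQLException) e : new SQLException(e);
            }
        }
    }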
In order to understand the roles of the various components, consider
the replication of a single user transaction.
The user transaction informs the Replicator
of the conflict classes it wishes to update, updates those classes when
allowed to proceed, and commits. The Fault Tolerant Distributed Lock Manager (FTDLM)
processes the associated transaction notifications: begin, acquire lock, and
commit. The source of notifications can
be either the application directly, or some detector monitoring application
transactions. Once the user transaction
has committed, the FTDLM asks the Change Capture Retriever to retrieve the
transaction’s changes from the change log and form a change set, as described
in the Commit Rule.
Coordination between the TPOSM and the FTDLM is handled by the Replicator Coordinator State Machine (RCSM),
which integrates their operation to implement the Read, Write, Commit, and
Reconciliation Rules of the replication method.
Both the FTDLM and the TPOSM utilize the EVS (extended virtual synchrony)
transport, which provides totally ordered FIFO delivery.
Therefore, causal order between lock request, change set propagation,
and lock release is preserved. The Router multiplexes inputs and outputs
between the FTDLM, the TPOSM, and the EVS transport, allowing the two
components to share access to the transport.
Figure X depicts
the events that the various state machines use to interact with each other to
implement the required behaviour of the Replicator.
The TPOSM in the Replicator Module is based on the method described in
[17,15]. The
salient feature of the method in [17] is that all updates are applied in the
same total persisted order everywhere; thus all replicas contain identical
data. In [17] updates are applied upon
receipt; local commits are delayed until updates are received. In our system, local commits occur prior to
propagating updates, but the application order of updates on conflict classes
are the same everywhere. We also implemented the dynamic membership method
found in [17].
Removal of historical data occurs in [17,15]
by adding information identifying the last update applied with every update
sent. When an update has been applied on
every member, it can be deleted. If a
quorum is long lasting and contains members which never send updates, no last
applied update information is sent, and historical data can accumulate without
bound. We force members which do not
have any updates to send an empty update occasionally so that historical data
can be removed.
The communication system cannot be considered an infinite sink and has
limited resources. Members that do not
retrieve messages from the communication system quickly enough appear
unresponsive and are dropped from the view, thereby forcing a view change and a
subsequent recovery.
SendMessage is used by the
RCSM to send a change set for delivery to all other RCSM instances in the
coordination group. The message must be delivered in total persisted
order. The call is made in the context of a transaction, the same
transaction as the change retrieval executed by the Change Capture Retriever.
OnMessage notifies the RCSM of the next action in total
persisted order to reconcile. The
message is delivered in the context of a transaction and provides both the
action itself and the relative total order of the action.
The RCSM coordinates among the FTDLM, the TPOSM, and the Change Capture Retriever.
A member M acquires locks on conflict classes C = {C1,…,Cn}
on behalf of transactions T = {T1,…,Tn}. A transaction may use several conflict
classes. Each member executes a replicated state machine [8] containing the
global lock state. In the absence of
process and network failures, the lock state is identical at each member. After process or network failures members
with stale or missing lock state obtain up to date lock state from surviving
members.
Mutual exclusion is
preserved through a replicated queue of lock waiters: the head of the queue is
the current owner and the queue changes
via receipt of lock request and lock release messages. Members do not send lock release messages
until applications have finished updating associated conflict classes [1]. Locks are initially acquired according to
total reception order of lock request messages.
Multiple resource mutual
exclusion is enforced by the two phase locking protocol. A member M serially acquires all the locks
needed by an application A prior to allowing A to
proceed. Locks are acquired utilizing
the single resource mutual exclusion protocol.
Once A is finished, the locks are released [2].
Without network and process
failures the per server lock state is always up to date; therefore it is always
correct and mutual exclusion is always preserved [8]. Network and process failures create stale or
missing lock state which can lead to concurrent modification in violation of
mutual exclusion.
The mutual exclusion methods
outlined above are vulnerable to network partitions. Unowned locks may
be concurrently acquired in disconnected network partitions. Owned locks are safe because only the owner
can release them. Therefore, the process
of granting initial locks must employ mutual exclusion among partitions. Majority weighting is used to choose one
component to be the quorum component [3,4].
Utilization of quorum
components is not sufficient to avoid mutual exclusion violation in the
presence of process failures. In
addition, the lock state in the quorum component must always reflect the latest
lock state; otherwise locks held by crashed servers can be acquired. Therefore, quorums must be intersecting [5]
and lock state from the previous quorum must be present.
The FTDLM is
composed of three component state machines: the Quorum State Machine (QSM), the
Lock Manager State Machine (LMSM), and the Lock Queue State Machine
(LQSM). The LQSM manages lock state for
a single conflict class: thus there are as many LQSM instances as conflict
classes. When not in quorum, the LMSM
causes all new transactions and lock requests to abort. Determination of quorum and maintenance of
the latest lock state during process and network failures is handled by the
QSM.
The LQSM contains
a queue of member unique identifiers where the head of the queue is the current
owner; other members are FIFO ordered waiters for a lock on the conflict class
[1]. The existence of process failures
requires durable storage of current lock ownership so that upon recovery the
latest lock ownership is available.
The LQSM receives lock
requests and transaction commit notifications from the
LMSM via the OnLock and OnTransactionCompleted methods. Lock request and release messages are
received via OnRcvEnqueue and OnRcvDequeue. After a process or network failure, the QSM
notifies each LQSM of the latest lock state by calling OnReset.
OnLock:
  if this member is not in the queue then
    SendEnqueue
  wait for this member to reach head of queue
  update transaction lists

OnTransactionCompleted:
  update transaction lists
  if no transaction is using this lock then
    SendDequeue

OnRcvEnqueue:
  add member to queue
  if the head of the queue changed then
    persist the new owner
  if the head of the queue is this member then
    signal waiting transactions

OnRcvDequeue:
  remove member from queue
  if the head of the queue changed then
    persist the new owner
  if this member is now the head of the queue then
    signal waiting transactions
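A compact sketch of the replicated queue these handlers maintain (Java, names ours; the signaling of waiting transactions shown in the pseudocode is elided):

    // Hypothetical per-conflict-class lock queue. Both handlers run in total
    // reception order at every member, so all members see the same queue and
    // the same sequence of owners.
    final class LockQueue {
        interface OwnerStore { void persistOwner(String member); }  // durable storage

        private final java.util.ArrayDeque<String> queue = new java.util.ArrayDeque<>();
        private final OwnerStore store;

        LockQueue(OwnerStore store) {
            this.store = store;
        }

        synchronized void onRcvEnqueue(String member) {
            boolean wasEmpty = queue.isEmpty();
            queue.addLast(member);
            if (wasEmpty) {
                store.persistOwner(member);  // head changed: new owner
            }
        }

        synchronized void onRcvDequeue(String member) {
            boolean headRemoved = member.equals(queue.peekFirst());
            queue.remove(member);
            if (headRemoved && !queue.isEmpty()) {
                store.persistOwner(queue.peekFirst());  // next waiter becomes owner
            }
        }

        synchronized String owner() {
            return queue.peekFirst();  // null when no member holds the lock
        }
    }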
The QSM is responsible for ensuring that each quorum starts with the
latest lock state. The QSM is also
responsible for preventing quorum when not enough members are present.
The EVS transport informs the QSM of view changes by calling OnRcvRegConf and OnRcvTransConf. This causes the QSM to first call OnEnteringNonPrimary and then send lock state to members of
the view by calling SendLockMgrState. The lock state consists of the members, the
version number, and the lock queues at the end of the last quorum the member
has knowledge of. Recovering members
have no transient state and only send persistent lock and quorum state. The QSM receives a lock state message from each
member of the view via OnRcvLockMgrState. With the lock state messages from all
members, the QSM can determine the latest quorum and lock state, which it then
persists in order to propagate latest quorum and lock state information via
eventual path [17]. With the latest quorum the QSM determines whether a new
quorum can be formed. If so, the QSM
informs each LQSM of the current latest lock state by calling OnReset, and
calls OnEnteringPrimary.
Quorum is attained when a weighted majority of the members in the
latest quorum are present in the current view.
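For instance, with per-member weights taken from the persisted quorum state, the test is a strict weighted majority (a sketch with assumed types):

    // Hypothetical quorum test: the members reachable in the current view must
    // hold a strict weighted majority of the latest persisted membership.
    final class QuorumTest {
        static boolean hasQuorum(java.util.Map<String, Integer> weights,  // latest quorum membership
                                 java.util.Set<String> currentView) {
            int total = 0;
            int present = 0;
            for (java.util.Map.Entry<String, Integer> e : weights.entrySet()) {
                total += e.getValue();
                if (currentView.contains(e.getKey())) {
                    present += e.getValue();
                }
            }
            return 2 * present > total;  // strict majority prevents split quorums
        }
    }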
The simple implementation suffers from poor performance due to
excessive access of durable storage, unnecessary multiple lock requests, and
starvation. Because lock ownership is
persisted upon every ownership change, potentially every transaction initiated
at the local member incurs one or more durable storage requests. Starvation can occur when several
applications utilizing a single member continually use the same conflict
class. In the presence of significant
contention, the overhead of lock transferral can become significant.
Unless all members of a view fail simultaneously, every quorum is
survived by at least one member. Because
all members of the quorum received the same lock messages, they all contain the
same lock queue state. Instead of persisting every change in lock ownership, the ownership at
the end of the quorum is persisted by surviving members; this is equivalent to
persisting the lock ownership upon every change. This durable lock ownership is called the lock slice and includes the quorum
version number from which the lock ownership was determined: this is sufficient
to determine the latest persistent lock state.
When all members fail simultaneously, there are no surviving members to
persist the lock slice. Therefore, no
latest lock ownership is available upon recovery. However, some stale version of the lock slice may be available, and using
this version would violate the locking invariants.
In order to detect this case, the QSM compares the latest quorum version
of the lock state (transient or persistent) among the available members with
that of the last quorum. If the lock
state version does not match the quorum version, then the QSM knows no member
present in the current view survived the ending of the last quorum, and that no
suitable lock slice is available to initialize lock state. The QSM must force all members of the last
quorum to be present before forming a new quorum in order to ensure that any unpropagated changes can be discovered.
Data access patterns in many applications demonstrate a locality of
data access: data accessed once is likely to be accessed again. Rather than releasing locks as soon as
possible, lock acquisition overhead is reduced by only releasing locks upon
commit if there is a waiter for the conflict class; otherwise the lock is retained
so that further updates on the conflict class can proceed without blocking.
Locks are owned by members and used by transactions. A lock that is
owned by a member may be simultaneously used by several transactions on that
member.
Greedy members can cause unfair behaviour, where locks are never
released because some local transaction is always using the lock. In order to introduce fairness, a limit on
the number of transactions that may run at a member is introduced, called the quantum.
If there are waiters for the conflict class when OnTransactionCompleted is
executed, and there are local transactions utilizing the conflict class waiting
to execute, then the LQSM only allows a quantum of local transactions to
execute using the current lock ownership.
Local transactions above the quantum must wait until the lock is
released and reacquired. This allows
other members to update the conflict class.
6 Conclusion
We have presented the architecture and design of a fully implemented
replication system that exploits the scalability advantages of a hybrid
propagation scheme while meeting transparency and fault tolerance requirements.
7 References
[1] L. Lamport. Time, clocks, and the ordering of events in a distributed system. Comm. ACM, 21(7):558-565, July 1978.
[2] J. Gray. Notes on Database Systems.
[3] D. K. Gifford. Weighted voting for replicated data. In Proceedings of the Seventh ACM Symposium on Operating Systems Principles, Dec 1979.
[4] S. Jajodia and D. Mutchler. Dynamic voting algorithms for maintaining the consistency of a replicated database. ACM Transactions on Database Systems, 15(2):230-280, 1990.
[5] D. Agrawal and A. El Abbadi. An efficient and fault-tolerant solution for distributed mutual exclusion. ACM Transactions on Computer Systems, 9(1):1-20, 1991.
[6] N. Kronenberg, H. Levy, and W. Strecker. VAXclusters: A closely-coupled distributed system. In ACM TOCS, May 1986.
[7] M. Carey and M. Livny. Distributed concurrency control performance: A study of algorithms, distribution, and replication. In VLDB, 1988.
[8] F. Schneider. Implementing fault-tolerant services using the state machine approach: a tutorial. In ACM Computing Surveys, December 1990.
[9] M. Velazquez. A Survey of Distributed Mutual Exclusion Algorithms.
[10] I. Stanoi, D. Agrawal, and A. El Abbadi. Using broadcast primitives in replicated databases. In ICDCS, 1998.
[11] M. Wiesmann, F. Pedone, A. Schiper, B. Kemme, and G. Alonso. Understanding replication in databases and distributed systems. In ICDCS, 2000.
[12] X. Defago, A. Schiper, and P. Urban. Total order broadcast and multicast algorithms: Taxonomy and survey. In ACM Computing Surveys, December 2004.
[13] S. Lin, Q. Lian, M. Chen, and Z. Zhang. A practical distributed mutual exclusion protocol in dynamic peer-to-peer systems. In IPTPS, 2004.
[14] Y. Lin, B. Kemme, M. Patino-Martinez, and R. Jimenez-Peris. Middleware based data replication providing snapshot isolation. In SIGMOD, 2005.
[15] O. Amir, Y. Amir, and D. Dolev. A highly available application in the Transis environment. 1993.
[16] A. Adya, B. Liskov, and P. E. O'Neil. Generalized isolation level definitions. In ICDE, 2000.
[17] Y. Amir and C. Tutu. From total order to database replication. In Proc. of Int. Conf. on Distributed Computing Systems (ICDCS), July 2002.
[18] C. Amza, A. L. Cox, and W. Zwaenepoel. Consistent replication for scaling back-end databases of dynamic content web sites. In Middleware, 2003.
[19] T. Anderson, Y. Breitbart, H. F. Korth, and A. Wool. Replication, consistency, and practicality: Are these mutually exclusive? In ACM SIGMOD, 1998.
[20] H. Berenson, P. Bernstein, J. Gray, J. Melton, E. O'Neil, and P. O'Neil. A critique of ANSI SQL isolation levels. In ACM SIGMOD, 1995.
[21] K. P. Birman. Building Secure and Reliable Network Applications. Prentice Hall, 1996.
[22] K. Bohm, T. Grabs, U. Rohm, and H.-J. Schek. Evaluating the coordination overhead of synchronous replica maintenance. In Euro-Par, 2000.
[23] Y. Breitbart, R. Komondoor, R. Rastogi, S. Seshadri, and A. Silberschatz. Update propagation protocols for replicated databases. In ACM SIGMOD, 1999.
[24] E. Cecchet, J. Marguerite, and W. Zwaenepoel. C-JDBC: Flexible database clustering middleware. In USENIX Conference, 2004.
[25] P. Chundi, D. J. Rosenkrantz, and S. S. Ravi. Deferred updates and data placement in distributed databases. In ICDE, 1996.
[26] Transaction Processing Performance Council.
[27] K. Daudjee and K. Salem. Lazy database replication with ordering guarantees. In ICDE, 2004.
[28] S. Elnikety, F. Pedone, and W. Zwaenepoel. Generalized snapshot isolation and a prefix-consistent implementation. Technical report, EPFL, 2004.
[29] A. Fekete, D. Liarokapis, E. O'Neil, P. O'Neil, and D. Shasha. Making snapshot isolation serializable. ACM TODS, 30(2), 2005.
[30] Foedero Technologies Inc. FoederoReplica 1.0, 2004. http://www.foedero.com/FoederoReplica.html.
[31] J. Gray, P. Helland, P. O'Neil, and D. Shasha. The dangers of replication and a solution. In ACM SIGMOD, 1996.
[32] J. Holliday, D. Agrawal, and A. El Abbadi. The performance of database replication with group communication. In IEEE FTCS, 1999.
[33] J. Holliday, R. Steinke, D. Agrawal, and A. El Abbadi. Epidemic algorithms for replicated databases. TKDE, 15(5), 2003.
[34] R. Jimenez-Peris, M. Patino-Martinez, B. Kemme, and G. Alonso. Improving the scalability of fault-tolerant database clusters. In ICDCS, 2002.
[35] B. Kemme and G. Alonso. A new approach to developing and implementing eager database replication protocols. ACM TODS, 25(3), 2000.
[36] C. Liu, B. G. Lindsay.
[37] E. Pacitti, P. Minet, and E. Simon. Replica consistency in lazy master replicated databases. Distributed and Parallel Databases, 9(3), 2001.
[38] E. Pacitti and E. Simon. Update propagation strategies to improve freshness in lazy master replicated databases. VLDB Journal, 8(3), 2000.
[39] F. Pedone, R. Guerraoui, and A. Schiper. Exploiting atomic broadcast in replicated databases. In Euro-Par, 1998.
[40] K. Petersen, M. Spreitzer, D. B. Terry, M. Theimer, and A. J. Demers. Flexible update propagation for weakly consistent replication. In SOSP, 1997.
[41] C. Plattner and G. Alonso. Ganymed: Scalable replication for transactional web applications. In Middleware, 2004.
[42] M. Rabinovich, N. H. Gehani, and A. Kononov. Scalable update propagation in epidemic replicated databases. In EDBT, 1996.
[43] U. Rohm, K. Bohm, H.-J. Schek, and H. Schuldt. FAS - a freshness-sensitive coordination middleware for a cluster of OLAP components. In VLDB, 2002.
[44] R. Schenkel and G. Weikum. Integrating snapshot isolation into transactional federations. In CoopIS, 2000.
[45] R. Schenkel, G. Weikum, N. Weissenberg, and X. Wu. Federated transaction management with snapshot isolation. In FMLDO, 1999.
[46] Spread. Homepage: http://www.spread.org/.
[47] J. Wieck. A replication system for PostgreSQL. White paper: http://gborg.postgresql.org/.
[48] S. Wu and B. Kemme. Postgres-R(SI): Combining replica control with concurrency control based on snapshot isolation. In ICDE, 2005.
[49] V. Zuikeviciute and F. Pedone. Revisiting the database state machine approach, 2005.
[50] P. Bernstein, V. Hadzilacos, and N. Goodman. Concurrency Control and Recovery in Database Systems. Addison-Wesley, 1987.
[51] PSEPro. Homepage: http://www.progress.com/realtime/.