(deprecated) RSpace 0.2 Specification
Use Cases
History & Rollback
- Clients should be able to save the current state of RSpace in the form of a Checkpoint (root hash of a given Merkle Patricia Trie).
- Clients should be able to roll back the state of RSpace to a given Checkpoint.
- Possibly out of scope: Provide a mechanism for clients to have access to the state (leaves) at a given checkpoint. This is needed in order to allow new validators to join an existing network.
COMM Event Traces
- Clients should be able to acquire a log of all produces, consumes, and COMM Events ("matches") that have occurred in RSpace since the last call to createCheckpoint.
- Given a log of COMM Events, we need to be able to run RSpace in a "rigged mode" where a new produce or consume that matches the one in the log will cause RSpace to return the same result.
Storage Layer
The storage layer will be used as library code in an existing Scala codebase.
As envisioned, the storage layer covers the following use cases:
- It should allow the Rholang interpreter to persist and retrieve objects related to its execution state. It WILL NOT execute those objects.
- It should be general enough for use as a storage layer outside of the context of the Rholang interpreter. This will be delivered as a self-contained Scala library, targeting Scala 2.11.x, 2.12.x, and eventually 2.13.x.
- It should allow the client (RChain blockchain) to checkpoint and reset RSpace.
Design Considerations
Interfaces
System Interface
Overview
RSpace implements the following interface which is the minimal contract required by RChain blockchain for the storage layer.
trait ISpace[C, P, A, K] {
  def consume(channels: Seq[C], patterns: Seq[P], continuation: K, persist: Boolean)(
      implicit m: Match[P, A]): Option[(K, Seq[A])]

  def produce(channel: C, data: A, persist: Boolean)(implicit m: Match[P, A]): Option[(K, Seq[A])]

  def install(channels: Seq[C], patterns: Seq[P], continuation: K)(
      implicit m: Match[P, A]): Option[(K, Seq[A])]

  def createCheckpoint(): Checkpoint

  def reset(root: Blake2b256Hash): Unit

  def close(): Unit
}

class RSpace[C, P, A, K](val store: IStore[C, P, A, K], val branch: Branch)(
    implicit
    serializeC: Serialize[C],
    serializeP: Serialize[P],
    serializeA: Serialize[A],
    serializeK: Serialize[K]
) extends ISpace[C, P, A, K]
Prerequisites
The RSpace implementation depends on a few external pieces of information and/or dependencies bundled in Context.
class Context[C, P, A, K] private (
    val env: Env[ByteBuffer],
    val path: Path,
    val trieStore: ITrieStore[Txn[ByteBuffer], Blake2b256Hash, GNAT[C, P, A, K]]
)
Env is the LMDB environment as exposed by lmdbjava (https://github.com/lmdbjava/lmdbjava/blob/master/src/main/java/org/lmdbjava/Env.java).
Path points to where the data will be held.
ITrieStore is the mechanism that builds the history that can be checkpointed.
Additionally, a default Branch is required.
ITrieStore
An interface representing the basic actions needed to build and maintain a Merkle Patricia Trie.
An implementation named LMDBTrieStore is backed with LMDB.
It is used as the source of the current tip of the history of RSpace.
LMDBStore
All the low-level calls that result in consume or produce are defined in terms of an IStore interface.
The implementation delivered in Storage.02 is based on LMDB.
class LMDBStore[C, P, A, K]
RSpace creation
object RSpace {
  def create[C, P, A, K](context: Context[C, P, A, K], branch: Branch)(
      implicit
      sc: Serialize[C],
      sp: Serialize[P],
      sa: Serialize[A],
      sk: Serialize[K]): RSpace[C, P, A, K] = {
    implicit val codecC: Codec[C] = sc.toCodec
    implicit val codecP: Codec[P] = sp.toCodec
    implicit val codecA: Codec[A] = sa.toCodec
    implicit val codecK: Codec[K] = sk.toCodec

    history.initialize(context.trieStore, branch)
    val mainStore = LMDBStore.create[C, P, A, K](context, branch)
    new RSpace[C, P, A, K](mainStore, branch)
  }
}
Consume
/** Searches the store for data matching all the given patterns at the given channels.
  *
  * If no match is found, then the continuation and patterns are put in the store at the given
  * channels.
  *
  * If a match is found, then the continuation is returned along with the matching data.
  *
  * Matching data stored with the `persist` flag set to `true` will not be removed when it is
  * retrieved. See below for more information about using the `persist` flag.
  *
  * '''NOTE''':
  *
  * A call to [[consume]] that is made with the persist flag set to `true` only persists when
  * there is no matching data.
  *
  * This means that in order to make a continuation "stick" in the store, the user will have to
  * continue to call [[consume]] until a `None` is received.
  *
  * @param channels A Seq of channels on which to search for matching data
  * @param patterns A Seq of patterns with which to search for matching data
  * @param continuation A continuation
  * @param persist Whether or not to attempt to persist the continuation
  */
def consume(channels: Seq[C], patterns: Seq[P], continuation: K, persist: Boolean)(
    implicit m: Match[P, A]): Option[(K, Seq[A])]
Match
/**
  * @tparam P A type representing patterns
  * @tparam A A type representing data
  */
trait Match[P, A] {
  def get(p: P, a: A): Option[A]
}
The Match typeclass allows a consume to be agnostic about the meaning of a match. As long as an instance is available, consume can try to match an incoming continuation with data existing in RSpace.
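As a minimal sketch of what an instance might look like, assume a hypothetical pattern type `Pattern` with a wildcard case and an exact-equality case over `String` data. Neither `Pattern`, `stringMatch`, nor `firstMatch` is part of RSpace; they are illustrative names only.

```scala
object MatchSketch {
  // Same shape as the Match typeclass quoted above.
  trait Match[P, A] {
    def get(p: P, a: A): Option[A]
  }

  // Hypothetical pattern language, for illustration only.
  sealed trait Pattern
  case object Wildcard extends Pattern
  final case class Exactly(value: String) extends Pattern

  // An instance that decides what "match" means for (Pattern, String).
  implicit val stringMatch: Match[Pattern, String] = new Match[Pattern, String] {
    def get(p: Pattern, a: String): Option[String] = p match {
      case Wildcard       => Some(a)
      case Exactly(value) => if (value == a) Some(a) else None
    }
  }

  // A caller only needs the implicit instance, not the pattern semantics.
  def firstMatch[P, A](pattern: P, data: Seq[A])(implicit m: Match[P, A]): Option[A] =
    data.iterator.map(m.get(pattern, _)).collectFirst { case Some(a) => a }
}
```

The point of the design is visible in `firstMatch`: it is written once against `Match[P, A]` and works for any pattern and data types for which an instance is in scope.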
Install
def install(channels: Seq[C], patterns: Seq[P], continuation: K)( implicit m: Match[P, A]): Option[(K, Seq[A])]
Install behaves similarly to consume, except that if the incoming continuation does not find a match, any existing waiting continuation at channels will be replaced by the incoming one, and it will be a persistent one.
Produce
/** Searches the store for a continuation that has patterns that match the given data at the
  * given channel.
  *
  * If no match is found, then the data is put in the store at the given channel.
  *
  * If a match is found, then the continuation is returned along with the matching data.
  *
  * Matching data or continuations stored with the `persist` flag set to `true` will not be
  * removed when they are retrieved. See below for more information about using the `persist`
  * flag.
  *
  * '''NOTE''':
  *
  * A call to [[produce]] that is made with the persist flag set to `true` only persists when
  * there are no matching continuations.
  *
  * This means that in order to make a piece of data "stick" in the store, the user will have to
  * continue to call [[produce]] until a `None` is received.
  *
  * @param channel A channel on which to search for matching continuations and/or store data
  * @param data A piece of data
  * @param persist Whether or not to attempt to persist the data
  */
def produce(channel: C, data: A, persist: Boolean)(implicit m: Match[P, A]): Option[(K, Seq[A])]
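The interplay between consume, produce, and the `persist` flag can be illustrated with a toy in-memory model. This is a sketch under strong simplifying assumptions, not the RSpace implementation: it handles a single channel and a single pattern per consume, has no joins, no LMDB, and no history. `ToySpace` and its members are hypothetical names.

```scala
import scala.collection.mutable

object ToySpaceSketch {
  trait Match[P, A] { def get(p: P, a: A): Option[A] }

  // Toy model: one channel and one pattern per consume (assumption).
  final class ToySpace[C, P, A, K] {
    private final case class Datum(value: A, persist: Boolean)
    private final case class Waiting(pattern: P, continuation: K, persist: Boolean)

    private val data  = mutable.Map.empty[C, Vector[Datum]]
    private val conts = mutable.Map.empty[C, Vector[Waiting]]

    def consume(channel: C, pattern: P, continuation: K, persist: Boolean)(
        implicit m: Match[P, A]): Option[(K, A)] = {
      val stored = data.getOrElse(channel, Vector.empty)
      val hit = stored.zipWithIndex.iterator
        .map { case (d, i) => (d, i, m.get(pattern, d.value)) }
        .collectFirst { case (d, i, Some(a)) => (d, i, a) }
      hit match {
        case Some((d, i, a)) =>
          // Persistent data stays in the store after being matched.
          if (!d.persist) data.update(channel, stored.patch(i, Nil, 1))
          Some((continuation, a))
        case None =>
          // No match: the continuation waits; `persist` only takes effect here.
          conts.update(channel,
            conts.getOrElse(channel, Vector.empty) :+ Waiting(pattern, continuation, persist))
          None
      }
    }

    def produce(channel: C, datum: A, persist: Boolean)(
        implicit m: Match[P, A]): Option[(K, A)] = {
      val waiting = conts.getOrElse(channel, Vector.empty)
      val hit = waiting.zipWithIndex.iterator
        .map { case (w, i) => (w, i, m.get(w.pattern, datum)) }
        .collectFirst { case (w, i, Some(a)) => (w, i, a) }
      hit match {
        case Some((w, i, a)) =>
          // Persistent continuations stay in the store after being matched.
          if (!w.persist) conts.update(channel, waiting.patch(i, Nil, 1))
          Some((w.continuation, a))
        case None =>
          // No match: the data waits; `persist` only takes effect here.
          data.update(channel, data.getOrElse(channel, Vector.empty) :+ Datum(datum, persist))
          None
      }
    }

    def isEmpty: Boolean =
      data.values.forall(_.isEmpty) && conts.values.forall(_.isEmpty)
  }
}
```

In this model a `produce` with `persist = true` that immediately finds a waiting continuation returns `Some(...)` and stores nothing, which is why the caller has to repeat the call until it receives `None` if the data is meant to stick.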
Rollback
case class Checkpoint(root: Blake2b256Hash, log: trace.Log)

/** Creates a checkpoint.
  *
  * @return A [[Checkpoint]]
  */
def createCheckpoint(): Checkpoint

/** Resets the store to the given root.
  *
  * @param root A BLAKE2b256 Hash representing the checkpoint
  */
def reset(root: Blake2b256Hash): Unit
The usage of the checkpoint mechanism is straightforward:
// given a space (typed as RSpace so that `store` is accessible):
val space: RSpace[C, P, A, K] = RSpace.create(???)
val checkpoint0 = space.createCheckpoint()
space.store.isEmpty // => res0: Boolean = true
space.consume(???)
space.store.isEmpty // => res1: Boolean = false
val checkpoint1 = space.createCheckpoint()
space.reset(checkpoint0.root)
space.store.isEmpty // => res2: Boolean = true
space.reset(checkpoint1.root)
space.store.isEmpty // => res3: Boolean = false
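The idea behind checkpoint and reset can be sketched with a toy history that maps a root hash to an immutable snapshot. This is only an illustration under stated assumptions: the real store derives the root from a Merkle Patricia Trie and hashes with BLAKE2b-256, whereas this sketch hashes a canonical string rendering of the whole state with SHA-256 (available in the JDK). `ToyHistory`, `Root`, and `rootOf` are hypothetical names.

```scala
import java.security.MessageDigest

object CheckpointSketch {
  // Toy state: channel name -> data stored at that channel (assumption).
  type State = Map[String, Vector[String]]

  final case class Root(hex: String)

  // Stand-in for a trie root: SHA-256 over a canonical rendering of the state.
  def rootOf(state: State): Root = {
    val canonical = state.toSeq
      .sortBy(_._1)
      .map { case (channel, data) => channel + "=" + data.mkString(",") }
      .mkString(";")
    val digest = MessageDigest.getInstance("SHA-256").digest(canonical.getBytes("UTF-8"))
    Root(digest.map(b => "%02x".format(b)).mkString)
  }

  final class ToyHistory {
    private var current: State = Map.empty
    private var snapshots: Map[Root, State] = Map.empty

    def put(channel: String, datum: String): Unit =
      current = current.updated(channel, current.getOrElse(channel, Vector.empty) :+ datum)

    def isEmpty: Boolean = current.isEmpty

    // Remember the current state under its root and hand the root to the caller.
    def createCheckpoint(): Root = {
      val root = rootOf(current)
      snapshots = snapshots.updated(root, current)
      root
    }

    // Restore the state recorded under `root`; unknown roots are rejected.
    def reset(root: Root): Unit =
      current = snapshots.getOrElse(
        root,
        throw new NoSuchElementException("unknown root " + root.hex))
  }
}
```

Because the root is derived from the content, two checkpoints of identical states yield the same root, which is the property that lets a root hash stand in for the state.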
Replay mode
There exists a deterministic counterpart of the non-deterministic RSpace - the ReplayRSpace.
Before executing consume and produce operations, it needs to be `rigged` using a checkpoint hash and a log of operations recorded previously by a non-deterministic RSpace.
class ReplayRSpace[C, P, A, K](val store: IStore[C, P, A, K], val branch: Branch)(
    implicit
    serializeC: Serialize[C],
    serializeP: Serialize[P],
    serializeA: Serialize[A],
    serializeK: Serialize[K]
) extends ISpace[C, P, A, K] { ... }
How trace log is recorded
The trace log is a synchronized sequence of events embedded in RSpace.
sealed trait Event
case class COMM(consume: Consume, produces: Seq[Produce]) extends Event
sealed trait IOEvent extends Event
class Produce private (val hash: Blake2b256Hash) extends IOEvent { ... }
class Consume private (val hash: Blake2b256Hash) extends IOEvent { ... }

type Log = immutable.Seq[Event]

private val eventLog: SyncVar[Log]
Trace logs are recorded during usage of consume, produce, and install operations executed on the normal (non-deterministic) RSpace instance.
// consume
val ref = Consume.create(channels, patterns, continuation, persist)
// install
val ref = Consume.create(channels, patterns, continuation, true)
// produce
val ref = Produce.create(channel, data, persist)

eventLog.update(ref +: _)
Every time a match is found, either during consume, produce, or install operations, an additional COMM event is written to the trace log.
eventLog.update(COMM(consumeRef, dataCandidates.map(_.datum.source)) +: _)
During checkpoint creation the contents of the eventLog are stored in the `Checkpoint` object for future reference.
val rigPoint = space.createCheckpoint()
After this is done the eventLog is cleared.
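The record-then-drain lifecycle described above can be sketched as follows. This is a simplified illustration, not the RSpace code: it uses `java.util.concurrent.atomic.AtomicReference` in place of `SyncVar`, plain `String` hashes in place of `Blake2b256Hash`, and the hypothetical names `EventLog`, `record`, and `drain`.

```scala
import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator

object EventLogSketch {
  sealed trait Event
  sealed trait IOEvent extends Event
  final case class Produce(hash: String) extends IOEvent
  final case class Consume(hash: String) extends IOEvent
  final case class COMM(consume: Consume, produces: Seq[Produce]) extends Event

  type Log = List[Event]

  final class EventLog {
    private val ref = new AtomicReference[Log](Nil)

    // Prepend atomically, so the newest event is at the head (like `ref +: _`).
    def record(event: Event): Unit = {
      ref.updateAndGet(new UnaryOperator[Log] {
        def apply(log: Log): Log = event :: log
      })
      ()
    }

    // Used at checkpoint time: hand out the log and clear it in one atomic step.
    def drain(): Log = ref.getAndSet(Nil)
  }
}
```

Draining with a single `getAndSet` means no event recorded concurrently can be lost between copying the log into the checkpoint and clearing it.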
How the log is used to create replay data
The checkpoint data can be used to rig a ReplayRSpace. This operation initializes the replay data used for deterministic replay of events.
def rig(startRoot: Blake2b256Hash, log: trace.Log): Unit

type MultisetMultiMap[K, V] = mutable.HashMap[K, com.google.common.collect.Multiset[V]]
type ReplayData = MultisetMultiMap[IOEvent, COMM]

private val replayData: SyncVar[ReplayData]

replaySpace.rig(emptyPoint.root, rigPoint.log)
How deterministic playback works
The consume and produce operations of ReplayRSpace behave similarly to those of the normal RSpace. The main difference is that whenever a COMM event occurs, the replayData is used to check whether it occurred during normal operation of the RSpace. If not, an exception is thrown. If yes, it is removed from the replayData so it cannot occur again.
This implies that in replay mode exactly the same COMM events as in the log must occur, and no others.
val resultConsume = space.consume(channels, patterns, continuation, false)
val resultProduce = space.produce(channels(0), datum, false)

val replayResultConsume = replaySpace.consume(channels, patterns, continuation, false)
val replayResultProduce = replaySpace.produce(channels(0), datum, false)

replayResultProduce shouldBe resultProduce
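The "check, then remove" rule can be sketched in isolation. This is a simplified model, not the ReplayRSpace implementation: it keeps a single multiset of expected COMM events (the real replay data is keyed by IOEvent and uses Guava's `Multiset`), identifies events by plain `String` hashes, and uses the hypothetical names `ReplayChecker`, `observe`, and `ReplayException`.

```scala
import scala.collection.mutable

object ReplaySketch {
  final case class Comm(consumeHash: String, produceHashes: Seq[String])

  final class ReplayException(message: String) extends RuntimeException(message)

  // Rigged with the COMM events taken from a previously recorded log.
  final class ReplayChecker(log: Seq[Comm]) {
    // Multiset of expected COMM events: event -> remaining occurrences.
    private val remaining = mutable.Map.empty[Comm, Int]
    log.foreach { c => remaining.update(c, remaining.getOrElse(c, 0) + 1) }

    // Called whenever a COMM event occurs during replay.
    def observe(comm: Comm): Unit = remaining.get(comm) match {
      case Some(n) if n > 1 => remaining.update(comm, n - 1)
      case Some(_)          => remaining.remove(comm); ()
      case None             => throw new ReplayException("unexpected COMM event: " + comm)
    }

    // True once every COMM event from the log has been replayed.
    def isComplete: Boolean = remaining.isEmpty
  }
}
```

A multiset rather than a set is needed because the same COMM event may legitimately occur more than once in a log; each replayed occurrence consumes exactly one recorded occurrence.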
Hardware Interface
Storage.02 depends on the availability of a Path that LMDB can access with write privileges.
Software Interface
User Interface
Communications Interface
System Overview
ISpace (RSpace)
ISpace is an abstraction that allows a client to perform high-level actions (produce & consume) on an underlying tuplespace implementation.
The only implementation available is RSpace. Its core characteristics are:
- It holds an LMDBStore instance (LMDB-backed IStore implementation)
- It holds an LMDBTrieStore instance (LMDB-backed ITrieStore implementation)
- It holds a Log instance
- It requires four serializers to be in scope:
  - Serialize[C] for channels
  - Serialize[P] for patterns
  - Serialize[A] for data
  - Serialize[K] for continuations
IStore (LMDBStore)
The LMDBStore sits directly on the LMDB database. It introduces the following low-level entities, with corresponding CRUD actions for each:
- Join
- Channel
- Waiting Continuations
- Data
Internally, entities are bundled in GNATs.
/** [[GNAT]] is not a `Tuple3` */
final case class GNAT[C, P, A, K](
    channels: Seq[C],
    data: Seq[Datum[A]],
    wks: Seq[WaitingContinuation[P, K]] // waiting continuations
)
Each action requires a transaction to be open.
private[rspace] def createTxnRead(): T = env.txnRead
private[rspace] def createTxnWrite(): T = env.txnWrite
private[rspace] def withTxn[R](txn: T)(f: T => R): R
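Only the signature of `withTxn` is given here. One plausible reading is a loan pattern that commits on success, aborts on failure, and always closes the transaction. The sketch below shows that pattern against a hypothetical `ToyTxn` trait; it is an assumption about the behaviour, not the LMDBStore code, although lmdbjava's `Txn` does offer `commit()`, `abort()`, and `close()`.

```scala
import scala.util.control.NonFatal

object TxnSketch {
  // Hypothetical transaction abstraction, for illustration only.
  trait ToyTxn {
    def commit(): Unit
    def abort(): Unit
    def close(): Unit
  }

  // Loan pattern: run `f` inside the transaction, then commit or abort.
  def withTxn[T <: ToyTxn, R](txn: T)(f: T => R): R =
    try {
      val result = f(txn)
      txn.commit()
      result
    } catch {
      case NonFatal(ex) =>
        txn.abort()
        throw ex
    } finally {
      txn.close()
    }
}
```

Under this reading a caller would write something like `withTxn(createTxnWrite()) { txn => ... }` and never handle commit or cleanup directly.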
Additionally, LMDBStore allows checkpoint creation.
def createCheckpoint(): Blake2b256Hash
This is required to allow rollback (see the Rollback section).
The StoreEventsCounter gathers metrics.
final class Counter {
  private[this] val sumCounter: AtomicReference[SumCounter] =
    new AtomicReference[SumCounter](SumCounter(0, 0, 0))

private[rspace] class StoreEventsCounter
  val producesCounter
  val consumesCounter
  val consumesCommCounter
  val producesCommCounter
  val installCommCounter

case class StoreCount(count: Long, avgMilliseconds: Double, peakRate: Int, currentRate: Int)

case class StoreCounters(sizeOnDisk: Long,
                         dataEntries: Long,
                         consumesCount: StoreCount,
                         producesCount: StoreCount,
                         consumesCommCount: StoreCount,
                         producesCommCount: StoreCount,
                         installCommCount: StoreCount)
The LMDBStore depends on Codecs for base entities. These can be derived from Serialize.
implicit val codecC: Codec[C] = sc.toCodec
implicit val codecP: Codec[P] = sp.toCodec
implicit val codecA: Codec[A] = sa.toCodec
implicit val codecK: Codec[K] = sk.toCodec
On the lowest level the LMDBStore relies on the available codecs to encode/decode data.
The underlying LMDB instance is divided into:
_dbGNATs: Dbi[ByteBuffer], _dbJoins: Dbi[ByteBuffer],
Additional tracking structures:
Seq[TrieUpdate[C, P, A, K]]
This structure is used as part of the checkpoint & rollback mechanism.
ITrieStore[Txn[ByteBuffer], Blake2b256Hash, GNAT[C, P, A, K]]
The LMDBStore holds an instance of LMDBTrieStore. This is used as the store to keep checkpoints.
ITrieStore (LMDBTrieStore)
The available implementation is based on a Merkle Patricia Trie.
The underlying store is split into:
_dbTrie: Dbi[ByteBuffer], _dbRoot: Dbi[ByteBuffer]
The root database (_dbRoot) holds the current root, and the trie database (_dbTrie) holds all the pieces of data.
This implementation uses the concept of Skip (Extension) Nodes.
Limitations
Assumptions and Dependencies
Software Dependencies
Lightning Memory-mapped Database
"An ultra-fast, ultra-compact, crash-proof key-value embedded data store."
General Information:
http://www.lmdb.tech/doc/index.html (Doxygen documentation for C version)
JNR-FFI Wrapper Library for use from Scala
https://github.com/lmdbjava/lmdbjava
http://www.javadoc.io/doc/org.lmdbjava/lmdbjava/0.6.0 (javadoc)
Protocol Buffers (serialization format)
"...are a language-neutral, platform-neutral extensible mechanism for serializing structured data."
Both Rholang objects and Blockchain blocks will be serialized to and deserialized from binary using the Protocol Buffer serialization format.
The serialized data will be stored in LMDB as the value portion of the key-value pair.
https://developers.google.com/protocol-buffers/
ScalaPB
"...is a protocol buffer compiler plugin for Scala. It will generate Scala case classes, parsers and serializers for your protocol buffers."
Metrics
We use Kamon as our monitoring toolkit. Below is the overview of metrics published by RSpace:
* COMM event counters distinguished by the operation which triggered the event:
private[this] val consumeCommCounter = Kamon.counter("rspace.comm.consume")
private[this] val produceCommCounter = Kamon.counter("rspace.comm.produce")
* Spans tracing execution times of all operations:
private[this] val consumeSpan = Kamon.buildSpan("rspace.consume")
private[this] val produceSpan = Kamon.buildSpan("rspace.produce")
protected[this] val installSpan = Kamon.buildSpan("rspace.install")
Analogous metrics exist for the ReplayRSpace.
These metrics are published by Kamon in a Prometheus format so they can be scraped by the latter. Prometheus is configured as the data source in Grafana, allowing us to query the time series using the built-in expression language and visualize them.
To find the number of COMM events per second triggered by produce operations use: rate(rspace_comm_produce_total[1m])
"rspace_comm_produce_total[1m]" - converts the time series to a range vector containing data points gathered over the last minute
The "rate" function calculates the per second average rate of increase of the time series in the range vector
The number of COMM events per second triggered by consume operations is defined analogously, using the counter registered as rspace.comm.consume.
Architectural Strategies
System Architecture
Not Use Cases:
- A node needs to store the contents of a proposed block.
- If the node's block proposal is not accepted, and the node isn't slashed for bad behavior, then the node will have to come to consensus on another block proposal. When the node accepts the block, the node has to update its state to match that of the block it just accepted. Any transactions that it had stored as part of a block proposal that are NOT part of the new block will need to be kept (assuming that these transactions didn't come from a slashed validator) as part of the next proposed block.
- The node's state is essentially the tuplespace, in that all continuations are stored in the tuplespace.
- In the event a validator is slashed, what happens to the transactions they have processed? Do these need to be rolled back?