* Checkpoint: the root hash of a given Merkle Patricia Trie.
* Rollback: resets the state of RSpace to a given Checkpoint. This is needed in order to allow new validators to join an existing network.
* Log: a record of all produces, consumes, and COMM events ("matches") that have occurred in RSpace since the last call to createCheckpoint. A produce or consume that matches one in the log will cause RSpace to return the same result.

The storage layer will be used as library code in an existing Scala codebase. As envisioned, the storage layer covers the following use cases: checkpointing and resetting RSpace.

Design Considerations
RSpace implements the following interface, which is the minimal contract required by the RChain blockchain for the storage layer.
```scala
trait ISpace[C, P, A, K] {

  def consume(channels: Seq[C], patterns: Seq[P], continuation: K, persist: Boolean)(
      implicit m: Match[P, A]): Option[(K, Seq[A])]

  def produce(channel: C, data: A, persist: Boolean)(
      implicit m: Match[P, A]): Option[(K, Seq[A])]

  def install(channels: Seq[C], patterns: Seq[P], continuation: K)(
      implicit m: Match[P, A]): Option[(K, Seq[A])]

  def createCheckpoint(): Checkpoint

  def reset(root: Blake2b256Hash): Unit

  def close(): Unit
}
```
```scala
class RSpace[C, P, A, K](val store: IStore[C, P, A, K], val branch: Branch)(
    implicit
    serializeC: Serialize[C],
    serializeP: Serialize[P],
    serializeA: Serialize[A],
    serializeK: Serialize[K]
) extends ISpace[C, P, A, K]
```
The RSpace implementation depends on a few external pieces of information and dependencies, bundled in Context.
```scala
class Context[C, P, A, K] private (
    val env: Env[ByteBuffer],
    val path: Path,
    val trieStore: ITrieStore[Txn[ByteBuffer], Blake2b256Hash, GNAT[C, P, A, K]]
)
```
Env is the configuration expected by LMDB (https://github.com/lmdbjava/lmdbjava/blob/master/src/main/java/org/lmdbjava/Env.java). Path points to where the data will be held. ITrieStore is the mechanism that builds the history that can be checkpointed. Additionally, a default Branch is required.
ITrieStore is an interface representing the basic actions needed to build and maintain a Merkle Patricia Trie. An implementation named LMDBTrieStore is backed by LMDB. It is used as the source of the current tip of the history of RSpace.
All the low-level calls that result from consume or produce operations are defined in terms of an IStore interface. The implementation delivered in Storage.02 is based on LMDB.
```scala
class LMDBStore[C, P, A, K]
```
```scala
object RSpace {

  def create[C, P, A, K](context: Context[C, P, A, K], branch: Branch)(
      implicit
      sc: Serialize[C],
      sp: Serialize[P],
      sa: Serialize[A],
      sk: Serialize[K]): RSpace[C, P, A, K] = {

    implicit val codecC: Codec[C] = sc.toCodec
    implicit val codecP: Codec[P] = sp.toCodec
    implicit val codecA: Codec[A] = sa.toCodec
    implicit val codecK: Codec[K] = sk.toCodec

    history.initialize(context.trieStore, branch)

    val mainStore = LMDBStore.create[C, P, A, K](context, branch)

    new RSpace[C, P, A, K](mainStore, branch)
  }
}
```
```scala
/** Searches the store for data matching all the given patterns at the given channels.
  *
  * If no match is found, then the continuation and patterns are put in the store at the given
  * channels.
  *
  * If a match is found, then the continuation is returned along with the matching data.
  *
  * Matching data stored with the `persist` flag set to `true` will not be removed when it is
  * retrieved. See below for more information about using the `persist` flag.
  *
  * '''NOTE''':
  *
  * A call to [[consume]] that is made with the persist flag set to `true` only persists when
  * there is no matching data.
  *
  * This means that in order to make a continuation "stick" in the store, the user will have to
  * continue to call [[consume]] until a `None` is received.
  *
  * @param channels A Seq of channels on which to search for matching data
  * @param patterns A Seq of patterns with which to search for matching data
  * @param continuation A continuation
  * @param persist Whether or not to attempt to persist the data
  */
def consume(channels: Seq[C], patterns: Seq[P], continuation: K, persist: Boolean)(
    implicit m: Match[P, A]): Option[(K, Seq[A])]
```
```scala
/**
  * @tparam P A type representing patterns
  * @tparam A A type representing data
  */
trait Match[P, A] {
  def get(p: P, a: A): Option[A]
}
```
The Match typeclass allows a consume to be agnostic about the meaning of a match. As long as an instance is available, consume can try to match an incoming continuation with data existing in RSpace.
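For illustration, here is a minimal sketch of a Match instance, assuming a hypothetical pattern type for strings (Pattern, Wildcard, Exact, and StringMatch are illustrative names, not part of RSpace):

```scala
// Hypothetical pattern type for illustration: either an exact
// string or a wildcard that matches anything.
sealed trait Pattern
case object Wildcard extends Pattern
final case class Exact(value: String) extends Pattern

// The Match typeclass shape from the document.
trait Match[P, A] {
  def get(p: P, a: A): Option[A]
}

// An instance that lets RSpace-style matching work over string data.
object StringMatch extends Match[Pattern, String] {
  def get(p: Pattern, a: String): Option[String] = p match {
    case Wildcard           => Some(a) // wildcard matches any datum
    case Exact(v) if v == a => Some(a) // exact pattern matches only itself
    case _                  => None
  }
}
```

With such an instance in implicit scope, a call like consume(Seq(chan), Seq(Wildcard), k, persist = false) could match any datum produced on chan.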
```scala
def install(channels: Seq[C], patterns: Seq[P], continuation: K)(
    implicit m: Match[P, A]): Option[(K, Seq[A])]
```
install behaves similarly to consume, except that if the incoming continuation does not find a match, any existing waiting continuation at channels will be replaced by the incoming one, which will be persistent.
```scala
/** Searches the store for a continuation that has patterns that match the given data at the
  * given channel.
  *
  * If no match is found, then the data is put in the store at the given channel.
  *
  * If a match is found, then the continuation is returned along with the matching data.
  *
  * Matching data or continuations stored with the `persist` flag set to `true` will not be
  * removed when they are retrieved. See below for more information about using the `persist`
  * flag.
  *
  * '''NOTE''':
  *
  * A call to [[produce]] that is made with the persist flag set to `true` only persists when
  * there are no matching continuations.
  *
  * This means that in order to make a piece of data "stick" in the store, the user will have to
  * continue to call [[produce]] until a `None` is received.
  *
  * @param channel A channel on which to search for matching continuations and/or store data
  * @param data A piece of data
  * @param persist Whether or not to attempt to persist the data
  */
def produce(channel: C, data: A, persist: Boolean)(
    implicit m: Match[P, A]): Option[(K, Seq[A])]
```
```scala
case class Checkpoint(root: Blake2b256Hash, log: trace.Log)

/** Creates a checkpoint.
  *
  * @return A [[Checkpoint]]
  */
def createCheckpoint(): Checkpoint

/** Resets the store to the given root.
  *
  * @param root A BLAKE2b256 Hash representing the checkpoint
  */
def reset(root: Blake2b256Hash): Unit
```
The usage of the checkpoint mechanism is straightforward:
```scala
// given a space:
val space: ISpace[C, P, A, K] = RSpace.create(???)

val checkpoint0 = space.createCheckpoint()
space.store.isEmpty // => res0: Boolean = true

space.consume(???)
space.store.isEmpty // => res1: Boolean = false

val checkpoint1 = space.createCheckpoint()

space.reset(checkpoint0.root)
space.store.isEmpty // => res2: Boolean = true

space.reset(checkpoint1.root)
space.store.isEmpty // => res3: Boolean = false
```
There exists a deterministic counterpart of the non-deterministic RSpace: the ReplayRSpace. Before executing consume and produce operations, it needs to be rigged using a checkpoint hash and a log of operations recorded previously by a non-deterministic RSpace.
```scala
class ReplayRSpace[C, P, A, K](val store: IStore[C, P, A, K], val branch: Branch)(
    implicit
    serializeC: Serialize[C],
    serializeP: Serialize[P],
    serializeA: Serialize[A],
    serializeK: Serialize[K]
) extends ISpace[C, P, A, K] {
```
The trace log is a synchronized sequence of events embedded in RSpace.
```scala
sealed trait Event

case class COMM(consume: Consume, produces: Seq[Produce]) extends Event

sealed trait IOEvent extends Event

class Produce private (val hash: Blake2b256Hash) extends IOEvent { ... }

class Consume private (val hash: Blake2b256Hash) extends IOEvent { ... }

type Log = immutable.Seq[Event]

private val eventLog: SyncVar[Log]
```
Trace logs are recorded during usage of consume, produce, and install operations executed on the normal (non-deterministic) RSpace instance.
```scala
// consume
val ref = Consume.create(channels, patterns, continuation, persist)
// install
val ref = Consume.create(channels, patterns, continuation, true)
// produce
val ref = Produce.create(channel, data, persist)

eventLog.update(ref +: _)
```
Every time a match is found, either during consume, produce, or install operations, an additional COMM event is written to the trace log.
```scala
eventLog.update(COMM(consumeRef, dataCandidates.map(_.datum.source)) +: _)
```
During checkpoint creation the contents of the eventLog are stored in the `Checkpoint` object for future reference.
```scala
val rigPoint = space.createCheckpoint()
```
After this is done the eventLog is cleared.
The checkpoint data can be used to rig a ReplayRSpace. This operation initializes the replay data used for deterministic replay of events.
```scala
def rig(startRoot: Blake2b256Hash, log: trace.Log): Unit

type MultisetMultiMap[K, V] = mutable.HashMap[K, com.google.common.collect.Multiset[V]]
type ReplayData             = MultisetMultiMap[IOEvent, COMM]

private val replayData: SyncVar[ReplayData]

replaySpace.rig(emptyPoint.root, rigPoint.log)
```
The consume and produce operations of ReplayRSpace behave similarly to those of the normal RSpace. The main difference is that whenever a COMM event occurs, the replayData is used to check whether it occurred during normal operation of the RSpace. If not, an exception is thrown. If yes, it is removed from the replayData so it cannot occur again.
This implies that in replay mode exactly the same COMM events as in the log must occur, and no others.
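The check-and-remove discipline can be sketched with a plain count map standing in for the Guava Multiset; the event and COMM types are simplified to strings here, whereas the real ReplayData maps IOEvents to COMM events:

```scala
import scala.collection.mutable

object ReplayCheck {
  // Simplified ReplayData: for each IO event, count how many times each
  // COMM was recorded during normal (recording) execution.
  type ReplayData = mutable.Map[String, mutable.Map[String, Int]]

  // Recording side: remember that `comm` happened for `ioEvent`.
  def record(data: ReplayData, ioEvent: String, comm: String): Unit = {
    val comms = data.getOrElseUpdate(ioEvent, mutable.Map.empty)
    comms(comm) = comms.getOrElse(comm, 0) + 1
  }

  // Replay side: the COMM must have been recorded; consuming it removes
  // one occurrence so the same COMM cannot be replayed twice.
  def consumeComm(data: ReplayData, ioEvent: String, comm: String): Unit =
    data.get(ioEvent).flatMap(_.get(comm)) match {
      case Some(n) if n > 0 =>
        val comms = data(ioEvent)
        if (n == 1) comms.remove(comm) else comms(comm) = n - 1
        if (comms.isEmpty) data.remove(ioEvent)
      case _ =>
        throw new IllegalStateException(s"Unexpected COMM during replay: $comm")
    }
}
```

A COMM seen during replay that was never recorded (or was already consumed) fails fast, mirroring the exception described above.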
```scala
val resultConsume = space.consume(channels, patterns, continuation, false)
val resultProduce = space.produce(channels(0), datum, false)

val replayResultConsume = replaySpace.consume(channels, patterns, continuation, false)
val replayResultProduce = replaySpace.produce(channels(0), datum, false)

replayResultProduce shouldBe resultProduce
```
Storage.02 depends on the availability of a writeable Path that LMDB can access with write privileges.
ISpace is an abstraction that allows a client to perform high-level actions (produce & consume) on an underlying tuplespace implementation.
The only implementation available is RSpace. Its core characteristics are:

* requires serializers to be in scope:
  * Serialize[C] for channels
  * Serialize[P] for patterns
  * Serialize[A] for data
  * Serialize[K] for continuations

The LMDBStore sits directly on the LMDB database. It introduces low-level entities:
* Join
* Channel
* Waiting Continuation
* Data

and corresponding CRUD actions for each.
Internally, entities are bundled in GNATs.
```scala
/** [[GNAT]] is not a `Tuple3` */
final case class GNAT[C, P, A, K](
    channels: Seq[C],
    data: Seq[Datum[A]],
    wks: Seq[WaitingContinuation[P, K]] // waiting continuations
)
```
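To make the grouping concrete, a GNAT for a single channel might be built as follows (the Datum and WaitingContinuation field shapes below are simplified assumptions, not the rspace definitions):

```scala
object GnatExample {
  // Simplified stand-ins for rspace's internal types.
  final case class Datum[A](a: A, persist: Boolean)
  final case class WaitingContinuation[P, K](patterns: Seq[P], continuation: K, persist: Boolean)

  // Mirrors the GNAT shape from the document.
  final case class GNAT[C, P, A, K](
      channels: Seq[C],
      data: Seq[Datum[A]],
      wks: Seq[WaitingContinuation[P, K]]
  )

  // One channel, one piece of unmatched data, no waiting continuations.
  val gnat: GNAT[String, String, String, String] =
    GNAT(
      channels = Seq("chan1"),
      data = Seq(Datum("payload", persist = false)),
      wks = Seq.empty
    )
}
```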
Each action requires a transaction to be open.
```scala
private[rspace] def createTxnRead(): T  = env.txnRead
private[rspace] def createTxnWrite(): T = env.txnWrite
private[rspace] def withTxn[R](txn: T)(f: T => R): R
```
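A plausible shape for withTxn is a bracket that commits the transaction on success and aborts it on failure. The sketch below uses a stand-in Txn trait, not the actual LMDB binding:

```scala
object TxnSketch {
  // Minimal stand-in for an LMDB transaction handle.
  trait Txn {
    def commit(): Unit
    def abort(): Unit
  }

  // Run `f` within the transaction: commit on success, abort on failure.
  def withTxn[R](txn: Txn)(f: Txn => R): R =
    try {
      val result = f(txn)
      txn.commit()
      result
    } catch {
      case ex: Throwable =>
        txn.abort()
        throw ex
    }
}
```

This design keeps transaction lifecycle management in one place, so individual CRUD actions cannot leak an open transaction.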
Additionally, LMDBStore allows checkpoint creation.
```scala
def createCheckpoint(): Blake2b256Hash
```
This is required to support rollback.
The StoreEventsCounter gathers metrics.
```scala
final class Counter {
  private[this] val sumCounter: AtomicReference[SumCounter] =
    new AtomicReference[SumCounter](SumCounter(0, 0, 0))
}

private[rspace] class StoreEventsCounter {
  val producesCounter
  val consumesCounter
  val consumesCommCounter
  val producesCommCounter
  val installCommCounter
}

case class StoreCount(count: Long, avgMilliseconds: Double, peakRate: Int, currentRate: Int)

case class StoreCounters(sizeOnDisk: Long,
                         dataEntries: Long,
                         consumesCount: StoreCount,
                         producesCount: StoreCount,
                         consumesCommCount: StoreCount,
                         producesCommCount: StoreCount,
                         installCommCount: StoreCount)
```
The LMDBStore depends on Codecs for base entities. These can be derived from Serialize.
```scala
implicit val codecC: Codec[C] = sc.toCodec
implicit val codecP: Codec[P] = sp.toCodec
implicit val codecA: Codec[A] = sa.toCodec
implicit val codecK: Codec[K] = sk.toCodec
```
On the lowest level the LMDBStore relies on the available codecs to encode/decode data.
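As an illustration of the encode/decode contract, a simplified Serialize instance for strings could look like the sketch below. The trait here only mirrors the shape used by RSpace; the real Serialize lives in the rspace library and converts to scodec Codecs via toCodec:

```scala
object SerializeSketch {
  // Simplified encode/decode contract (illustrative, not the rspace trait).
  trait Serialize[A] {
    def encode(a: A): Array[Byte]
    def decode(bytes: Array[Byte]): Either[Throwable, A]
  }

  implicit val serializeString: Serialize[String] = new Serialize[String] {
    def encode(a: String): Array[Byte] = a.getBytes("UTF-8")
    def decode(bytes: Array[Byte]): Either[Throwable, String] =
      Right(new String(bytes, "UTF-8"))
  }

  // Round-tripping a value through encode/decode must return it unchanged.
  def roundTrip[A](a: A)(implicit s: Serialize[A]): Either[Throwable, A] =
    s.decode(s.encode(a))
}
```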
The underlying LMDB instance is divided into:
```scala
_dbGNATs: Dbi[ByteBuffer],
_dbJoins: Dbi[ByteBuffer]
```
Additional tracking structures:
```scala
Seq[TrieUpdate[C, P, A, K]]
```
This structure is used as part of the checkpoint & rollback mechanism.
```scala
ITrieStore[Txn[ByteBuffer], Blake2b256Hash, GNAT[C, P, A, K]]
```
The LMDBStore holds an instance of LMDBTrieStore. This is used as the store to keep checkpoints.
The available implementation is based on a Merkle Patricia Trie.
The underlying store is split into:
```scala
_dbTrie: Dbi[ByteBuffer],
_dbRoot: Dbi[ByteBuffer]
```
The root db holds the current root, and the trie db holds all the pieces of data.
This implementation uses the concept of Skip (Extension) Nodes.
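For intuition, the node shapes of such a trie can be sketched as follows (names and fields are illustrative, not the RSpace implementation). A skip (extension) node collapses a run of single-child branches into one node carrying the shared key fragment:

```scala
object TrieSketch {
  type Hash = String // stand-in for the Blake2b256 hash of a child node

  sealed trait Trie
  // Terminal node holding a value under the remaining key bytes.
  final case class Leaf(key: Seq[Byte], value: String) extends Trie
  // Branch node with one optional child pointer per possible byte.
  final case class Node(children: Vector[Option[Hash]]) extends Trie
  // Skip (extension) node: a shared key fragment followed by a single child.
  final case class Skip(affix: Seq[Byte], pointer: Hash) extends Trie

  // Without the skip node, this 3-byte fragment would cost one branch node per byte.
  val skipped: Trie = Skip(Seq(0x01, 0x02, 0x03).map(_.toByte), "child-hash")
}
```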
"An ultra-fast, ultra-compact, crash-proof key-value embedded data store."
http://www.lmdb.tech/doc/index.html (Doxygen documentation for C version)
https://github.com/lmdbjava/lmdbjava
http://www.javadoc.io/doc/org.lmdbjava/lmdbjava/0.6.0 (javadoc)
"...are a language-neutral, platform-neutral extensible mechanism for serializing structured data."
Both Rholang objects and Blockchain blocks will be serialized to and deserialized from binary using the Protocol Buffer serialization format.
The serialized data will be stored in LMDB as the value portion of the key-value pair.
https://developers.google.com/protocol-buffers/
"...is a protocol buffer compiler plugin for Scala. It will generate Scala case classes, parsers and serializers for your protocol buffers."
We use Kamon as our monitoring toolkit. Below is the overview of metrics published by RSpace:
* COMM event counters distinguished by the operation which triggered the event:
```scala
private[this] val consumeCommCounter = Kamon.counter("rspace.comm.consume")
private[this] val produceCommCounter = Kamon.counter("rspace.comm.produce")
```
* Spans tracing execution times of all operations:
```scala
private[this] val consumeSpan   = Kamon.buildSpan("rspace.consume")
private[this] val produceSpan   = Kamon.buildSpan("rspace.produce")
protected[this] val installSpan = Kamon.buildSpan("rspace.install")
```
Analogous metrics exist for the ReplayRSpace.
These metrics are published by Kamon in the Prometheus format so that Prometheus can scrape them. Prometheus is configured as the data source in Grafana, allowing us to query the time series using the built-in expression language and visualize them.
To find the number of COMM events per second triggered by produce operations, use: rate(rspace_comm_produce_total[1m])

* rspace_comm_produce_total[1m] converts the time series to a range vector containing the data points gathered over the last minute.
* The rate function calculates the per-second average rate of increase of the time series in the range vector.
Number of COMM events per second triggered by consume operations is defined analogously.