The purpose of this document is to describe the block DAG storage, which consists of two main parts:
The latest messages data structure, which represents a mapping between validator public keys and the hash of the latest block the respective validator has seen. It is used in various places by the Casper protocol, such as equivocation detection and score map building, but, most importantly, it is used by the safety oracle to count the agreement graph's edges, which is a computationally heavy task.
In order to count the agreement graph's edges, the safety oracle, provided with a list of validator candidates, checks for every pair of validators whether they see an agreement with each other. This process involves looking up the latest message for the first validator and checking its justifications to find one sent by the second validator and compatible with the estimate (the block to detect safety on). The same process is then done for the second validator with respect to the first. As you can see, counting the agreement graph's edges involves looking up latest messages for n^2 validator pairs, where n is the number of validators, which can reach a few thousand.
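The pairwise check above can be sketched in a few lines. This is only a minimal Python illustration of the counting scheme; `latest_message`, `justifications` and `compatible` are hypothetical stand-ins for the Casper structures described above, not the actual node API:

```python
from itertools import combinations

def sees_agreement(latest_message, justifications, compatible, v1, v2, estimate):
    """Does v1's latest message carry a justification sent by v2 that is
    compatible with the estimate (the block to detect safety on)?"""
    msg = latest_message.get(v1)
    if msg is None:
        return False
    return any(sender == v2 and compatible(justified, estimate)
               for sender, justified in justifications[msg])

def agreement_edges(validators, latest_message, justifications, compatible, estimate):
    """Count agreement-graph edges: an edge exists when both validators of a
    pair see an agreement with each other (O(n^2) latest-message lookups)."""
    return sum(
        1
        for v1, v2 in combinations(validators, 2)
        if sees_agreement(latest_message, justifications, compatible, v1, v2, estimate)
        and sees_agreement(latest_message, justifications, compatible, v2, v1, estimate)
    )
```

Since every pair requires two latest-message lookups, the cost of each lookup dominates the whole computation, which motivates the constant-time lookup requirement below.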
The purpose of this document is to specify an on-disk storage for the latest messages data structure that is efficient enough to allow the safety oracle computation to perform constant-time lookups in the storage.
The node writes complete block data to disk in order to minimize network synchronization across node restarts. If a node is down for extended periods, it will be forced to synchronize with other nodes in order to update its RBlocks data store, the DAG, and the LatestMessages table for validators. The algorithm by which this update occurs and the network protocols enabling it are beyond the scope of this document. (See <TBD>)
The design goals of the DAG storage include:
BlockDag
```scala
final case class BlockDag(
    childMap: Map[BlockHash, Set[BlockHash]],
    latestMessages: Map[Validator, BlockMessage],
    latestMessagesOfLatestMessages: Map[Validator, Map[Validator, BlockHash]],
    dataLookup: Map[BlockHash, BlockMetadata],
    topoSort: Vector[Vector[BlockHash]],
    sortOffset: Long
)
```
3. Proposed software architecture
The proposed API for DAG storage:
```scala
trait BlockDagStorage[F[_]] {
  def getRepresentation: F[BlockDagRepresentation[F]]
  def insert(block: BlockMessage): F[Unit]
  def checkpoint(blockHash: BlockHash): F[Unit]
  def clear(): F[Unit]
  def close(): F[Unit]
}

object BlockDagStorage {
  def apply[F[_]](implicit B: BlockDagStorage[F]): BlockDagStorage[F] = B
}

trait BlockDagRepresentation[F[_]] {
  def children(blockHash: BlockHash): F[Option[Set[BlockHash]]]
  def lookup(blockHash: BlockHash): F[Option[BlockMetadata]]
  def contains(blockHash: BlockHash): F[Boolean]
  def topoSort(startBlockNumber: Long): F[Vector[Vector[BlockHash]]]
  def topoSortTail(tailLength: Long): F[Vector[Vector[BlockHash]]]
  def deriveOrdering(startBlockNumber: Long): F[Ordering[BlockMetadata]]
  def latestMessageHash(validator: Validator): F[Option[BlockHash]]
  def latestMessage(validator: Validator): F[Option[BlockMetadata]]
  def latestMessageHashes: F[Map[Validator, BlockHash]]
  def latestMessages: F[Map[Validator, BlockMetadata]]
}
```
The BlockDagStorage trait itself contains only a few methods:
- getRepresentation – returns an immutable snapshot representing the current DAG state
- insert – updates the DAG's current state by inserting the provided block
- checkpoint – hints to the storage that the provided block is a new checkpoint
- clear – clears the storage of all data
- close – closes the storage, at which point it can no longer be accessed

The main queries supported by a DAG snapshot are shown in the BlockDagRepresentation trait, whose implementation is returned by the getRepresentation method:
- children – looks up the set of blocks which are children of the given block. Has the same semantics as childMap in the legacy implementation of BlockDag.
- lookup – looks up the block metadata by its hash. Has the same semantics as dataLookup in the legacy implementation of BlockDag.
- topoSort – returns a topologically sorted suffix of the blocks in the DAG snapshot, starting from the provided block number. It is a slightly modified version of the topoSort field in the legacy implementation of BlockDag.
- latestMessageHash – looks up the hash of the latest message for a given validator. It is a weaker version of the latestMessages field in the legacy implementation of BlockDag, as the new version returns only a lightweight block hash instead of a full block message.

Apart from these main methods, there are also a few other slightly different helper methods whose semantics can be easily deduced from their names and types.
The proposed BlockDagFileStorage implements the API in the following fashion:
```scala
final class BlockDagFileStorage[F[_]: Monad: Concurrent: Sync: Log] private (
    lock: Semaphore[F],
    latestMessagesRef: Ref[F, Map[Validator, BlockHash]],
    childMapRef: Ref[F, Map[BlockHash, Set[BlockHash]]],
    dataLookupRef: Ref[F, Map[BlockHash, BlockMetadata]],
    topoSortRef: Ref[F, Vector[Vector[BlockHash]]],
    latestMessagesLogOsRef: Ref[F, OutputStream],
    latestMessagesLogSizeRef: Ref[F, Int],
    latestMessagesCrcRef: Ref[F, CRC32],
    latestMessagesDataFilePath: Path,
    latestMessagesCrcFilePath: Path,
    latestMessagesLogMaxSizeFactor: Int
) extends BlockDagStorage[F] {

  private final case class FileDagRepresentation(
      latestMessagesMap: Map[Validator, BlockHash],
      childMap: Map[BlockHash, Set[BlockHash]],
      dataLookup: Map[BlockHash, BlockMetadata],
      topoSortVector: Vector[Vector[BlockHash]]
  ) extends BlockDagRepresentation[F] { ... }

  private def updateLatestMessagesFile(validator: Validator, blockHash: BlockHash): F[Unit]
  private def updateLatestMessagesCrcFile(newCrc: CRC32): Unit
  private def squashLatestMessagesDataFile(): F[Unit]

  def getRepresentation: F[BlockDagRepresentation[F]] =
    for {
      _              <- lock.acquire
      latestMessages <- latestMessagesRef.get
      childMap       <- childMapRef.get
      dataLookup     <- dataLookupRef.get
      topoSort       <- topoSortRef.get
      _              <- lock.release
    } yield FileDagRepresentation(latestMessages, childMap, dataLookup, topoSort)

  def insert(block: BlockMessage): F[Unit] =
    for {
      _                     <- lock.acquire
      latestMessages        <- latestMessagesRef.get
      latestMessagesLogSize <- latestMessagesLogSizeRef.get
      _ <- if (latestMessagesLogSize > latestMessages.size * latestMessagesLogMaxSizeFactor) {
            squashLatestMessagesDataFile()
          } else {
            ().pure[F]
          }
      _ <- dataLookupRef.update(_.updated(block.blockHash, BlockMetadata.fromBlock(block)))
      _ <- childMapRef.update(childMap =>
            parentHashes(block).foldLeft(childMap) {
              case (acc, p) =>
                val currChildren = acc.getOrElse(p, HashSet.empty[BlockHash])
                acc.updated(p, currChildren + block.blockHash)
            })
      _ <- topoSortRef.update(topoSort => TopologicalSortUtil.update(topoSort, 0L, block))
      _ <- latestMessagesRef.update(_.updated(block.sender, block.blockHash))
      _ <- updateLatestMessagesFile(block.sender, block.blockHash)
      _ <- lock.release
    } yield ()

  ...
}

object BlockDagFileStorage {
  final case class Config(
      latestMessagesDataPath: Path,
      latestMessagesCrcPath: Path,
      latestMessagesLogMaxSizeFactor: Int = 10
  )

  def create[F[_]: Monad: Concurrent: Sync: Log](config: Config): F[BlockDagFileStorage[F]]
}
```
BlockDagFileStorage is composed of two different storages:
Every instance of BlockDagFileStorage can only be created using BlockDagFileStorage.create, provided with a BlockDagFileStorage.Config instance containing the paths to the on-disk log file and CRC file. On create invocation, the log file is read into memory as a List[(Validator, BlockHash)], its CRC value is recalculated and matched against the one written in the CRC file. In case of a mismatch, the last pair in the list is dropped, and the CRC is recalculated and matched against the written CRC again. This catches most types of storage corruption, as the only way the storage file is mutated is by appending 64 bytes to its end; hence only the last 64 bytes could be partially written. Note that the file's length changes atomically and cannot increase by anything other than 64 bytes (assuming no other process on the user's system changed the file while the node was down). In case the list without the last pair also does not match the written CRC, the latest messages are rebuilt from scratch using the persisted DAG from the graph storage, and the log/CRC files are regenerated from the rebuilt latest messages. The resulting list is then converted to a Map[Validator, BlockHash], which will be called the in-memory representation in the rest of this section.
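The recovery procedure on load can be sketched as follows. This is a simplified Python illustration of the scheme described above, assuming 64-byte records and a CRC32 computed over the whole log; all names are illustrative:

```python
import zlib

RECORD_SIZE = 64  # 32-byte validator public key + 32-byte block hash

def read_latest_messages_log(log_bytes, stored_crc):
    """Recover the (validator, block hash) list from the raw log bytes.

    If the CRC of the full log does not match the stored one, drop the
    (possibly partially written) last record and check again; a second
    mismatch means the structure must be rebuilt from the persisted DAG.
    """
    full = len(log_bytes) - len(log_bytes) % RECORD_SIZE
    for size in (full, full - RECORD_SIZE):
        if size < 0:
            continue
        data = log_bytes[:size]
        if zlib.crc32(data) == stored_crc:
            return [(data[i:i + 32], data[i + 32:i + 64])
                    for i in range(0, size, RECORD_SIZE)]
    return None  # caller rebuilds the structure from the graph storage
```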
The latest messages storage is responsible for the following constructor arguments of BlockDagFileStorage:
- latestMessagesRef – the in-memory representation, enclosed in a Ref (atomic reference)
- latestMessagesLogOsRef – an output stream appending provided data to the log file specified by latestMessagesDataPath from the passed configuration, enclosed in a Ref
- latestMessagesLogSizeRef – the current number of entries in the latest messages log file, calculated while reading the list from the previous paragraph, enclosed in a Ref
- latestMessagesCrcRef – the calculated CRC (which should coincide with the CRC written in the CRC file), enclosed in a Ref
- latestMessagesDataFilePath – the path where the log file is located
- latestMessagesCrcFilePath – the path where the CRC file is located
- latestMessagesLogMaxSizeFactor – the maximal factor by which the log's size can exceed the actual data structure's size, specified by latestMessagesLogMaxSizeFactor from the passed configuration

The latest messages storage subsystem supports the following operations:
- lookups through latestMessagesMap – no locking is required once an instance of FileDagRepresentation is received, and hence the n^2 latest message lookups in the Safety Oracle calculation can be performed very quickly

TODO: Update this section to match the new implementation
Some auxiliary classes are introduced in the DagFileStorage object:
- DagInfo is a new version of the old BlockDag data structure, with everything regarding latest messages removed from it.
- Checkpoint represents a single checkpoint stored on disk. start and end represent the range of block numbers the checkpoint stores. path points to the place on disk where the checkpoint data is stored. dagInfo is an optional weak reference to a loaded DagInfo representing this checkpoint. Initially, all checkpoints have this value set to None, but, if the user requires some data from this part of the DAG, it will be loaded into memory. Afterwards, it can be reclaimed by the GC, since dagInfo stores the information in a weak reference.
- Config is the configuration necessary for instantiating DagFileStorage. It contains the parent directory for the checkpoints and for the active DAG storage file. This parent directory will be called the storage directory in this document.

DagFileStorage has the following constructor parameters:
- lock is used for mutually exclusive access to the active DAG storage file
- checkpointsRef is a list of known checkpoints located under the storage directory supplied by the configuration
- currentFile is an output stream to the active DAG storage file
- currentDagInfoRef is the part of the DAG after the last checkpoint (i.e. the same part which has been written to currentFile)

Draft implementations of insert and children can be seen in the code block above. insert always takes the lock, since it writes the new block to the active DAG storage file. On the other hand, children does not require taking the lock, since currentDagInfo can be pulled out of the atomic reference currentDagInfoRef and then safely used for looking up the provided block hash in currentDagInfo.childrenMap. If currentDagInfo does not know about the requested block, DagFileStorage looks the block up in the provided BlockStore and then looks up the block's children in the relevant checkpoint, determined by the block's number. The checkpoint might require loading from disk if it was not loaded previously or was reclaimed by the GC.
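The children-lookup fallback described above can be sketched like this. Python's weakref stands in for the JVM weak reference, and all names are illustrative; the real storage is the Scala code above:

```python
import weakref

class DagInfo:
    """In-memory part of the DAG: the child map for a range of block numbers."""
    def __init__(self, child_map):
        self.child_map = child_map

class Checkpoint:
    def __init__(self, start, end, path):
        self.start, self.end, self.path = start, end, path
        self._dag_info = None  # weak reference; None until first load

    def dag_info(self, load_from_disk):
        info = self._dag_info() if self._dag_info is not None else None
        if info is None:  # never loaded, or already reclaimed by the GC
            info = load_from_disk(self.path)
            self._dag_info = weakref.ref(info)
        return info

def children(block_hash, block_number, current, checkpoints, load_from_disk):
    """Look in the active DagInfo first, then fall back to the checkpoint
    covering the block's number."""
    if block_hash in current.child_map:
        return current.child_map[block_hash]
    for cp in checkpoints:
        if cp.start <= block_number <= cp.end:
            return cp.dag_info(load_from_disk).child_map.get(block_hash)
    return None
```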
Graph storage. The details of checkpointing are not established yet, so some approximations will be given in this section. The number of children per block is also not a fixed parameter and hence will be approximated. One block hash is 32 bytes and, assuming a single block has about 5 children, the set of children for a block is 5 * 32 bytes. Taking the overhead of Java object headers into account, a single entry in DagInfo.childrenMap takes about 312 bytes. BlockMetadata is composed of a block hash, sender, seqNum, parents and justifications. It requires (32 + 12) + (32 + 12) + 4 + ((32 + 12) * 5 + 8) + ((32 + 32 + 12 + 12) * 5 + 8) = 768 bytes to store one block metadata. Hence, a single entry in DagInfo.dataLookup takes 812 bytes. topoSort adds an additional 44 bytes per block to that amount. In total, inserting a single block requires 312 + 812 + 44 = 1168 bytes.
Assuming the speed of the network is 10 blocks/second and checkpoints are made once a day, there will be 10 * 60 * 60 * 24 = 864000 blocks in one checkpoint. Since every block requires 1168 bytes to be stored, the total size of a checkpoint is about 960 MiB. The same is true for every additional checkpoint loaded into memory.
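These estimates, together with the latest messages log sizes given below, can be checked with a few lines of arithmetic; the per-entry overheads are the rough JVM figures assumed in this section:

```python
# Rough JVM size estimates from this section, in bytes.
children_entry = 312                       # one DagInfo.childrenMap entry (hash + ~5 children + headers)
block_metadata = ((32 + 12) + (32 + 12) + 4
                  + ((32 + 12) * 5 + 8)
                  + ((32 + 32 + 12 + 12) * 5 + 8))
data_lookup_entry = block_metadata + 44    # one DagInfo.dataLookup entry
topo_sort_entry = 44                       # per-block topoSort overhead
per_block = children_entry + data_lookup_entry + topo_sort_entry

blocks_per_checkpoint = 10 * 60 * 60 * 24  # 10 blocks/s for one day
checkpoint_mib = blocks_per_checkpoint * per_block / 2**20

# Latest messages log: one 64-byte publickey-blockhash pair per entry,
# up to `factor` times more entries than validators before squashing.
def log_size_kib(validators, factor):
    return validators * 64 * factor / 1024
```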
About 19 * factor KiB is required for storing a file with 300 publickey-blockhash pairs, and 625 * factor KiB is required for the severe case of 10000 publickey-blockhash pairs. With the default factor of 10, the size of the log will be a few megabytes at most.

The configuration for the latest messages storage will be added to the existing node configuration coop.rchain.node.configuration.Configuration:
```scala
final class Configuration(
    val command: Command,
    val server: Server,
    val grpcServer: GrpcServer,
    val tls: Tls,
    val casper: CasperConf,
    val blockstorage: LMDBBlockStore.Config,
    val blockDagStorage: BlockDagFileStorage.Config,
    private val options: commandline.Options
)
```
The default value for blockDagStorage is defined by using dataDir (which defaults to $HOME/.rnode) as follows:
```scala
val blockDagStorage = BlockDagFileStorage.Config(
  dataDir.resolve("casper-latest-messages-log"),
  dataDir.resolve("casper-latest-messages-crc")
)
```
The default values can be redefined by adding the following command-line options to coop.rchain.node.configuration.commandline.Options:
```scala
val casperLatestMessagesLogFile =
  opt[Path](required = false, descr = "Path to latest messages log file") // --casper-latest-messages-log-file
val casperLatestMessagesCrcFile =
  opt[Path](required = false, descr = "Path to latest messages crc file") // --casper-latest-messages-crc-file
```
BlockDagFileStorage will be initialized in NodeRuntime.main using the provided configuration. As SafetyOracle depends on the latest messages data structure, it will have to be initialized after BlockDagFileStorage.
As both the public key and the block hash are fixed in size, the proposed method of storing the latest messages data structure on disk is to sequentially write out the publickey-blockhash pairs byte-by-byte (i.e. 32 bytes of the public key followed by 32 bytes of the block hash).
This data will be stored in the file specified by BlockDagFileStorage.Config.latestMessagesDataPath, and the CRC value will be stored in the file specified by BlockDagFileStorage.Config.latestMessagesCrcPath.
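The append path can be sketched as follows, assuming the 64-byte record layout above. The real implementation keeps an open output stream and a running CRC32, so this Python fragment is only an illustration with hypothetical names:

```python
import zlib

def append_latest_message(log, crc, validator, block_hash):
    """Append one 64-byte publickey-blockhash record to the log buffer and
    return the updated CRC32 value, which would then be written out to the
    CRC file."""
    assert len(validator) == 32 and len(block_hash) == 32
    record = validator + block_hash
    log += record                    # in the node this is an appending file write
    return zlib.crc32(record, crc)   # incremental CRC over the whole log
```

Because zlib.crc32 accepts a starting value, the CRC of the whole log can be maintained incrementally without re-reading the file on every append.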
Currently, BlockMetadata's implementation looks like this:
```scala
final case class BlockMetadata(
    blockHash: BlockHash,
    parents: List[BlockHash],
    sender: ByteString,
    justifications: List[Justification],
    weightMap: Map[ByteString, Long],
    seqNum: Int
)
```
This whole structure is persisted in the dag storage file using the following scheme:
- blockHash – 32 bytes
- blockNumber – 8 bytes
- sender – 32 bytes
- seqNum – 4 bytes
- parentsLength – 4 bytes for the length of parents
- parents – a sequence of 32-byte hashes; the length of the sequence is parentsLength
- justificationsLength – 4 bytes for the length of justifications
- justifications – a sequence of pairs of 32-byte validator public keys and 32-byte hashes; the length of the sequence is justificationsLength
- weightMapLength – 4 bytes for the length of weightMap
- weightMap – a sequence of pairs of 32-byte validator public keys and 8-byte stakes; the length of the sequence is weightMapLength
The structure is variable in size and hence requires some additional values for deterministic deserialization (parentsLength, justificationsLength and weightMapLength). blockNumber was not a part of BlockMetadata, but it is required for successfully rebuilding the topological sorting.
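The layout above can be sketched as an encoder. Python's struct module is used for illustration; the big-endian byte order and the function name are assumptions, as the actual wire order is an implementation detail:

```python
import struct

def encode_block_metadata(block_hash, block_number, sender, seq_num,
                          parents, justifications, weight_map):
    """Serialize one BlockMetadata record using the scheme described above."""
    out = bytearray()
    out += block_hash                       # 32 bytes
    out += struct.pack(">q", block_number)  # 8 bytes
    out += sender                           # 32 bytes
    out += struct.pack(">i", seq_num)       # 4 bytes
    out += struct.pack(">i", len(parents))  # parentsLength
    for p in parents:
        out += p                            # 32 bytes each
    out += struct.pack(">i", len(justifications))  # justificationsLength
    for validator, justified_hash in justifications:
        out += validator + justified_hash   # 32 + 32 bytes
    out += struct.pack(">i", len(weight_map))      # weightMapLength
    for validator, stake in weight_map.items():
        out += validator + struct.pack(">q", stake)  # 32 + 8 bytes
    return bytes(out)
```

The three length prefixes make the record self-delimiting, so a sequence of records can be read back deterministically from a single flat file.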
3.5 Operational aspects
BlockDagFileStorage will export the following metrics by reusing the existing class coop.rchain.metrics.Metrics:
- the getRepresentation method will be reported under the name "block-dag-file-storage-get-representation"
- the insert method will be reported under the name "block-dag-file-storage-insert"
- the checkpoint method will be reported under the name "block-dag-file-storage-checkpoint"
BlockDagFileStorage will log unsuccessful attempts to read the CRC value, and will log when it has to rebuild the latest messages data structure from the persisted graph.
Inserts and getting representations both take the lock, as they both need a consistent snapshot of the internal data structures (e.g. the taken dataLookup should contain all the blocks which are also contained in the taken latestMessages). However, working with the representation does not require taking any kind of lock, as the representation is immutable.
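The snapshot semantics can be illustrated with a toy model: both operations take the lock, and the returned snapshot is an independent copy that later inserts do not affect. Python dicts and a threading lock stand in for the Scala immutable maps and Semaphore; all names are illustrative:

```python
from threading import Lock

class InMemoryDagStorage:
    """Toy model of the locking scheme: insert and get_representation both
    take the lock, and the returned snapshot is an independent copy."""
    def __init__(self):
        self._lock = Lock()
        self._latest_messages = {}
        self._data_lookup = {}

    def insert(self, sender, block_hash, metadata):
        with self._lock:
            self._data_lookup[block_hash] = metadata
            self._latest_messages[sender] = block_hash

    def get_representation(self):
        with self._lock:  # consistent view of both maps
            return dict(self._latest_messages), dict(self._data_lookup)
```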
On startup, the known checkpoints (their end and path) are read from the provided storage directory, and the active DAG storage file is read into memory as a DagInfo and opened as currentFile.