DAG Storage Specification (Draft)

1. Introduction

1.1 Purpose of the system

The purpose of this document is to describe the block DAG storage, which consists of two main parts:

  • The latest messages data structure, which represents a mapping between validator public keys and the hash of the latest block each respective validator has seen. It is used in various places by the Casper protocol, such as equivocation detection and score map building, but, most importantly, it is used by the safety oracle to count the agreement graph's edges, which is a computationally heavy task.

    In order to count the agreement graph's edges, the safety oracle, provided with a list of validator candidates, checks for every pair of validators whether they see an agreement with each other. This involves looking up the latest message of the first validator and checking its justifications for one that was sent by the second validator and is compatible with the estimate (the block to detect safety on). The same check is then performed for the second validator with respect to the first. Counting the agreement graph's edges therefore involves latest message lookups for n^2 validator pairs, where n is the number of validators and can reach a few thousand (see the sketch after this list).

    This document provides a specification of the on-disk storage of the latest messages data structure that is efficient enough to allow the safety oracle computation to perform constant-time lookups in the storage.

  • The graph storage, which represents the layout of the graph used in the RChain architecture. It constantly grows as new blocks are produced and, since blocks are effectively unlimited in size, plainly storing the whole DAG of full blocks in memory will not be possible once the DAG reaches a certain size. In order to handle huge DAGs, the node software should be able to keep only the relevant parts of the DAG in memory while retaining the ability to access the other parts on demand. This document describes how the DAG is stored on disk, the new API for working with the DAG, which parts are kept in memory, and how the other parts can be temporarily loaded into it. The document also keeps in mind the mechanism of checkpointing, a future improvement which falls out of the scope of this document. This storage does not, however, store blocks in their full form; block storage is covered in the Block Storage Specification.
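
The following minimal sketch illustrates why edge counting is dominated by latest message lookups. It is illustrative only: the Justification field names (validator, latestBlockHash) and the compatibility check are assumptions, and the check itself belongs to the safety oracle rather than to DAG storage, so it is passed in as a function.

Agreement edges sketch
// Illustrative sketch only: counts agreement-graph edges given an in-memory
// view of latest messages.
def countAgreementEdges(
    candidates: List[Validator],
    latestMessages: Map[Validator, BlockMetadata],
    estimate: BlockHash,
    isCompatible: (BlockHash, BlockHash) => Boolean
): Int = {
  // `from` sees an agreement with `to` if the latest message of `from` has a
  // justification sent by `to` that is compatible with the estimate.
  def sees(from: Validator, to: Validator): Boolean =
    latestMessages.get(from).exists { latest =>
      latest.justifications.exists { j =>
        j.validator == to && isCompatible(j.latestBlockHash, estimate)
      }
    }

  val pairs = for {
    first  <- candidates
    second <- candidates
    if first != second
  } yield (first, second)

  // Every unordered pair is visited twice, hence the division by 2.
  pairs.count { case (a, b) => sees(a, b) && sees(b, a) } / 2
}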

The node writes complete block data to disk in order to minimize network synchronization across node restarts. If a node is down for extended periods, it will be forced to synchronize with other nodes in order to update its RBlocks data store, the DAG, and the LatestMessages table for validators. The algorithm by which this update occurs and the network protocols enabling it are beyond the scope of this document. (See <TBD>)

1.2 Design goals

The design goals of the DAG storage include:

  1. Take an immutable snapshot of the DAG
  2. Look up children of a block by its hash in a DAG snapshot
  3. Look up block metadata of a block by its hash in a DAG snapshot
  4. Look up topological sorting of a particular part of the DAG in a DAG snapshot
  5. Look up the latest message of a validator by its public key in constant time in a DAG snapshot
  6. Insert new block into the DAG
  7. Snapshots should provide quick access to the recent parts of the DAG
  8. Snapshots should provide (perhaps slower) access to the older parts of the DAG
  9. Do not keep the whole DAG loaded into memory
  10. Conceal from the user the fact that possibly not the whole DAG is present in memory
  11. Be able to restore the DAG on node startup in a normal scenario (i.e. without system crashes)
  12. Be able to restore some recent state of the DAG on node startup in case of a storage corruption (e.g. by a partial write)

2. Current software architecture

Currently, the DAG is stored in memory only and is represented by the BlockDag class:
BlockDag
final case class BlockDag(
    childMap: Map[BlockHash, Set[BlockHash]],
    latestMessages: Map[Validator, BlockMessage],
    latestMessagesOfLatestMessages: Map[Validator, Map[Validator, BlockHash]],
    dataLookup: Map[BlockHash, BlockMetadata],
    topoSort: Vector[Vector[BlockHash]],
    sortOffset: Long
)


3. Proposed software architecture

3.1 Overview

The proposed API for DAG storage:

BlockDagStorage
trait BlockDagStorage[F[_]] {
  def getRepresentation: F[BlockDagRepresentation[F]]
  def insert(block: BlockMessage): F[Unit]
  def checkpoint(blockHash: BlockHash): F[Unit]
  def clear(): F[Unit]
  def close(): F[Unit]
}

object BlockDagStorage {
  def apply[F[_]](implicit B: BlockDagStorage[F]): BlockDagStorage[F] = B
}

trait BlockDagRepresentation[F[_]] {
  def children(blockHash: BlockHash): F[Option[Set[BlockHash]]]
  def lookup(blockHash: BlockHash): F[Option[BlockMetadata]]
  def contains(blockHash: BlockHash): F[Boolean]
  def topoSort(startBlockNumber: Long): F[Vector[Vector[BlockHash]]]
  def topoSortTail(tailLength: Long): F[Vector[Vector[BlockHash]]]
  def deriveOrdering(startBlockNumber: Long): F[Ordering[BlockMetadata]]
  def latestMessageHash(validator: Validator): F[Option[BlockHash]]
  def latestMessage(validator: Validator): F[Option[BlockMetadata]]
  def latestMessageHashes: F[Map[Validator, BlockHash]]
  def latestMessages: F[Map[Validator, BlockMetadata]]
}

The BlockDagStorage itself contains only a few methods:

  • getRepresentation returns an immutable snapshot representing the current DAG state
  • insert updates DAG's current state by inserting the provided block
  • checkpoint hints to the storage that the provided block is a new checkpoint
  • clear clears the storage of all data
  • close closes the storage, after which it cannot be accessed anymore

The main queries supported by a DAG snapshot are shown in the BlockDagRepresentation trait, whose implementation is returned by the getRepresentation method:

  • children – looks up the set of blocks which are children of the given block. Has the same semantics as childMap in the legacy implementation of BlockDag.
  • lookup – looks up the block metadata by its hash. Has the same semantics as dataLookup in the legacy implementation of BlockDag.
  • topoSort – returns a topologically sorted suffix of blocks in the DAG snapshot starting from the provided block number. It is a slightly modified version of the topoSort field in the legacy implementation of BlockDag.
  • latestMessageHash – looks up the hash of the latest message of a given validator. It is a weaker version of the latestMessages field in the legacy implementation of BlockDag, as the new version only returns a lightweight block hash instead of a full block message.

Apart from these main methods, there are also a few other slightly different helper methods whose semantics can easily be deduced from their names and types.
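
For illustration, a read path under the proposed API could look as follows. This is a sketch only and uses nothing beyond the methods declared above: the snapshot is taken once, and all subsequent lookups against it are lock-free.

BlockDagRepresentation usage sketch
import cats.Monad
import cats.implicits._

// Sketch of a read-only consumer: take an immutable snapshot once, then run
// lock-free lookups against it.
def latestSeqNums[F[_]: Monad: BlockDagStorage]: F[Map[Validator, Int]] =
  for {
    dag    <- BlockDagStorage[F].getRepresentation
    latest <- dag.latestMessages // Map[Validator, BlockMetadata]
  } yield latest.map { case (validator, meta) => validator -> meta.seqNum }

def childrenOf[F[_]: Monad: BlockDagStorage](hash: BlockHash): F[Set[BlockHash]] =
  for {
    dag      <- BlockDagStorage[F].getRepresentation
    children <- dag.children(hash)
  } yield children.getOrElse(Set.empty)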

The proposed BlockDagFileStorage implements the API in the following fashion:

BlockDagFileStorage
final class BlockDagFileStorage[F[_]: Monad: Concurrent: Sync: Log] private (
    lock: Semaphore[F],
    latestMessagesRef: Ref[F, Map[Validator, BlockHash]],
    childMapRef: Ref[F, Map[BlockHash, Set[BlockHash]]],
    dataLookupRef: Ref[F, Map[BlockHash, BlockMetadata]],
    topoSortRef: Ref[F, Vector[Vector[BlockHash]]],
    latestMessagesLogOsRef: Ref[F, OutputStream],
    latestMessagesLogSizeRef: Ref[F, Int],
    latestMessagesCrcRef: Ref[F, CRC32],
    latestMessagesDataFilePath: Path,
    latestMessagesCrcFilePath: Path,
    latestMessagesLogMaxSizeFactor: Int
) extends BlockDagStorage[F] {
  private final case class FileDagRepresentation(
      latestMessagesMap: Map[Validator, BlockHash],
      childMap: Map[BlockHash, Set[BlockHash]],
      dataLookup: Map[BlockHash, BlockMetadata],
      topoSortVector: Vector[Vector[BlockHash]]
  ) extends BlockDagRepresentation[F] {
    ...
  }
  private def updateLatestMessagesFile(validator: Validator, blockHash: BlockHash): F[Unit]
  private def updateLatestMessagesCrcFile(newCrc: CRC32): Unit
  private def squashLatestMessagesDataFile(): F[Unit]
  
  def getRepresentation: F[BlockDagRepresentation[F]] =
    for {
      _              <- lock.acquire
      latestMessages <- latestMessagesRef.get
      childMap       <- childMapRef.get
      dataLookup     <- dataLookupRef.get
      topoSort       <- topoSortRef.get
      _              <- lock.release
    } yield FileDagRepresentation(latestMessages, childMap, dataLookup, topoSort)

  def insert(block: BlockMessage): F[Unit] =
    for {
      _                     <- lock.acquire
      latestMessages        <- latestMessagesRef.get
      latestMessagesLogSize <- latestMessagesLogSizeRef.get
      _ <- if (latestMessagesLogSize > latestMessages.size * latestMessagesLogMaxSizeFactor) {
            squashLatestMessagesDataFile()
          } else {
            ().pure[F]
          }
      _ <- dataLookupRef.update(_.updated(block.blockHash, BlockMetadata.fromBlock(block)))
      _ <- childMapRef.update(childMap =>
            parentHashes(block).foldLeft(childMap) {
              case (acc, p) =>
                val currChildren = acc.getOrElse(p, HashSet.empty[BlockHash])
                acc.updated(p, currChildren + block.blockHash)
          })
      _ <- topoSortRef.update(topoSort => TopologicalSortUtil.update(topoSort, 0L, block))
      _ <- latestMessagesRef.update(_.updated(block.sender, block.blockHash))
      _ <- updateLatestMessagesFile(block.sender, block.blockHash)
      _ <- lock.release
    } yield ()

  ...
}

object BlockDagFileStorage {
  final case class Config(
      latestMessagesDataPath: Path,
      latestMessagesCrcPath: Path,
      latestMessagesLogMaxSizeFactor: Int = 10
  )

  def create[F[_]: Monad: Concurrent: Sync: Log](config: Config): F[BlockDagFileStorage[F]]
}

3.2 System decomposition

BlockDagFileStorage is composed of two storages:

  • Latest messages storage, designed as a combination of an in-memory representation and a synchronized on-disk representation implemented as a log of publickey-blockhash inserts/updates, which is squashed when the log grows much larger than the actual latest messages structure.
  • Graph storage, designed as a combination of old checkpointed parts of the DAG stored in separate files and an actively open file which contains only the recent part of the DAG (i.e. everything after the last checkpoint).

3.2.1 Latest Messages Storage

Every instance of BlockDagFileStorage can only be created using BlockDagFileStorage.create, provided with a BlockDagFileStorage.Config instance containing the paths to the on-disk log file and CRC file. On create invocation, the log file is read into memory as a List[(Validator, BlockHash)], its CRC value is recalculated and matched against the one written in the CRC file. In case of a mismatch, the last pair in the list is dropped and the CRC is recalculated and matched against the written CRC again. This catches most types of storage corruption, as the only way the storage file is mutated is by appending 64 bytes to its end, hence only the last 64 bytes could be partially written. Note that the file's length changes atomically and cannot grow by anything other than 64 bytes at a time (assuming no other process on the user's system changed the file while the node was down). In case the list without the last pair also does not match the written CRC, the latest messages are rebuilt from scratch using the persisted DAG from graph storage, and the log/CRC files are regenerated from the rebuilt latest messages. The resulting list is then converted to a Map[Validator, BlockHash], which will be called the in-memory representation in the rest of this section.
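
For illustration, the validation performed by create might look roughly like the following sketch. The names are illustrative, and the CRC file format (here assumed to be a single big-endian long) as well as the error handling are assumptions, not part of this specification.

Startup validation sketch
import java.nio.ByteBuffer
import java.nio.file.{Files, Path}
import java.util.zip.CRC32

// Illustrative startup validation of the latest messages log. Each log entry
// is 64 bytes: 32 bytes of validator public key followed by 32 bytes of
// block hash.
def readLatestMessagesLog(dataPath: Path, crcPath: Path): Option[List[(Array[Byte], Array[Byte])]] = {
  val bytes     = Files.readAllBytes(dataPath)
  val storedCrc = ByteBuffer.wrap(Files.readAllBytes(crcPath)).getLong

  def crcOf(data: Array[Byte]): Long = {
    val crc = new CRC32()
    crc.update(data)
    crc.getValue
  }

  def toPairs(data: Array[Byte]): List[(Array[Byte], Array[Byte])] =
    data.grouped(64).collect {
      case entry if entry.length == 64 => (entry.take(32), entry.drop(32))
    }.toList

  if (crcOf(bytes) == storedCrc) Some(toPairs(bytes))
  else {
    // Drop the (possibly partially written) last entry and try again.
    val tailLength = bytes.length % 64 match { case 0 => 64; case r => r }
    val truncated  = bytes.dropRight(tailLength)
    if (crcOf(truncated) == storedCrc) Some(toPairs(truncated))
    else None // caller rebuilds latest messages from the persisted DAG
  }
}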

Latest messages storage is responsible for the following constructor arguments of BlockDagFileStorage:

  • latestMessagesRef – the in-memory representation, enclosed under Ref (atomic reference)
  • latestMessagesLogOsRef – an output stream appending provided data to the log file specified by latestMessagesDataPath from the passed configuration, enclosed under Ref
  • latestMessagesLogSizeRef – the current number of entries in the latest messages log file, which is calculated while reading the list from the previous paragraph, enclosed under Ref
  • latestMessagesCrcRef – the calculated CRC (which should coincide with the CRC written in the CRC file), enclosed under Ref
  • latestMessagesDataFilePath – the path where the log file is located
  • latestMessagesCrcFilePath – the path where the CRC file is located
  • latestMessagesLogMaxSizeFactor – the maximal factor by which the log's size may exceed the actual data structure's size, specified by latestMessagesLogMaxSizeFactor in the passed configuration

Latest messages storage subsystem supports the following operations:

  • Lookups in a DAG snapshot can be done directly using the provided latestMessagesMap. No locking is required once an instance of FileDagRepresentation is obtained, hence the n^2 latest message lookups in the safety oracle calculation can be performed very quickly.
  • Inserts into the DAG update the in-memory representation, increase the log size counter by 1 and update the in-memory CRC value. After that, a single validator-blockhash entry is appended to the end of the log file and the CRC file is updated atomically by creating a temporary file with the new CRC value and replacing the old file with it (see the sketch below).
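
A sketch of the corresponding write path follows. The names are illustrative and the file formats are the same assumptions as in the startup sketch above (a 64-byte entry per update, CRC stored as a single big-endian long).

Insert write path sketch
import java.io.OutputStream
import java.nio.ByteBuffer
import java.nio.file.{Files, Path, StandardCopyOption}
import java.util.zip.CRC32

// Illustrative write path for one insert: append the 64-byte entry to the log,
// then atomically replace the CRC file through a temporary file.
def appendLatestMessage(
    logOs: OutputStream,
    crc: CRC32,
    crcFilePath: Path,
    validator: Array[Byte], // 32 bytes
    blockHash: Array[Byte]  // 32 bytes
): Unit = {
  val entry = validator ++ blockHash
  logOs.write(entry)
  logOs.flush()

  crc.update(entry)
  val tmp = Files.createTempFile(crcFilePath.getParent, "crc", ".tmp")
  Files.write(tmp, ByteBuffer.allocate(8).putLong(crc.getValue).array())
  Files.move(tmp, crcFilePath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE)
}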

3.2.2 Graph Storage

TODO: Update this section to match the new implementation

Some auxiliary classes are introduced in the DagFileStorage object:

  • DagInfo is a new version of the old BlockDag data structure with everything regarding latest messages removed from it
  • Checkpoint represents a single checkpoint stored on disk. start and end represent the range of block numbers the checkpoint stores. path points to the location on disk where the checkpoint data is stored. dagInfo is an optional weak reference to a loaded DagInfo representing this checkpoint. Initially, all checkpoints have this value set to None, but if the user requires some data from this part of the DAG, it will be loaded into memory. Afterwards, it can be reclaimed by the GC, since dagInfo holds the information behind a weak reference.
  • Config is the configuration necessary for instantiating DagFileStorage. It contains the parent directory for checkpoints and for the active DAG storage file. This parent directory will be called the storage directory in this document.

DagFileStorage has the following constructor parameters:

  • lock is used for mutually exclusive access to the active dag storage file
  • checkpointsRef is a list of known checkpoints located under the storage directory supplied by the configuration
  • currentFile is an output stream to the active dag storage file
  • currentDagInfoRef holds the part of the DAG after the last checkpoint (i.e. the same part which has been written to currentFile)

Draft implementations of insert and children can be seen in the code block above. insert always takes the lock since it writes the new block to the active DAG storage file. On the other hand, children does not require taking the lock, since currentDagInfo can be pulled out of the atomic reference currentDagInfoRef and then safely used for looking up the provided block hash in currentDagInfo.childrenMap. If currentDagInfo does not know about the requested block, DagFileStorage looks the block up in the provided BlockStore and then looks up the block's children in the relevant checkpoint, determined by the block's number. The checkpoint might need to be loaded from disk in case it was not loaded previously or was reclaimed by the GC (see the sketch below).
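
As the TODO above notes, this section is subject to change; still, a conceptual sketch of lazy checkpoint loading via weak references might look like the following. readDagInfo is a hypothetical on-disk reader supplied by the caller, and the Checkpoint definition here is only the illustrative shape described in this section.

Checkpoint loading sketch
import java.lang.ref.WeakReference
import java.nio.file.Path

// Conceptual sketch only. A checkpoint keeps an optional weak reference to its
// loaded DagInfo, so the GC may reclaim the data between uses.
final case class Checkpoint(
    start: Long,
    end: Long,
    path: Path,
    dagInfo: Option[WeakReference[DagInfo]]
)

def loadCheckpoint(
    checkpoint: Checkpoint,
    readDagInfo: Path => DagInfo // hypothetical on-disk reader
): (Checkpoint, DagInfo) =
  checkpoint.dagInfo.flatMap(ref => Option(ref.get)) match {
    case Some(info) =>
      (checkpoint, info) // still in memory, reuse it
    case None =>
      // Not loaded yet, or already reclaimed by the GC: load it from disk and
      // remember it behind a fresh weak reference.
      val info = readDagInfo(checkpoint.path)
      (checkpoint.copy(dagInfo = Some(new WeakReference(info))), info)
  }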

3.3 Software/hardware mapping

3.3.1 Memory allocation

  • Latest messages. Both the validator public key size and the block hash size are 52 bytes each (8 bytes for a pointer + 12 bytes for the array header + 32 bytes for the content), so a single pair requires 112 bytes of memory (52 * 2 + 8, as the pair itself also requires a pointer). Hence, the in-memory representation requires about 33 KiB of RAM for the test net bonds file with 300 validators and only about 1 MiB of RAM in case the number of validators bloats up to 10000. The in-memory index will at most double the memory consumption. Hence, we do not expect holding the in-memory representation to have a significant effect on memory consumption.
  • Graph storage. The details of checkpointing are not established yet, so some approximations will be given in this section. The number of children per block is also not a fixed parameter and hence will be approximated. One block hash is 32 bytes and, assuming a single block has about 5 children, the set of children for a block is 5 * 32 bytes. Taking the overhead of Java headers into account, a single entry in DagInfo.childrenMap takes about 312 bytes. BlockMetadata is composed of the block hash, sender, seqNum, parents and justifications. It requires (32 + 12) + (32 + 12) + 4 + ((32 + 12) * 5 + 8) + ((32 + 32 + 12 + 12) * 5 + 8) = 768 bytes to store one block metadata. Hence, a single entry in DagInfo.dataLookup takes 812 bytes. topoSort adds an additional 44 bytes per block to that amount. In total, inserting a single block requires 312 + 812 + 44 = 1168 bytes.

    Assuming the speed of the network is 10 blocks/second and checkpoints are made once a day, there will be 10 * 60 * 60 * 24 = 864000 blocks in one checkpoint. Since every block requires 1168 bytes to be stored, the total weight of a checkpoint is about 960 MiB. The same holds for every additional checkpoint loaded into memory.

3.3.2 Disk memory

  • Latest messages. In the on-disk representation, a single pair requires only 64 bytes, as headers and pointers are not required for storing data on disk. Considering that the system is configurable in terms of the factor by which the log can be bigger than the actual data structure, 19 * factor KiB is required for storing a file with 300 publickey-blockhash pairs and 625 * factor KiB is required for the severe case of 10000 publickey-blockhash pairs. With the default factor of 10, the size of the log will be a few megabytes at most.
  • Graph storage. The DAG storage layout is shown in detail in section 3.4.2; it takes 12568 bytes to store a single block with 5 parents, 5 justifications and 300 validators (see the sketch below). Storing a DAG consisting of 10M blocks would therefore require about 117 GiB of disk space.
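
The per-block figure can be reproduced from the on-disk layout described in section 3.4.2, as in this small sketch:

On-disk size estimate
// Reproduces the on-disk record size from the layout described in
// section 3.4.2 (all sizes in bytes).
def blockRecordSize(parents: Int, justifications: Int, weightMapEntries: Int): Long =
  32L +                              // blockHash
    8L +                             // blockNumber
    32L +                            // sender
    4L +                             // seqNum
    4L + parents * 32L +             // parentsLength + parents
    4L + justifications * 64L +      // justificationsLength + (validator, hash) pairs
    4L + weightMapEntries * 40L      // weightMapLength + (validator, stake) pairs

val perBlock  = blockRecordSize(5, 5, 300)   // = 12568 bytes
val totalDisk = perBlock * 10000000L         // ≈ 117 GiB for a DAG of 10M blocks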

3.4 Persistent data management

The configuration for latest messages storage will be added to the existing node configuration coop.rchain.node.configuration.Configuration:

Node Configuration
final class Configuration(
    val command: Command,
    val server: Server,
    val grpcServer: GrpcServer,
    val tls: Tls,
    val casper: CasperConf,
    val blockstorage: LMDBBlockStore.Config,
    val blockDagStorage: BlockDagFileStorage.Config,
    private val options: commandline.Options
)

The default value for blockDagStorage is defined by using dataDir (which defaults to $HOME/.rnode) as follows:

BlockDagFileStorage.Config default
val blockDagStorage = BlockDagFileStorage.Config(
  dataDir.resolve("casper-latest-messages-log"),
  dataDir.resolve("casper-latest-messages-crc")
)

The default values can be redefined by adding the following command-line options to coop.rchain.node.configuration.commandline.Options:

New options
val casperLatestMessagesLogFile =
  opt[Path](required = false, descr = "Path to latest messages log file") // --casper-latest-messages-log-file
val casperLatestMessagesCrcFile =
  opt[Path](required = false, descr = "Path to latest messages crc file") // --casper-latest-messages-crc-file

BlockDagFileStorage will be initialized in NodeRuntime.main using the provided configuration. As SafetyOracle depends on the latest messages data structure, it will have to be initialized after BlockDagFileStorage.
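
A rough sketch of that wiring follows; the surrounding NodeRuntime code, the setupDagStorage name and the exact constraint set are assumptions, and only illustrate that the storage is created from the configuration before its consumers.

Initialization sketch
import cats.Monad
import cats.effect.{Concurrent, Sync}
import cats.implicits._

// Sketch only: the DAG storage is created from the node configuration before
// the components that consume latest messages (such as SafetyOracle).
def setupDagStorage[F[_]: Monad: Concurrent: Sync: Log](
    conf: Configuration
): F[BlockDagStorage[F]] =
  BlockDagFileStorage
    .create[F](conf.blockDagStorage)
    .map(storage => storage: BlockDagStorage[F])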

3.4.1 Latest messages data

As both the publickey and the blockhash are fixed in size, the proposed method of storing the latest messages data structure on disk is to sequentially write out the publickey-blockhash pairs byte by byte (i.e. 32 bytes of publickey followed by 32 bytes of blockhash).

This data will be stored in the file specified by BlockDagFileStorage.Config.latestMessagesDataPath and the CRC value will be stored in the file specified by BlockDagFileStorage.Config.latestMessagesCrcPath.

3.4.2 Graph data

Currently BlockMetadata's implementation looks like this:

BlockMetadata
final case class BlockMetadata(
    blockHash: BlockHash,
    parents: List[BlockHash],
    sender: ByteString,
    justifications: List[Justification],
    weightMap: Map[ByteString, Long],
    seqNum: Int
)

This whole structure is persisted in the dag storage file using the following scheme:

  • blockHash - 32 bytes
  • blockNumber - 8 bytes
  • sender - 32 bytes
  • seqNum - 4 bytes
  • parentsLength - 4 bytes for the length of parents
  • parents - a sequence of 32 byte hashes and the length of the sequence is parentsLength
  • justificationsLength - 4 bytes for the length of justifications
  • justifications - a sequence of pairs of 32 byte validator public keys and 32 byte hashes, and the length of the sequence is justificationsLength
  • weightMapLength - 4 bytes for the length of weightMap
  • weightMap - a sequence of pairs of 32 byte validator public keys and 8 byte stakes, and the length of the sequence is weightMapLength

The structure is variable in size and hence requires some additional values for deterministic deserialization (parentsLength, justificationsLength and weightMapLength). blockNumber was not a part of BlockMetadata, but it is required for successfully rebuilding the topological sorting.
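
For illustration, a serializer following this layout could be sketched as below, writing big-endian values via java.nio.ByteBuffer. The Justification field names (validator, latestBlockHash) are assumed, and the ByteString values are assumed to already have the expected lengths (32-byte hashes and public keys).

Serialization sketch
import java.nio.ByteBuffer

// Illustrative serializer for one DAG storage record following the layout above.
def serializeRecord(meta: BlockMetadata, blockNumber: Long): Array[Byte] = {
  val size =
    32 + 8 + 32 + 4 +
      4 + meta.parents.size * 32 +
      4 + meta.justifications.size * 64 +
      4 + meta.weightMap.size * 40
  val buf = ByteBuffer.allocate(size)
  buf.put(meta.blockHash.toByteArray)
  buf.putLong(blockNumber)
  buf.put(meta.sender.toByteArray)
  buf.putInt(meta.seqNum)
  buf.putInt(meta.parents.size)
  meta.parents.foreach(p => buf.put(p.toByteArray))
  buf.putInt(meta.justifications.size)
  meta.justifications.foreach { j =>
    buf.put(j.validator.toByteArray)
    buf.put(j.latestBlockHash.toByteArray)
  }
  buf.putInt(meta.weightMap.size)
  meta.weightMap.foreach { case (validator, stake) =>
    buf.put(validator.toByteArray)
    buf.putLong(stake)
  }
  buf.array()
}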

3.5 Operational aspects

3.5.1 Metrics

BlockDagFileStorage will export the following metrics by reusing the existing class coop.rchain.metrics.Metrics:

  • Getting a DAG representation by invoking getRepresentation method will be reported with name "block-dag-file-storage-get-representation"
  • Inserting a new block by invoking insert method will be reported with name "block-dag-file-storage-insert"
  • Checkpointing by invoking checkpoint method will be reported with name "block-dag-file-storage-checkpoint"

3.5.2 Logs

BlockDagFileStorage will log unsuccessful attempts to read the CRC value and will log when it has to rebuild the latest messages data structure from the persisted graph.

3.6 Global software control

Inserts and getting representations both take the lock, as they both need a consistent snapshot of the internal data structures (e.g. the dataLookup taken should contain all the blocks that are also contained in the latestMessages taken). However, working with the representation does not require taking any kind of lock, as the representation is immutable.

3.7 Boundary conditions

  • On node startup, known checkpoint metainformation (start, end and path) is read from the provided directory and the active DAG storage file is read into memory as a DagInfo
  • On node shutdown, currentFile is closed
  • In case of a node error, on startup the latest messages log file is checked for corruption by comparing the CRC from the CRC file to the actual CRC value of the log file. In case of a mismatch, the CRC is recalculated for the log without the last 64 bytes (i.e. the last entry) and compared to the written CRC once again. In case of another mismatch, the latest messages data structure is rebuilt from scratch using the persisted DAG.