Motivation

The communication layer uses UDP as the transport for inter-node communication.

The handshake between nodes is divided into two phases:

  1. Encryption handshake (public key exchange)
  2. Protocol handshake (nonce)

The encryption part is provided by a custom RChain solution. However, communication between nodes follows a request-response pattern, so UDP is a poor fit: it offers fire-and-forget semantics, with no built-in replies or delivery guarantees.
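
To illustrate the mismatch, the following minimal sketch sends a datagram to a peer that never answers. It is an illustration using only the JDK's `DatagramSocket`, not RChain code, and `UdpFireAndForget`, `sendAndAwaitReply`, and `demo` are hypothetical names. The send completes without error, and the caller only learns that no reply arrived by running its own timeout.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress, SocketTimeoutException}
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical illustration, not part of the RChain code base.
object UdpFireAndForget {

  /** Sends one datagram to `peerPort` on the loopback interface and waits up to
    * `timeoutMillis` for a reply. Returns Some(reply) if a datagram came back,
    * None if the wait timed out. */
  def sendAndAwaitReply(payload: String, peerPort: Int, timeoutMillis: Int): Option[String] = {
    val socket = new DatagramSocket() // bound to an ephemeral local port
    try {
      val bytes = payload.getBytes(UTF_8)
      // send() returns as soon as the datagram is handed to the network stack;
      // it says nothing about whether the peer received or processed it.
      socket.send(new DatagramPacket(bytes, bytes.length, InetAddress.getLoopbackAddress, peerPort))

      // Any request-response behaviour has to be built by hand: here, a timeout.
      socket.setSoTimeout(timeoutMillis)
      val buffer = new Array[Byte](1024)
      val reply  = new DatagramPacket(buffer, buffer.length)
      try {
        socket.receive(reply)
        Some(new String(reply.getData, 0, reply.getLength, UTF_8))
      } catch {
        case _: SocketTimeoutException => None
      }
    } finally socket.close()
  }

  /** Sends to a bound socket that never reads or replies. */
  def demo(): Option[String] = {
    val silentPeer = new DatagramSocket(0, InetAddress.getLoopbackAddress)
    try sendAndAwaitReply("ping", silentPeer.getLocalPort, 200)
    finally silentPeer.close()
  }
}
```

Retries, reply correlation, and ordering would all need similar hand-written logic on top of UDP, which is the work a request-response transport takes over.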

Background

gRPC is an RPC framework that uses Google Protocol Buffers (protobuf) for message serialization and runs over HTTP/2 on top of TCP. The Java implementation provides HTTP/2 over TLS out of the box, which makes it a natural choice for efficient request-response communication. TLS, in turn, is a proven and reliable secure communication protocol.

Caveat

Nodes in the RChain P2P network are identified by their public key. Although gRPC is convenient to use, extracting the peer's public key from the underlying TLS session is cumbersome: it requires injecting call interceptors on both the client and the server side. A proof-of-concept implementation can be found in the GitHub repository grpc-example. Interceptor excerpts from this repository:

SslSessionClientInterceptor.scala

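import java.util.Base64
import javax.net.ssl.SSLSession

import io.grpc.{Attributes, ClientCall, Grpc, Metadata, Status}
// Assumption: Logger is scala-logging's Logger, and HandshakeReply is the
// protobuf-generated handshake message of the example project.
import com.typesafe.scalalogging.Logger

// Wraps a ClientCall and checks that the key announced in a HandshakeReply
// matches the public key of the peer's TLS certificate.
class SslSessionClientCallInterceptor[ReqT, RespT](next: ClientCall[ReqT, RespT]) extends ClientCall[ReqT, RespT] {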
  self =>

  private val logger = Logger(this.getClass)

  def cancel(message: String, cause: Throwable): Unit = next.cancel(message, cause)
  def request(numMessages: Int): Unit = next.request(numMessages)
  def sendMessage(message: ReqT): Unit = next.sendMessage(message)
  def halfClose(): Unit = next.halfClose()

  override def isReady: Boolean = next.isReady
  override def setMessageCompression(enabled: Boolean): Unit = next.setMessageCompression(enabled)
  override def getAttributes: Attributes = next.getAttributes

  def start(responseListener: ClientCall.Listener[RespT], headers: Metadata): Unit =
    next.start(new InterceptionListener(responseListener), headers)

  private class InterceptionListener(next: ClientCall.Listener[RespT]) extends ClientCall.Listener[RespT] {
    override def onClose(status: Status, trailers: Metadata): Unit = next.onClose(status, trailers)
    override def onReady(): Unit = next.onReady()
    override def onHeaders(headers: Metadata): Unit = next.onHeaders(headers)

    override def onMessage(message: RespT): Unit = {
      message match {
        case handshake: HandshakeReply =>
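          // The TLS session is exposed as a transport attribute of the call; it is null on plaintext connections.
          val sslSession: Option[SSLSession] = Option(self.getAttributes.get(Grpc.TRANSPORT_ATTR_SSL_SESSION))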
          if (sslSession.isEmpty) {
            logger.error("No SSL Session found in client call")
            close()
          } else {
            sslSession.foreach { session =>
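              // Base64-encode the public key of the peer's own certificate (first in the chain)
              // so it can be compared with the key sent in the handshake message.
              val pubKey = Base64.getEncoder.encodeToString(session.getPeerCertificates.head.getPublicKey.getEncoded)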
              if (pubKey == handshake.key) {
                next.onMessage(message)
              } else {
                logger.error("Wrong public key")
                close()
              }
            }
          }

        case _ => next.onMessage(message)
      }
    }

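    // Aborts the call. The description is kept generic because this path is
    // also taken when no SSL session is present.
    private def close(): Unit =
      throw Status.UNAUTHENTICATED.withDescription("Peer public key verification failed").asRuntimeException()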

  }
}


SslSessionServerInterceptor.scala

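import java.util.Base64
import javax.net.ssl.SSLSession

import io.grpc.{Grpc, Metadata, ServerCall, ServerCallHandler, ServerInterceptor, Status}
// Assumption: Logger is scala-logging's Logger, and HandshakeRequest is the
// protobuf-generated handshake message of the example project.
import com.typesafe.scalalogging.Logger

// Intercepts incoming calls and checks that the key announced in a HandshakeRequest
// matches the public key of the client's TLS certificate.
class SslSessionServerInterceptor() extends ServerInterceptor {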

  private val logger = Logger(this.getClass)

  def interceptCall[ReqT, RespT](
    call: ServerCall[ReqT, RespT],
    headers: Metadata,
    next: ServerCallHandler[ReqT, RespT]
  ): ServerCall.Listener[ReqT] = new InterceptionListener(next.startCall(call, headers), call)

  private class InterceptionListener[ReqT, RespT](next: ServerCall.Listener[ReqT], call: ServerCall[ReqT, RespT]) extends ServerCall.Listener[ReqT] {
    override def onHalfClose(): Unit = next.onHalfClose()
    override def onCancel(): Unit = next.onCancel()
    override def onComplete(): Unit = next.onComplete()
    override def onReady(): Unit = next.onReady()

    override def onMessage(message: ReqT): Unit = {
      message match {
        case handshake: HandshakeRequest =>
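          // The TLS session is exposed as a transport attribute of the call; it is null on plaintext connections.
          val sslSession: Option[SSLSession] = Option(call.getAttributes.get(Grpc.TRANSPORT_ATTR_SSL_SESSION))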
          if (sslSession.isEmpty) {
            logger.error("No SSL Session found in server call")
            close()
          } else {
            sslSession.foreach { session =>
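              // Base64-encode the public key of the client's own certificate (first in the chain)
              // so it can be compared with the key sent in the handshake message.
              val pubKey = Base64.getEncoder.encodeToString(session.getPeerCertificates.head.getPublicKey.getEncoded)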
              if (pubKey == handshake.key) {
                next.onMessage(message)
              } else {
                logger.error("Wrong public key")
                close()
              }
            }
          }
        case _ => next.onMessage(message)
      }
    }

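    // Aborts the call. The description is kept generic because this path is
    // also taken when no SSL session is present.
    private def close(): Unit =
      throw Status.UNAUTHENTICATED.withDescription("Peer public key verification failed").asRuntimeException()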
  }
}
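
The check at the heart of both interceptors, comparing the Base64-encoded public key from the TLS session with the key announced in the handshake message, can be tried in isolation. The sketch below is a minimal illustration using only the JDK. `PeerKeyCheck`, `encode`, `matches`, and `newKey` are hypothetical names, and the freshly generated EC keys merely stand in for the key a real node would read from the peer certificate.

```scala
import java.security.{KeyPairGenerator, PublicKey}
import java.util.Base64

// Hypothetical helper, not part of the grpc-example repository.
object PeerKeyCheck {

  /** Base64 of the key's encoded form, the same encoding the interceptors use. */
  def encode(key: PublicKey): String =
    Base64.getEncoder.encodeToString(key.getEncoded)

  /** True if the key seen on the TLS session is the one the peer announced. */
  def matches(sessionKey: PublicKey, announcedKey: String): Boolean =
    encode(sessionKey) == announcedKey

  /** Generates a throwaway EC public key; a stand-in for a certificate's key. */
  def newKey(): PublicKey = {
    val generator = KeyPairGenerator.getInstance("EC")
    generator.initialize(256)
    generator.generateKeyPair().getPublic
  }
}
```

A peer that announces its own key passes the check, while a peer that announces someone else's key is rejected:

```scala
val nodeKey  = PeerKeyCheck.newKey()
val otherKey = PeerKeyCheck.newKey()

PeerKeyCheck.matches(nodeKey, PeerKeyCheck.encode(nodeKey))  // true
PeerKeyCheck.matches(nodeKey, PeerKeyCheck.encode(otherKey)) // false
```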