Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Current »

Motivation

Communication layer uses UDP as transport layer for inter-node communication.

The handshake between nodes is divided into two phases:

  1. Encryption handshake (public key exchange)
  2. Protocol handshake (nonce)

The encryption part is provided by an Rchain custom solution. However, the communication pattern between nodes corresponds to request-response cycles, so UDP is not a good choice: it is a fire-and-forget protocol with no built-in notion of a reply.

Background

GRPC is a communication protocol based on Google Protocol Buffers (protobuf) over TCP. The implementation for Java provides HTTP/2 communication over TLS out of the box. It's a natural choice for efficient communication based on request-response cycles. In addition, TLS is a proven and reliable secure communication protocol.

Caveat

Nodes are identified in the Rchain P2P-network by their public key. Although GRPC is convenient to use, extracting the peer public key from the underlying TLS communication is quite a hassle. To extract the keys, call interceptors must be injected on both the client and the server side. A POC implementation can be found here: Github: grpc-example. Interceptor excerpts from this repository:

SslSessionClientInterceptor.scala
/**
  * Client call wrapper that verifies the server's identity during the protocol handshake.
  *
  * Every `ClientCall` operation is delegated to `next`. The response listener is wrapped so
  * that a `HandshakeReply` is only passed on when the key it carries equals the Base64-encoded
  * public key of the first certificate of the peer in the TLS session of this call.
  * All other messages are passed through untouched.
  *
  * @param next the call all operations are delegated to
  * @tparam ReqT  request message type
  * @tparam RespT response message type
  */
class SslSessionClientCallInterceptor[ReqT, RespT](next: ClientCall[ReqT, RespT]) extends ClientCall[ReqT, RespT] {
  self =>

  private val logger = Logger(this.getClass)

  def cancel(message: String, cause: Throwable): Unit = next.cancel(message, cause)
  def request(numMessages: Int): Unit = next.request(numMessages)
  def sendMessage(message: ReqT): Unit = next.sendMessage(message)
  def halfClose(): Unit = next.halfClose()

  override def isReady: Boolean = next.isReady
  override def setMessageCompression(enabled: Boolean): Unit = next.setMessageCompression(enabled)
  override def getAttributes: Attributes = next.getAttributes

  /** Starts the underlying call with the response listener wrapped by the key-checking listener. */
  def start(responseListener: ClientCall.Listener[RespT], headers: Metadata): Unit =
    next.start(new InterceptionListener(responseListener), headers)

  /**
    * Listener that forwards all callbacks to `next`, validating `HandshakeReply` messages first.
    *
    * Note: the constructor parameter `next` shadows the outer call; the outer call is reached via `self`.
    */
  private class InterceptionListener(next: ClientCall.Listener[RespT]) extends ClientCall.Listener[RespT] {
    override def onClose(status: Status, trailers: Metadata): Unit = next.onClose(status, trailers)
    override def onReady(): Unit = next.onReady()
    override def onHeaders(headers: Metadata): Unit = next.onHeaders(headers)

    /**
      * Forwards `message` unless it is a `HandshakeReply` whose key cannot be confirmed
      * against the TLS session.
      *
      * @throws io.grpc.StatusRuntimeException with status UNAUTHENTICATED when the session is
      *         missing, the peer presented no usable certificate, or the keys differ
      */
    override def onMessage(message: RespT): Unit =
      message match {
        case handshake: HandshakeReply =>
          val sslSession: Option[SSLSession] = Option(self.getAttributes.get(Grpc.TRANSPORT_ATTR_SSL_SESSION))
          sslSession match {
            case None =>
              reject("No SSL Session found in client call")
            case Some(session) =>
              peerPublicKey(session) match {
                case Some(pubKey) if pubKey == handshake.key => next.onMessage(message)
                case Some(_)                                 => reject("Wrong public key")
                case None                                    => reject("No peer certificate found in client call")
              }
          }

        case _ => next.onMessage(message)
      }

    /**
      * Base64 encoding of the public key of the first peer certificate, or `None` when the
      * peer is unverified or presented an empty certificate chain.
      */
    private def peerPublicKey(session: SSLSession): Option[String] =
      try {
        // headOption instead of head: an empty chain must not surface as NoSuchElementException.
        session.getPeerCertificates.headOption
          .map(cert => Base64.getEncoder.encodeToString(cert.getPublicKey.getEncoded))
      } catch {
        // Fully qualified so no additional import is needed in the enclosing file.
        case _: javax.net.ssl.SSLPeerUnverifiedException => None
      }

    /** Logs `reason` and aborts message handling with an UNAUTHENTICATED status describing it. */
    private def reject(reason: String): Unit = {
      logger.error(reason)
      throw Status.UNAUTHENTICATED.withDescription(reason).asRuntimeException()
    }

  }
}
SslSessionServerInterceptor.scala
/**
  * Server interceptor that verifies the client's identity during the protocol handshake.
  *
  * The listener returned by the next handler is wrapped so that a `HandshakeRequest` is only
  * passed on when the key it carries equals the Base64-encoded public key of the first
  * certificate of the peer in the TLS session of the call. All other messages and callbacks
  * are passed through untouched.
  */
class SslSessionServerInterceptor() extends ServerInterceptor {

  private val logger = Logger(this.getClass)

  /**
    * Starts the call on `next` and wraps the resulting listener with the key-checking listener.
    *
    * @param call    the server call being intercepted; its attributes supply the SSL session
    * @param headers request metadata, handed unchanged to `next`
    * @param next    the handler that actually starts the call
    * @return a listener that validates handshake requests before delegating
    */
  def interceptCall[ReqT, RespT](
    call: ServerCall[ReqT, RespT],
    headers: Metadata,
    next: ServerCallHandler[ReqT, RespT]
  ): ServerCall.Listener[ReqT] = new InterceptionListener(next.startCall(call, headers), call)

  /** Listener that forwards all callbacks to `next`, validating `HandshakeRequest` messages first. */
  private class InterceptionListener[ReqT, RespT](next: ServerCall.Listener[ReqT], call: ServerCall[ReqT, RespT]) extends ServerCall.Listener[ReqT] {
    override def onHalfClose(): Unit = next.onHalfClose()
    override def onCancel(): Unit = next.onCancel()
    override def onComplete(): Unit = next.onComplete()
    override def onReady(): Unit = next.onReady()

    /**
      * Forwards `message` unless it is a `HandshakeRequest` whose key cannot be confirmed
      * against the TLS session.
      *
      * @throws io.grpc.StatusRuntimeException with status UNAUTHENTICATED when the session is
      *         missing, the peer presented no usable certificate, or the keys differ
      */
    override def onMessage(message: ReqT): Unit =
      message match {
        case handshake: HandshakeRequest =>
          val sslSession: Option[SSLSession] = Option(call.getAttributes.get(Grpc.TRANSPORT_ATTR_SSL_SESSION))
          sslSession match {
            case None =>
              reject("No SSL Session found in server call")
            case Some(session) =>
              peerPublicKey(session) match {
                case Some(pubKey) if pubKey == handshake.key => next.onMessage(message)
                case Some(_)                                 => reject("Wrong public key")
                case None                                    => reject("No peer certificate found in server call")
              }
          }
        case _ => next.onMessage(message)
      }

    /**
      * Base64 encoding of the public key of the first peer certificate, or `None` when the
      * peer is unverified or presented an empty certificate chain.
      */
    private def peerPublicKey(session: SSLSession): Option[String] =
      try {
        // headOption instead of head: an empty chain must not surface as NoSuchElementException.
        session.getPeerCertificates.headOption
          .map(cert => Base64.getEncoder.encodeToString(cert.getPublicKey.getEncoded))
      } catch {
        // Fully qualified so no additional import is needed in the enclosing file.
        case _: javax.net.ssl.SSLPeerUnverifiedException => None
      }

    /**
      * Logs `reason` and aborts message handling with an UNAUTHENTICATED status describing it.
      *
      * NOTE(review): how an exception thrown from a server listener is reported to the remote
      * client depends on the gRPC runtime and other installed interceptors — confirm that the
      * client actually observes UNAUTHENTICATED.
      */
    private def reject(reason: String): Unit = {
      logger.error(reason)
      throw Status.UNAUTHENTICATED.withDescription(reason).asRuntimeException()
    }
  }
}


  • No labels