Motivation
The communication layer uses UDP as the transport layer for inter-node communication.
The handshake between nodes is divided into two phases:
- Encryption handshake (public key exchange)
- Protocol handshake (nonce)
The encryption part is provided by an Rchain custom solution. On the other hand, the communication pattern between nodes corresponds to request-response cycles. UDP is therefore not a good choice, since it only offers fire-and-forget semantics.
Background
GRPC is a communication protocol based on Google Protocol Buffers (protobuf) over TCP. The Java implementation provides HTTP/2 communication secured with TLS out of the box. It is a natural choice for efficient communication based on request-response cycles. In addition, TLS is a proven and reliable secure communication protocol. Changing the transport from UDP to GRPC therefore addresses both concerns raised in the motivation.
Caveat
Nodes are identified in the Rchain P2P network by their public key. Although GRPC is convenient to use, extracting the peer's public key from the underlying TLS communication takes some effort: call interceptors have to be injected on both the client and the server side. A POC implementation can be found in the Github repository grpc-example. Interceptor excerpts from this repository:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
  * Client-side call wrapper that checks the peer's identity when a handshake reply arrives.
  *
  * Every call operation is delegated to the wrapped `next` call. The response listener passed
  * to [[start]] is wrapped so that a `HandshakeReply` is only forwarded when the key it carries
  * equals the Base64-encoded public key of the first certificate of the TLS peer. Any other
  * message type is forwarded unchanged.
  *
  * @param next the call all operations are delegated to
  * @tparam ReqT  request message type
  * @tparam RespT response message type
  */
class SslSessionClientCallInterceptor[ReqT, RespT](next: ClientCall[ReqT, RespT]) extends ClientCall[ReqT, RespT] {
  self =>

  private val logger = Logger(this.getClass)

  def cancel(message: String, cause: Throwable): Unit = next.cancel(message, cause)
  def request(numMessages: Int): Unit                 = next.request(numMessages)
  def sendMessage(message: ReqT): Unit                = next.sendMessage(message)
  def halfClose(): Unit                               = next.halfClose()

  override def isReady: Boolean                               = next.isReady
  override def setMessageCompression(enabled: Boolean): Unit = next.setMessageCompression(enabled)
  override def getAttributes: Attributes                     = next.getAttributes

  /** Starts the wrapped call with a listener that validates handshake replies. */
  def start(responseListener: ClientCall.Listener[RespT], headers: Metadata): Unit =
    next.start(new InterceptionListener(responseListener), headers)

  /** Listener that forwards everything to `next`, validating `HandshakeReply` messages first. */
  private class InterceptionListener(next: ClientCall.Listener[RespT]) extends ClientCall.Listener[RespT] {
    override def onClose(status: Status, trailers: Metadata): Unit = next.onClose(status, trailers)
    override def onReady(): Unit                                   = next.onReady()
    override def onHeaders(headers: Metadata): Unit                = next.onHeaders(headers)

    /**
      * Forwards `message` to the wrapped listener; a `HandshakeReply` is forwarded only if its
      * key matches the public key of the TLS peer.
      *
      * @throws io.grpc.StatusRuntimeException with status UNAUTHENTICATED when the TLS session
      *         or the peer certificate is missing, or when the keys differ
      */
    override def onMessage(message: RespT): Unit =
      message match {
        case handshake: HandshakeReply =>
          // The attribute lookup is a Java call that may yield null, hence the Option wrapper.
          val sslSession: Option[SSLSession] =
            Option(self.getAttributes.get(Grpc.TRANSPORT_ATTR_SSL_SESSION))
          sslSession match {
            case None =>
              logger.error("No SSL Session found in client call")
              reject("No SSL Session found in client call")
            case Some(session) =>
              peerPublicKey(session) match {
                case Some(pubKey) if pubKey == handshake.key =>
                  next.onMessage(message)
                case Some(_) =>
                  logger.error("Wrong public key")
                  reject("Wrong public key")
                case None =>
                  logger.error("No peer certificate found in client call")
                  reject("No peer certificate found in client call")
              }
          }
        case _ => next.onMessage(message)
      }

    /**
      * Base64-encoded public key of the first peer certificate of `session`, or `None` when the
      * peer presented no certificate or its identity has not been verified.
      */
    private def peerPublicKey(session: SSLSession): Option[String] =
      try {
        Option(session.getPeerCertificates)
          .flatMap(_.headOption)
          .map(certificate => Base64.getEncoder.encodeToString(certificate.getPublicKey.getEncoded))
      } catch {
        case _: javax.net.ssl.SSLPeerUnverifiedException => None
      }

    /**
      * Aborts message processing by throwing an UNAUTHENTICATED status exception that carries
      * the actual reason for the rejection.
      */
    // NOTE(review): this relies on gRPC's handling of exceptions thrown from listener callbacks;
    // confirm the call ends with the intended status, or cancel the call explicitly instead.
    private def reject(description: String): Nothing =
      throw Status.UNAUTHENTICATED.withDescription(description).asRuntimeException()
  }
}
|
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
  * Server interceptor that checks the peer's identity when a handshake request arrives.
  *
  * Each intercepted call gets a listener that forwards all events to the listener produced by
  * the next handler. A `HandshakeRequest` is only forwarded when the key it carries equals the
  * Base64-encoded public key of the first certificate of the TLS peer. Any other message type
  * is forwarded unchanged.
  */
class SslSessionServerInterceptor() extends ServerInterceptor {
  private val logger = Logger(this.getClass)

  /** Starts the call on `next` and wraps the resulting listener with handshake validation. */
  def interceptCall[ReqT, RespT](
      call: ServerCall[ReqT, RespT],
      headers: Metadata,
      next: ServerCallHandler[ReqT, RespT]
  ): ServerCall.Listener[ReqT] = new InterceptionListener(next.startCall(call, headers), call)

  /**
    * Listener that forwards everything to `next`, validating `HandshakeRequest` messages first.
    *
    * @param next listener all events are forwarded to
    * @param call the intercepted call; its attributes are read to find the TLS session
    */
  private class InterceptionListener[ReqT, RespT](next: ServerCall.Listener[ReqT], call: ServerCall[ReqT, RespT])
      extends ServerCall.Listener[ReqT] {
    override def onHalfClose(): Unit = next.onHalfClose()
    override def onCancel(): Unit    = next.onCancel()
    override def onComplete(): Unit  = next.onComplete()
    override def onReady(): Unit     = next.onReady()

    /**
      * Forwards `message` to the wrapped listener; a `HandshakeRequest` is forwarded only if its
      * key matches the public key of the TLS peer.
      *
      * @throws io.grpc.StatusRuntimeException with status UNAUTHENTICATED when the TLS session
      *         or the peer certificate is missing, or when the keys differ
      */
    override def onMessage(message: ReqT): Unit =
      message match {
        case handshake: HandshakeRequest =>
          // The attribute lookup is a Java call that may yield null, hence the Option wrapper.
          val sslSession: Option[SSLSession] =
            Option(call.getAttributes.get(Grpc.TRANSPORT_ATTR_SSL_SESSION))
          sslSession match {
            case None =>
              logger.error("No SSL Session found in server call")
              reject("No SSL Session found in server call")
            case Some(session) =>
              peerPublicKey(session) match {
                case Some(pubKey) if pubKey == handshake.key =>
                  next.onMessage(message)
                case Some(_) =>
                  logger.error("Wrong public key")
                  reject("Wrong public key")
                case None =>
                  logger.error("No peer certificate found in server call")
                  reject("No peer certificate found in server call")
              }
          }
        case _ => next.onMessage(message)
      }

    /**
      * Base64-encoded public key of the first peer certificate of `session`, or `None` when the
      * peer presented no certificate or its identity has not been verified.
      */
    private def peerPublicKey(session: SSLSession): Option[String] =
      try {
        Option(session.getPeerCertificates)
          .flatMap(_.headOption)
          .map(certificate => Base64.getEncoder.encodeToString(certificate.getPublicKey.getEncoded))
      } catch {
        case _: javax.net.ssl.SSLPeerUnverifiedException => None
      }

    /**
      * Aborts message processing by throwing an UNAUTHENTICATED status exception that carries
      * the actual reason for the rejection.
      */
    // NOTE(review): this relies on gRPC's handling of exceptions thrown from listener callbacks;
    // confirm the client observes UNAUTHENTICATED, or close the call explicitly with
    // call.close(status, new Metadata()) instead.
    private def reject(description: String): Nothing =
      throw Status.UNAUTHENTICATED.withDescription(description).asRuntimeException()
  }
}
|