WebSockets

This feature adds WebSockets support to Ktor. WebSockets are a mechanism to keep a bi-directional, real-time, ordered connection between the server and the client. Each message on this channel is called a Frame: a frame can be a text or binary message, or a close or ping/pong message. Frames can be marked as incomplete or final.
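To make the "incomplete or final" distinction concrete, here is a minimal sketch in plain Kotlin of how fragments could be joined back into whole text messages. The SimpleFrame class and the reassemble function are hypothetical names used only for illustration; they are not part of Ktor, and the sketch ignores control frames entirely.

```kotlin
import java.io.ByteArrayOutputStream

// Hypothetical, simplified frame model for illustration (NOT Ktor's Frame class).
class SimpleFrame(val fin: Boolean, val data: ByteArray)

// Joins fragments into whole text messages: a message ends at the first
// frame whose fin flag is true. Trailing fragments without a final frame
// are treated as incomplete and are not returned.
fun reassemble(frames: List<SimpleFrame>): List<String> {
    val messages = mutableListOf<String>()
    val pending = ByteArrayOutputStream()
    for (frame in frames) {
        pending.write(frame.data, 0, frame.data.size)
        if (frame.fin) {
            // Decode only after joining the bytes, because a multi-byte
            // UTF-8 character may be split across two fragments.
            messages += pending.toByteArray().toString(Charsets.UTF_8)
            pending.reset()
        }
    }
    return messages
}

fun main() {
    val frames = listOf(
        SimpleFrame(fin = false, data = "Hel".toByteArray(Charsets.UTF_8)),
        SimpleFrame(fin = true, data = "lo".toByteArray(Charsets.UTF_8)),
        SimpleFrame(fin = true, data = "World".toByteArray(Charsets.UTF_8))
    )
    println(reassemble(frames)) // prints [Hello, World]
}
```

The bytes are accumulated first and decoded once per message, which keeps the sketch correct even when a fragment boundary falls inside a character.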

This feature is defined in the class io.ktor.websocket.WebSockets in the artifact io.ktor:ktor-websockets:$ktor_version.

  dependencies {
      implementation "io.ktor:ktor-websockets:$ktor_version"
  }

  dependencies {
      implementation("io.ktor:ktor-websockets:$ktor_version")
  }

  <project>
      …
      <dependencies>
          <dependency>
              <groupId>io.ktor</groupId>
              <artifactId>ktor-websockets</artifactId>
              <version>${ktor.version}</version>
              <scope>compile</scope>
          </dependency>
      </dependencies>
  </project>

Installing

In order to use the WebSockets functionality you first have to install it:

  install(WebSockets)

You can adjust a few parameters when installing if required:

  install(WebSockets) {
      pingPeriod = Duration.ofSeconds(60) // Disabled (null) by default
      timeout = Duration.ofSeconds(15)
      maxFrameSize = Long.MAX_VALUE // Disabled (max value). The connection will be closed if a frame exceeds this length.
      masking = false
  }

Usage

Once installed, you can define webSocket routes inside the routing feature.

Unlike the short-lived normal route handlers, webSocket handlers are meant to be long-lived. All the relevant WebSocket methods are suspending functions, so the handler is suspended in a non-blocking way while receiving or sending messages.

webSocket methods receive a callback with a WebSocketSession instance as the receiver. That interface defines an incoming (ReceiveChannel) property and an outgoing (SendChannel) property, as well as a close method. Check the full WebSocketSession interface for more information.

Usage as a suspend actor

  routing {
      webSocket("/") { // this: WebSocketSession
          for (frame in incoming) {
              when (frame) {
                  is Frame.Text -> {
                      val text = frame.readText()
                      outgoing.send(Frame.Text("YOU SAID: $text"))
                      if (text.equals("bye", ignoreCase = true)) {
                          close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
                      }
                  }
                  else -> Unit // Ignore non-text frames
              }
          }
      }
  }

If the client closes the connection explicitly, or the TCP socket is closed, the incoming channel is closed as well: iterating over incoming ends, and calling incoming.receive() throws an exception. So even a while (true) loop shouldn't be a leak.

Usage as a Channel

Since the incoming property is a ReceiveChannel, you can use it with its stream-like interface:

  routing {
      webSocket("/") { // this: WebSocketSession
          for (frame in incoming.mapNotNull { it as? Frame.Text }) {
              val text = frame.readText()
              outgoing.send(Frame.Text("YOU SAID $text"))
              if (text.equals("bye", ignoreCase = true)) {
                  close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
              }
          }
      }
  }

Interface

The WebSocketSession interface

You receive a WebSocketSession as the receiver (this), giving you direct access to these members inside your webSocket handler.

  interface WebSocketSession {
      // Basic interface
      val incoming: ReceiveChannel<Frame> // Incoming frames channel
      val outgoing: SendChannel<Frame> // Outgoing frames channel
      fun close(reason: CloseReason)

      // Convenience method equivalent to `outgoing.send(frame)`.
      // Enqueues the frame; may suspend if the outgoing queue is full.
      // May throw an exception if the outgoing channel is already closed,
      // in which case no message can be transferred.
      suspend fun send(frame: Frame)

      // The call and the context
      val call: ApplicationCall
      val application: Application

      // Modifiable properties for this request. Their initial values come from the feature configuration.
      var pingInterval: Duration?
      var timeout: Duration
      var masking: Boolean // Enable or disable masking output messages by a random xor mask.
      var maxFrameSize: Long // Specifies the frame size limit. The connection will be closed if it is violated.

      // Advanced
      val closeReason: Deferred<CloseReason?>

      // Flushes all outstanding messages and suspends until all earlier sent messages are written.
      // Can be called at any time, even after close. May return immediately if the connection is already terminated.
      suspend fun flush()

      // Initiates connection termination immediately. Termination may complete asynchronously.
      fun terminate()
  }
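The masking property refers to the XOR masking defined by the WebSocket protocol (RFC 6455): each payload byte is XORed with one byte of a 4-byte key, selected by the byte's position modulo 4. The following is a minimal sketch of that transformation in plain Kotlin. The applyMask function is a hypothetical helper written for illustration; it is not Ktor's implementation, and Ktor handles masking for you when the option is enabled.

```kotlin
import kotlin.random.Random

// Applies (or removes) a WebSocket mask: every payload byte is XORed with
// one of the four key bytes, chosen by the byte's position modulo 4.
// XOR is its own inverse, so the same function masks and unmasks.
// Hypothetical helper for illustration only.
fun applyMask(payload: ByteArray, maskKey: ByteArray): ByteArray {
    require(maskKey.size == 4) { "A WebSocket masking key is exactly 4 bytes" }
    return ByteArray(payload.size) { i ->
        (payload[i].toInt() xor maskKey[i % 4].toInt()).toByte()
    }
}

fun main() {
    val key = Random.nextBytes(4) // a fresh random key for this frame
    val original = "YOU SAID: hello".toByteArray(Charsets.UTF_8)
    val masked = applyMask(original, key)
    val unmasked = applyMask(masked, key) // applying the mask again restores the payload
    println(unmasked.toString(Charsets.UTF_8)) // prints YOU SAID: hello
}
```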

If you need information about the connection, for example the client IP, you have access to the call property. So you can do things like call.request.origin.remoteHost inside your webSocket block.

The Frame interface

A frame is a single packet sent or received at the WebSocket protocol level. There are two message types: TEXT and BINARY, and three control packets: CLOSE, PING, and PONG. Each packet has a payload buffer. For Text or Close messages, you can call readText or readReason to interpret that buffer.

  enum class FrameType { TEXT, BINARY, CLOSE, PING, PONG }

  sealed class Frame {
      val fin: Boolean // Is this frame a final frame?
      val frameType: FrameType // The type of the frame
      val buffer: ByteBuffer // Payload
      val disposableHandle: DisposableHandle

      class Binary : Frame
      class Text : Frame {
          fun readText(): String
      }
      class Close : Frame {
          fun readReason(): CloseReason?
      }
      class Ping : Frame
      class Pong : Frame
  }
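As background for readReason: at the protocol level (RFC 6455), the payload of a close frame is a 2-byte big-endian status code optionally followed by a UTF-8 reason text. The sketch below shows that layout in plain Kotlin. SimpleCloseReason, encodeClosePayload and decodeClosePayload are hypothetical names introduced here for illustration; they only approximate what a method like readReason has to do and are not Ktor's code.

```kotlin
import java.nio.ByteBuffer

// Hypothetical stand-in for a close reason, for illustration only.
data class SimpleCloseReason(val code: Short, val message: String)

// Encodes a close payload: 2-byte big-endian status code + UTF-8 reason text.
fun encodeClosePayload(reason: SimpleCloseReason): ByteBuffer {
    val text = reason.message.toByteArray(Charsets.UTF_8)
    val buffer = ByteBuffer.allocate(2 + text.size) // big-endian by default
    buffer.putShort(reason.code)
    buffer.put(text)
    buffer.flip() // switch from writing to reading
    return buffer
}

// Decodes a close payload; returns null when it is too short to carry a status code.
fun decodeClosePayload(payload: ByteBuffer): SimpleCloseReason? {
    val view = payload.duplicate() // leave the caller's position untouched
    if (view.remaining() < 2) return null
    val code = view.getShort()
    val text = ByteArray(view.remaining())
    view.get(text)
    return SimpleCloseReason(code, text.toString(Charsets.UTF_8))
}

fun main() {
    // 1000 is the "normal closure" status code
    val payload = encodeClosePayload(SimpleCloseReason(1000, "Client said BYE"))
    println(decodeClosePayload(payload))
}
```

Reading through a duplicate of the buffer is a deliberate choice in this sketch: it lets the same payload be inspected more than once without resetting its position.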

Testing

You can test WebSocket conversations by using the handleWebSocketConversation method inside a withTestApplication block.

test.kt

  class MyAppTest {
      @Test
      fun testConversation() {
          withTestApplication {
              application.install(WebSockets)

              val received = arrayListOf<String>()
              application.routing {
                  webSocket("/echo") {
                      try {
                          while (true) {
                              val text = (incoming.receive() as Frame.Text).readText()
                              received += text
                              outgoing.send(Frame.Text(text))
                          }
                      } catch (e: ClosedReceiveChannelException) {
                          // Do nothing!
                      } catch (e: Throwable) {
                          e.printStackTrace()
                      }
                  }
              }

              handleWebSocketConversation("/echo") { incoming, outgoing ->
                  val textMessages = listOf("HELLO", "WORLD")
                  for (msg in textMessages) {
                      outgoing.send(Frame.Text(msg))
                      assertEquals(msg, (incoming.receive() as Frame.Text).readText())
                  }
                  assertEquals(textMessages, received)
              }
          }
      }
  }

FAQ

Standard Events: onConnect, onMessage, onClose and onError

How do the standard events from the WebSocket API map to Ktor?

  • onConnect happens at the start of the block.
  • onMessage happens after successfully reading a message (for example with incoming.receive()) or using suspended iteration with for(frame in incoming).
  • onClose happens when the incoming channel is closed. That would complete the suspended iteration, or throw a ClosedReceiveChannelException when trying to receive a message.
  • onError is equivalent to any other exception.

In both onClose and onError, the closeReason property is set.

To illustrate this:

  webSocket("/echo") {
      println("onConnect")
      try {
          while (true) {
              // receive() throws ClosedReceiveChannelException once the incoming channel is closed
              val text = (incoming.receive() as Frame.Text).readText()
              println("onMessage")
              received += text // `received` is a list declared outside this handler
              outgoing.send(Frame.Text(text))
          }
      } catch (e: ClosedReceiveChannelException) {
          println("onClose ${closeReason.await()}")
      } catch (e: Throwable) {
          println("onError ${closeReason.await()}")
          e.printStackTrace()
      }
  }

In this sample, the infinite loop is only exited when an exception is raised: either a ClosedReceiveChannelException or another exception.