Unified Async/Websocket API

This page describes the async and WebSocket extension for real-time applications. To use the latest release:

[http-kit "2.3.0"] ; Add to your project.clj

(:use org.httpkit.server) ; import namespace

The API is still a work in progress, but it is fully functional. If you are interested, use the GitHub issues page for suggestions, bug reports, or general discussion. For more detail, refer to server.clj, which has fairly clear docstrings.

This API is not compatible with `2.0.0-rc4`.

Motivation

Why a new API

  • Simpler and more consistent than the previous one
  • New feature: HTTP streaming
  • Unifies the API for HTTP (streaming or long-polling) and WebSocket, allowing graceful degradation to long-polling/streaming when WebSocket is not available in the client
  • The server is also notified when the client closes the connection or goes away during long-polling/streaming

Channel Protocol

Unified asynchronous channel interface for HTTP (streaming or long-polling) and WebSocket.

Channel defines the following contract:
  • open?: Returns true iff channel is open.
  • close: Closes the channel. Idempotent: returns true if the channel was actually closed, or false if it was already closed.
  • websocket?: Returns true iff channel is a WebSocket.
  • send!: Sends data to client and returns true if the data was successfully sent, or false if the channel is closed. Data is sent directly to the client, NO RING MIDDLEWARE IS APPLIED. Data form: {:headers _ :status _ :body _} or just body. Note that :headers and :status will be stripped for WebSocket and for HTTP streaming responses after the first.
  • on-receive: Sets handler (fn [message-string]) for notification of client WebSocket messages. Message ordering is guaranteed by the server.
  • on-close: Sets handler (fn [status]) for notification of the channel being closed by the server or client. Handler will be invoked at most once. Useful for clean-up.
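
As a minimal sketch of the contract above (not taken from the http-kit sources), the handler below calls send!, close, and open? on a plain HTTP channel and prints what each one returns. The namespace name example.contract and the keys in the printed map are assumptions made for illustration.

```clojure
(ns example.contract
  (:require [org.httpkit.server :refer [with-channel send! close open? on-close]]))

(defn handler [req]
  (with-channel req channel
    ;; Invoked at most once, whether the server or the client closes the channel.
    (on-close channel (fn [status] (println "on-close:" status)))
    (let [sent?         (send! channel "first chunk\n" false) ; false => keep the channel open
          closed?       (close channel)                       ; first close
          closed-again? (close channel)                       ; second close on the same channel
          sent-late?    (send! channel "too late")]           ; send after the channel is closed
      ;; Print the return values so they can be compared with the contract.
      (println {:sent?         sent?
                :closed?       closed?
                :closed-again? closed-again?
                :sent-late?    sent-late?
                :open?         (open? channel)}))))
```

If the contract holds as described, one would expect :sent? and :closed? to be true and the remaining three values to be false.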

Get the channel

A with-channel macro is provided. For example:

(defn handler [req]
  (with-channel req channel ;; get the channel,
    ;; channel can be HTTP(long-polling, streaming) or Websocket
    ;; manipulate them with a unified API
    ))

Long polling example

Long polling is very much like streaming:

(defn handler [request]
  (with-channel request channel
    (on-some-event ;; wait for event
     (send! channel {:status 200
                     :headers {"Content-Type" "application/json; charset=utf-8"}
                     :body "hello, polling"}
            true))))  ; send! and close. Passing true is just being explicit: it is the default for streaming/polling.

(run-server handler {:port 9090})
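
The on-some-event form above is a placeholder. As a hedged, runnable variant, the sketch below uses the schedule-task macro from org.httpkit.timer as a stand-in event that fires after one second. The namespace name, the one-second delay, and the JSON body are assumptions chosen for illustration.

```clojure
(ns example.long-polling
  (:require [org.httpkit.server :refer [run-server with-channel send!]]
            [org.httpkit.timer :refer [schedule-task]]))

(defn handler [request]
  (with-channel request channel
    ;; Stand-in for on-some-event: answer the pending request after 1000 ms.
    (schedule-task 1000
      ;; No close flag is passed, so the default applies and the
      ;; channel is closed after this single response.
      (send! channel {:status  200
                      :headers {"Content-Type" "application/json; charset=utf-8"}
                      :body    "{\"msg\":\"hello, polling\"}"}))))

(defn -main [& _]
  ;; Same port as the example above.
  (run-server handler {:port 9090}))
```

A client would then re-issue the request after each response to keep polling.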

Streaming example

;; schedule-task is a macro from the org.httpkit.timer namespace
(defn handler [request]
  (with-channel request channel
    (on-close channel (fn [status] (println "closed by server or client")))
    (loop [id 0]
      (when (< id 10)
        (schedule-task (* id 200) ;; send a message every 200ms
                       (send! channel (str "message from server #" id) false)) ; false => don't close after send
        (recur (inc id))))
    (schedule-task 10000 (close channel)))) ;; close in 10s.

;;; open your browser at http://127.0.0.1:9090, a new message shows up every 200ms
(run-server handler {:port 9090})

Websocket example

(defn handler [request]
  (with-channel request channel
    (on-close channel (fn [status] (println "client closed it" status)))
    (on-receive channel (fn [data] ;; echo it back
                          (send! channel data)))))

(run-server handler {:port 9090})
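
Because HTTP and WebSocket channels share one API, a single handler can serve both kinds of client. The sketch below, written under that assumption, branches on websocket? and falls back to a one-shot HTTP response when the request is not a WebSocket handshake. The namespace name and the fallback message are illustrative, not part of http-kit.

```clojure
(ns example.unified
  (:require [org.httpkit.server :refer [run-server with-channel send!
                                        on-close on-receive websocket?]]))

(defn handler [request]
  (with-channel request channel
    (on-close channel (fn [status] (println "channel closed:" status)))
    (if (websocket? channel)
      ;; WebSocket client: echo every message back.
      (on-receive channel (fn [data] (send! channel data)))
      ;; Plain HTTP client: send a single response, then close (the default).
      (send! channel {:status  200
                      :headers {"Content-Type" "text/plain; charset=utf-8"}
                      :body    "WebSocket not available, using long polling"}))))

(defn -main [& _]
  (run-server handler {:port 9090}))
```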

A utility: ChannelStore

I find that a utility like ChannelStore (or ChannelGroup) is quite helpful when using the API:

(defprotocol ChannelStore
  (add-to-group [store group ch attach])
  (remove-from-group [store group ch])
  (broadcast [store group fn]))

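;;; Implementation using an atom.
;;; json-str is assumed to come from a JSON library (for example, older versions of clojure.data.json).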
(deftype MemoryChannelStore [store-map]
  ChannelStore
  (add-to-group [_ group ch attach]
    (swap! store-map (fn [old-store]
                       (assoc old-store
                         group
                         (assoc (or (old-store group) {})
                           ch attach)))))
  (remove-from-group [_ group ch]
    (swap! store-map (fn [old-store]
                       (let [m (update-in old-store [group] dissoc ch)]
                         (if (empty? (m group))
                           (dissoc m group)
                           m)))))
  (broadcast [store group f]
    (doseq [[ch attach] (@store-map group)]
      (if (websocket? ch)
        ;; pass the attachment (some per-channel state) back to f
        (send! ch (json-str (f ch attach)))
        (send! ch {:status 200
                   :headers {"Content-Type"
                             "application/json; charset=utf-8"}
                   :body (json-str (f ch attach))}
               true)))))                ; close it

;;; example usage
(defonce channel-store (MemoryChannelStore. (atom {})))
(defn handler [req]
  (let [group (-> req :params :group)
        since_ts (-> req :params :since_ts)] ;; state variable, from client
    (with-channel req ch
      (add-to-group channel-store group ch since_ts)
      (on-close ch (fn [status]
                     (remove-from-group channel-store group ch))))))

;;; some logic elsewhere
(on-some-event
 (broadcast channel-store a-group
            (fn [ch since_ts] ; since_ts is passed back.
              (some-data-return-to-clients since_ts))))
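
To make the shape of the store concrete, the sketch below applies the same add and remove logic to a plain map, with keywords such as :ch1 standing in for real channels. The helper names add-channel and remove-channel and the group names are hypothetical and exist only for this illustration.

```clojure
(ns example.store-shape)

;; The store is a nested map: {group {channel attachment}}.
(defn add-channel [store group ch attach]
  (assoc-in store [group ch] attach))

;; Remove a channel, dropping the group entirely once it is empty.
(defn remove-channel [store group ch]
  (let [m (update-in store [group] dissoc ch)]
    (if (empty? (m group))
      (dissoc m group)
      m)))

(def store
  (-> {}
      (add-channel "room-a" :ch1 1000)
      (add-channel "room-a" :ch2 2000)
      (add-channel "room-b" :ch3 3000)))
;; store is {"room-a" {:ch1 1000, :ch2 2000}, "room-b" {:ch3 3000}}

(remove-channel store "room-b" :ch3)
;; => {"room-a" {:ch1 1000, :ch2 2000}}
```

In MemoryChannelStore the same two transformations run inside swap!, so concurrent handlers update the atom safely.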
  

If this proves useful, http-kit can provide a built-in, more efficient implementation.