This page describes the async and WebSocket extension for real-time applications. To use the latest release:
[http-kit "2.3.0"] ; Add to your project.clj
(:use org.httpkit.server) ; import namespace
The API is still a work in progress, but it is fully functional. If you are interested, use the GitHub issues page for suggestions, bug reports, or general discussion. Please refer to server.clj for more detail; its docstrings are fairly clear.
This API is not compatible with `2.0.0-rc4`.
Why a new API
Unified asynchronous channel interface for HTTP (streaming or long-polling) and WebSocket.
Channel defines the following contract:
open?
: Returns true iff channel is open.
close
: Closes the channel. Idempotent: returns true if the channel was actually closed, or false if it was already closed.
websocket?
: Returns true iff channel is a WebSocket.
send!
: Sends data to the client and returns true if the data was successfully sent, or false if the channel is closed. Data is sent directly to the client; NO RING MIDDLEWARE IS APPLIED. Data form: {:headers _ :status _ :body _} or just body. Note that :headers and :status will be stripped for WebSocket and for HTTP streaming responses after the first.
on-receive
: Sets handler (fn [message-string]) for notification of client WebSocket messages. Message ordering is guaranteed by the server.
on-close
: Sets handler (fn [status]) for notification of the channel being closed by the server or client. The handler will be invoked at most once. Useful for clean-up.
A macro, `with-channel`, is provided. Example:
(defn handler [req]
  (with-channel req channel ; get the channel
    ;; channel can be HTTP (long-polling, streaming) or WebSocket;
    ;; manipulate them with a unified API
    ))
Long polling is very much like streaming:
;; Long polling: respond once when the event fires, then close.
;; on-some-event is a placeholder for your own event mechanism.
(defn handler [request]
  (with-channel request channel
    (on-some-event ; wait for the event
      (send! channel {:status 200
                      :headers {"Content-Type" "application/json; charset=utf-8"}
                      :body "hello, polling"}
             ;; send! and close; true is already the default for
             ;; streaming/polling, this just makes it explicit
             true))))

(run-server handler {:port 9090})
;; HTTP streaming: send ten messages, 200ms apart, then close.
;; schedule-task is assumed to be referred from org.httpkit.timer.
(defn handler [request]
  (with-channel request channel
    (on-close channel (fn [status] (println "closed by server or client")))
    (loop [id 0]
      (when (< id 10)
        (schedule-task (* id 200) ; send a message every 200ms
          (send! channel (str "message from server #" id) false)) ; false => don't close after send
        (recur (inc id))))
    (schedule-task 10000 (close channel)))) ; close in 10s

;;; open your browser at http://127.0.0.1:9090; a new message shows up every 200ms
(run-server handler {:port 9090})
;; WebSocket: echo every message back to the client.
(defn handler [request]
  (with-channel request channel
    (on-close channel (fn [status] (println "client closed it" status)))
    (on-receive channel (fn [data] ; echo it back
                          (send! channel data)))))

(run-server handler {:port 9090})
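Because one handler receives both plain HTTP and WebSocket requests, `websocket?` can be used to branch on the channel type. The following is only a sketch, assuming you want a WebSocket-only endpoint: the namespace name `example.ws-only`, the 400 status, and the message text are illustrative choices, not something http-kit prescribes.

```clojure
(ns example.ws-only
  (:require [org.httpkit.server
             :refer [with-channel websocket? on-receive send! run-server]]))

(defn handler [request]
  (with-channel request channel
    (if (websocket? channel)
      ;; WebSocket client: echo each message back
      (on-receive channel (fn [data] (send! channel data)))
      ;; Plain HTTP client: reply once; send! closes an HTTP channel by default
      (send! channel {:status 400
                      :headers {"Content-Type" "text/plain; charset=utf-8"}
                      :body "WebSocket connections only"}))))

(comment
  ;; evaluate at the REPL to start the server
  (run-server handler {:port 9090}))
```

The `run-server` call sits inside `comment` so that loading the namespace does not start a server as a side effect.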
I find that a utility like ChannelStore (or ChannelGroup) is quite helpful when using the API:
(defprotocol ChannelStore
  (add-to-group [store group ch attach])
  (remove-from-group [store group ch])
  (broadcast [store group f]))

;;; implementation using an atom
;;; json-str stands for any JSON-encoding function; it is not part of http-kit
(deftype MemoryChannelStore [store-map]
  ChannelStore
  (add-to-group [_ group ch attach]
    ;; store-map has the shape {group {channel attachment}}
    (swap! store-map assoc-in [group ch] attach))
  (remove-from-group [_ group ch]
    (swap! store-map (fn [old-store]
                       (let [m (update-in old-store [group] dissoc ch)]
                         ;; drop the group entirely once its last channel is gone
                         (if (empty? (m group))
                           (dissoc m group)
                           m)))))
  (broadcast [store group f]
    (doseq [[ch attach] (@store-map group)]
      (if (websocket? ch)
        ;; give back the attachment (some state to pass back)
        (send! ch (json-str (f ch attach)))
        (send! ch {:status 200
                   :headers {"Content-Type"
                             "application/json; charset=utf-8"}
                   :body (json-str (f ch attach))}
               true))))) ; close it
;;; example usage
(defonce channel-store (MemoryChannelStore. (atom {})))

(defn handler [req]
  (let [group    (-> req :params :group)
        since_ts (-> req :params :since_ts)] ; state variable, from the client
    (with-channel req ch
      (add-to-group channel-store group ch since_ts)
      (on-close ch (fn [status]
                     (remove-from-group channel-store group ch))))))
;;; some logic elsewhere
(on-some-event
  (broadcast channel-store a-group
             (fn [ch since_ts] ; since_ts is passed back
               (some-data-return-to-clients since_ts))))
If this proves useful, http-kit can provide a more efficient built-in implementation.
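The group bookkeeping inside MemoryChannelStore can be tried at the REPL without a running server. The following is a minimal sketch, assuming two hypothetical pure functions, `add-channel` and `remove-channel` (neither is part of http-kit), that operate on a plain map of the same `{group {channel attachment}}` shape. Keywords stand in for real channels.

```clojure
;; Hypothetical pure helpers mirroring the store's swap! functions.
(defn add-channel
  "Returns store with ch registered under group, carrying attach."
  [store group ch attach]
  (assoc-in store [group ch] attach))

(defn remove-channel
  "Returns store without ch; drops the group once it is empty."
  [store group ch]
  (let [m (update-in store [group] dissoc ch)]
    (if (empty? (get m group))
      (dissoc m group)
      m)))

;; Keywords stand in for real channels here.
(def store
  (-> {}
      (add-channel "room-1" :ch-a 100)
      (add-channel "room-1" :ch-b 200)))

(remove-channel store "room-1" :ch-a)
;; => {"room-1" {:ch-b 200}}

(-> store
    (remove-channel "room-1" :ch-a)
    (remove-channel "room-1" :ch-b))
;; => {}
```

Keeping these as pure functions means they can be passed straight to `swap!`, for example `(swap! store-map remove-channel group ch)`.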