Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Publishing arbitrary messages to a stream, with netty bugfix At this point we hit a netty-only bug in our decoder where we didn't properly handle what happens when netty doesn't offer enough bytes. So we were forced to learn how to use the ReplayingDecoder. Really only possible because we tested with keeping the client open. |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA3-256: |
c9198979db9436c059a43b7ecfe93e92 |
User & Date: | scstarkey 2025-02-12 13:52:42 |
Context
2025-02-12
| ||
14:01 | Fixed linting error from not running my own process check-in: 4e2f46ed71 user: scstarkey tags: trunk | |
13:52 | Publishing arbitrary messages to a stream, with netty bugfix At this point we hit a netty-only bug in our decoder where we didn't properly handle what happens when netty doesn't offer enough bytes. So we were forced to learn how to use the ReplayingDecoder. Really only possible because we tested with keeping the client open. check-in: c9198979db user: scstarkey tags: trunk | |
2025-02-11
| ||
11:52 | A bit cleaner check-in: b1bff87b2b user: scstarkey tags: trunk | |
Changes
Changes to src/streamful/client.clj.
︙ | ︙ | |||
197 198 199 200 201 202 203 | (send-cmd! {:c "reg" :m {:id id}} private-key))) (configure-stream [_ {:keys [id] :as params}] (let [{:keys [private-key]} @session] (send-cmd! {:c "cfg" :m {:id id, :params (dissoc params :id)}} private-key))) | | > > > | | 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 | (send-cmd! {:c "reg" :m {:id id}} private-key))) (configure-stream [_ {:keys [id] :as params}] (let [{:keys [private-key]} @session] (send-cmd! {:c "cfg" :m {:id id, :params (dissoc params :id)}} private-key))) (submit-message [_ {:keys [id] :as params}] (let [{:keys [private-key]} @session] (send-cmd! {:c "put" :m {:id id, :params (dissoc params :id)}} private-key))) (close [_] (close!)) (is-open? [_] (is-open?))))) |
Changes to src/streamful/core.clj.
︙ | ︙ | |||
62 63 64 65 66 67 68 | (server/start-netty-server! @senv/port handler server-started server-stopped :timeout @senv/timeout :accept-timeout @senv/accept-timeout | | > | 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | (server/start-netty-server! @senv/port handler server-started server-stopped :timeout @senv/timeout :accept-timeout @senv/accept-timeout :file-buffer-bytes @senv/file-buffer-bytes :file-channel-chunk-size @senv/file-channel-chunk-size)) (defn- start-socket! [handler server-started server-stopped] (server/start-socket-server! @senv/port handler server-started server-stopped |
︙ | ︙ | |||
92 93 94 95 96 97 98 | (stream/register-stream! db-stream-model pk msg original-msg-bytes)) (configure-stream! [_, pk, msg, original-msg-bytes] (stream/configure-stream! db-stream-model pk msg original-msg-bytes)) (get-messages! [_, pk, {{:strs [id]} "m" :as msg}, original-msg-bytes] (if (= "COPYRIGHT" id) {:status :ok :messages [{"m" copyright-text}]} | | > > | 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | (stream/register-stream! db-stream-model pk msg original-msg-bytes)) (configure-stream! [_, pk, msg, original-msg-bytes] (stream/configure-stream! db-stream-model pk msg original-msg-bytes)) (get-messages! [_, pk, {{:strs [id]} "m" :as msg}, original-msg-bytes] (if (= "COPYRIGHT" id) {:status :ok :messages [{"m" copyright-text}]} (stream/get-messages! db-stream-model pk msg original-msg-bytes))) (put-message! [_, pk, msg, original-msg-bytes] (stream/put-message! db-stream-model pk msg original-msg-bytes)))) (defn build-protocol-handler [db-loc session-timeout] (let [db-stream-model (sdl/build-stream-model db-loc) stream-model (wrap-stream-model db-stream-model)] (proto/protocol-handler {:stream-model stream-model :session-timeout (-> session-timeout |
︙ | ︙ |
Changes to src/streamful/env.clj.
︙ | ︙ | |||
79 80 81 82 83 84 85 86 87 88 89 90 91 92 | :tfn parse-long) (defenv file-buffer-bytes :doc "At what size do we cache bytes to a file? (netty-only, bytes)" :default (str 1048576) :masked? false :tfn parse-long) (defenv protocol :doc "Server protocol. Allowed modes: streamful, echo" :default "streamful" :masked? false :tfn keyword) | > > > > > > | 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | :tfn parse-long) (defenv file-buffer-bytes :doc "At what size do we cache bytes to a file? (netty-only, bytes)" :default (str 1048576) :masked? false :tfn parse-long) (defenv file-channel-chunk-size :doc "Bytes to read into memory at a time with file buffer (netty-only)" :default (str 1048576) :masked? false :tfn parse-long) (defenv protocol :doc "Server protocol. Allowed modes: streamful, echo" :default "streamful" :masked? false :tfn keyword) |
︙ | ︙ |
Changes to src/streamful/protocol.clj.
︙ | ︙ | |||
44 45 46 47 48 49 50 | (do (log/warn "Failed to register stream" result) "failed")))) (defn configure-stream! [pk stream-model msg original] (let [result (stream/configure-stream! stream-model pk msg original)] (case result :ok "ok" | | > > > > > > > > | 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | (do (log/warn "Failed to register stream" result) "failed")))) (defn configure-stream! [pk stream-model msg original] (let [result (stream/configure-stream! stream-model pk msg original)] (case result :ok "ok" :unauthorized "unauthorized" (do (log/warn "Failed to configure stream" result) "failed")))) (defn get-messages [pk stream-model cmd original] (let [{:keys [status messages] :as result} (stream/get-messages! stream-model pk cmd original)] (case status :ok {"status" "ok", "messages" messages} :stream-missing {"status" "stream not found"} (do (log/warn "Failed to get messages" result) "failed")))) (defn- put-message! [pk stream-model cmd original] (let [result (stream/put-message! stream-model pk cmd original)] (case result :ok "ok" :unauthorized "unauthorized" (do (log/warn "Failed to configure stream" result) "failed")))) (defn- handle-message ([signed? pk req msg] (try (let [original (.readAllBytes msg) msg (if signed? (cr/verified original pk) original)] (->> (cbor/decode msg) |
︙ | ︙ | |||
80 81 82 83 84 85 86 87 88 89 90 91 92 93 | (throw e))))) ([{:keys [stream-model pk original] {:keys [remote-ip session-id]} :req} {:strs [c m] :as cmd}] (case c "ping" m "get" (get-messages pk stream-model cmd original) "reg" (register-stream! pk stream-model cmd original) "cfg" (configure-stream! pk stream-model cmd original) (throw (ex-info "Unknown command" {:cmd cmd :session-id session-id :remote-ip remote-ip}))))) | > | 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | (throw e))))) ([{:keys [stream-model pk original] {:keys [remote-ip session-id]} :req} {:strs [c m] :as cmd}] (case c "ping" m "get" (get-messages pk stream-model cmd original) "put" (put-message! pk stream-model cmd original) "reg" (register-stream! pk stream-model cmd original) "cfg" (configure-stream! pk stream-model cmd original) (throw (ex-info "Unknown command" {:cmd cmd :session-id session-id :remote-ip remote-ip}))))) |
︙ | ︙ |
Changes to src/streamful/server.clj.
︙ | ︙ | |||
19 20 21 22 23 24 25 | (ns streamful.server (:require [clojure.java.io :as io] [clojure.string :as str] [streamful.crypto :as cr] [streamful.io :as sio] [streamful.temp :as temp] [taoensso.timbre :as log]) | > > | | | 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | (ns streamful.server (:require [clojure.java.io :as io] [clojure.string :as str] [streamful.crypto :as cr] [streamful.io :as sio] [streamful.temp :as temp] [taoensso.timbre :as log]) (:import (io.netty.handler.codec ReplayingDecoder) (io.netty.util Signal) (java.io ByteArrayInputStream ByteArrayOutputStream File FileInputStream FileOutputStream InputStream OutputStream RandomAccessFile) (java.net ServerSocket Socket SocketTimeoutException) (java.nio ByteBuffer ByteOrder CharBuffer) (java.nio.channels FileChannel FileChannel$MapMode) (java.util Scanner) (java.util.concurrent Executors ThreadFactory))) (log/set-ns-min-level! #"io.netty.*" :info) (import (io.netty.bootstrap ServerBootstrap) (io.netty.buffer ByteBuf ByteBufAllocator ByteBufUtil CompositeByteBuf Unpooled) (io.netty.channel ChannelHandlerContext ChannelInitializer ChannelOption SimpleChannelInboundHandler) (io.netty.channel.nio NioEventLoopGroup) (io.netty.channel.socket.nio NioServerSocketChannel) (io.netty.handler.codec ByteToMessageDecoder MessageToMessageEncoder) (io.netty.handler.timeout ReadTimeoutHandler)) ;;;;;;;;;;;;;; ;;; COMMON ;;; ;;;;;;;;;;;;;; |
︙ | ︙ | |||
255 256 257 258 259 260 261 | (fn [] (log/info "Stopping server") (reset! running false)))) ;;;;;;;;;;;;;;;;;;;; ;;; NETTY SERVER ;;; ;;;;;;;;;;;;;;;;;;;; | < | | | | | | < < < < | > > | < < | | > > > > | < > | | | | > | | > > > | > | > | 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 | (fn [] (log/info "Stopping server") (reset! running false)))) ;;;;;;;;;;;;;;;;;;;; ;;; NETTY SERVER ;;; ;;;;;;;;;;;;;;;;;;;; (defn- read-line-from-buf! [^ByteBuf msg] (let [out (ByteArrayOutputStream.)] (loop [] (let [b (.readByte msg)] (if (= 10 b) (.toString out cr/encoding) (do (.write out b) (recur))))))) (defn- add-to-composite! [^CompositeByteBuf composite, ^ByteBuf buf] (.addComponent composite buf) (let [prev-idx (.writerIndex composite) addition (.readableBytes buf)] (.writerIndex composite (+ prev-idx addition)))) (defn- buf->temp-file-stream [^long file-channel-chunk-size ^long bytes-to-read, ^ByteBuf msg] (let [^File f (:temp-file (temp/temp-file! ".bin")) ^bytes buf (byte-array file-channel-chunk-size)] (with-open [fo (RandomAccessFile. f "rw") ^FileChannel fc (.getChannel fo)] (loop [bytes-left bytes-to-read] (when (pos? bytes-left) (let [this-chunk-size (min bytes-left file-channel-chunk-size)] (.readBytes msg buf 0 this-chunk-size) (.write fc (ByteBuffer/wrap buf 0 this-chunk-size)) (recur (- bytes-left this-chunk-size)))))) (FileInputStream. f))) (defn- buf->byte-stream [^long bytes-to-read, ^ByteBuf msg] (let [^bytes arr (byte-array bytes-to-read)] (.readBytes msg arr 0 bytes-to-read) (ByteArrayInputStream. arr))) (defn- decoder [{:keys [session-id current-mode bytes-to-read file-buffer-bytes file-channel-chunk-size]}] (proxy [ReplayingDecoder] [] (decode [^ChannelHandlerContext ctx, ^ByteBuf msg, out] (try (log/trace session-id "decode" @current-mode @bytes-to-read) (let [read-result (case @current-mode :cmd (read-line-from-buf! msg) :bytes (if (> @bytes-to-read file-buffer-bytes) (buf->temp-file-stream file-channel-chunk-size @bytes-to-read msg) (buf->byte-stream @bytes-to-read msg)))] (when read-result (log/trace session-id "decoded" (loggable-msg read-result)) (proxy-super checkpoint) (.add out read-result))) (catch Signal s (throw s)) (catch Throwable t (log/error t) (.close ctx)))))) (defn- stream->temp->buf! [^long num-bytes, ^InputStream in] (let [^File temp-file (:temp-file (temp/temp-file! ".bin"))] (with-open [file-out (FileOutputStream. temp-file)] |
︙ | ︙ | |||
410 411 412 413 414 415 416 | @msg-count new-mode response) (when (instance? InputStream msg) (.close msg)) (if out (.writeAndFlush ctx out) (.close ctx)))) (exceptionCaught [_ctx t] (log/error t))))) | | > | > > > > > | > | > > | | | | | > | | | | 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 | @msg-count new-mode response) (when (instance? InputStream msg) (.close msg)) (if out (.writeAndFlush ctx out) (.close ctx)))) (exceptionCaught [_ctx t] (log/error t))))) (defn- channel-initializer [handler timeout file-buffer-bytes file-channel-chunk-size] (proxy [ChannelInitializer] [] (initChannel [socketChannel] (let [session-id (cr/guid) remote-ip (.remoteAddress socketChannel) current-mode (ref :cmd) bytes-to-read (ref nil) msg-count (ref 0) props {:session-id session-id :remote-ip remote-ip :current-mode current-mode :bytes-to-read bytes-to-read :handler handler :timeout timeout :msg-count msg-count :file-buffer-bytes file-buffer-bytes :file-channel-chunk-size file-channel-chunk-size}] (log/trace session-id "initChannel") (-> socketChannel .pipeline (.addLast "readTimeoutHandler" (ReadTimeoutHandler. (int timeout))) (.addLast "encoder" (encoder props)) (.addLast "decoder" (decoder props)) (.addLast "mainHandler" (channel-handler props))))))) (defn start-netty-server! "The `handler` that you pass must follow the contract specified in `handle-client-msg!` `timeout` is the time to wait for bytes from a client `accept-timeout` is the time to wait for a socket connection `file-buffer-bytes` is the size an incoming message must be to stream to a temp file `file-channel-chunk-size` is the size of a single chunk to read into memory before writing to the aforementioned file buffer See the `streamful.server.server-test` for more." [^Integer port, handler, on-start, on-exit, & {:keys [^Integer timeout ^Integer accept-timeout, ^Long file-buffer-bytes ^Long file-channel-chunk-size] :or {timeout 60000 accept-timeout 5000 file-buffer-bytes 1024 file-channel-chunk-size 1024}}] (let [server-group (NioEventLoopGroup.) client-group (NioEventLoopGroup.) server (-> (ServerBootstrap.) (.group server-group client-group) (.channel NioServerSocketChannel) (.childHandler (channel-initializer handler timeout file-buffer-bytes file-channel-chunk-size)) (.childOption ChannelOption/SO_KEEPALIVE true) (.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int accept-timeout))) f (-> server (.bind port) .sync)] (when (.isSuccess f) (on-start)) (fn [] (log/info "Stopping server") (try (.shutdownGracefully server-group) (.shutdownGracefully client-group) (-> f .channel .closeFuture .sync) (finally (on-exit)))))) |
Changes to src/streamful/stream.clj.
1 2 3 4 5 6 7 | (ns streamful.stream) (defprotocol StreamModel (register-stream! [_, ^String pk, msg, ^bytes original-msg-bytes]) (configure-stream! [_, ^String pk, msg, ^bytes original-msg-bytes]) (get-messages! [_, ^String pk, msg, ^bytes original-msg-bytes] "Get messages from a stream. May be part of a signed request. | | | > > | 1 2 3 4 5 6 7 8 9 10 11 | (ns streamful.stream) (defprotocol StreamModel (register-stream! [_, ^String pk, msg, ^bytes original-msg-bytes]) (configure-stream! [_, ^String pk, msg, ^bytes original-msg-bytes]) (get-messages! [_, ^String pk, msg, ^bytes original-msg-bytes] "Get messages from a stream. May be part of a signed request. May cause state to change in the backend as we keep track of who has subscribed to our stream.") (put-message! [_, ^String pk, msg, ^bytes original-msg-bytes] "Put a message into a stream. Must be part of a signed request.")) |
Changes to src/streamful/stream_datalevin.clj.
︙ | ︙ | |||
8 9 10 11 12 13 14 | (def ^:private stream-agg-table "stream-aggregates") (def ^:private stream-data-table "stream-data") (defn- get-db-manifest [db] (db/get-value db stream-agg-table :manifest)) (defn- streamk [iid t] (str iid "|" t)) | | | | > > > | | | | | | | > > > > > > > > > > > > > > > > > > > > | > > | 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | (def ^:private stream-agg-table "stream-aggregates") (def ^:private stream-data-table "stream-data") (defn- get-db-manifest [db] (db/get-value db stream-agg-table :manifest)) (defn- streamk [iid t] (str iid "|" t)) (defn- stream-msg [iid msg client-psk original-msg-bytes] (let [t (System/currentTimeMillis)] [:put stream-data-table (streamk iid t) {"m" msg, "t" t, "psk" client-psk, "o" original-msg-bytes} :string :data])) ;; todo allow substreams of substreams ;; for now all substreams are implicitly of the root stream (defn- register-stream! [db client-psk {{:strs [id]} "m" :as msg} original-msg-bytes] (try (let [manifest (get-db-manifest db) {{:strs [substreams-allowed?]} "c" :as root-stream} (and manifest (manifest "root")) existing-stream (and manifest (manifest id)) is-root? (= id "root") root-exists? (not (nil? root-stream))] (cond (or (and substreams-allowed? (not existing-stream)) (and is-root? (not root-exists?))) (let [internal-id (cr/guid) smsg (stream-msg internal-id msg client-psk original-msg-bytes) first-msg-time (-> smsg (nth 3) :t)] (db/transact-kv db [[:put stream-agg-table :manifest (merge manifest {id {"psk" client-psk "iid" internal-id "f" first-msg-time}})] smsg]) :ok) (not root-exists?) :root-stream-missing existing-stream :already-exists :else :substreams-not-allowed)) (catch Throwable t (log/error t) :database-error))) (defn- configure-stream! [db client-psk {{:strs [id params]} "m" :as msg} original-msg-bytes] (try (let [manifest (get-db-manifest db) {:strs [iid cfg] :as existing-stream} (and manifest (manifest id))] (cond (and existing-stream (creq/eq? client-psk (existing-stream "psk"))) (do (db/transact-kv db [[:put stream-agg-table :manifest (merge manifest {id (assoc existing-stream "c" (merge cfg params))})] (stream-msg iid msg client-psk original-msg-bytes)]) :ok) (not existing-stream) :stream-missing :else :unauthorized)) (catch Throwable t (log/error t) :database-error))) (defn get-messages! [db client-psk {{:strs [id]} "m" :as msg} _original-msg-bytes] (let [manifest (get-db-manifest db) {:strs [iid f psk]} (and manifest (manifest id)) fk (streamk iid f) lk (streamk iid (System/currentTimeMillis)) r [:closed fk lk]] (if (creq/eq? psk client-psk) {:status :ok :messages (db/get-first-n db stream-data-table 50 r :string :data true)} {:status :stream-missing}))) (defn put-message! [db client-psk {{:strs [id ]} "m" :as msg} original-msg-bytes] (let [manifest (get-db-manifest db) {:strs [iid] :as existing-stream} (and manifest (manifest id))] (cond (and existing-stream (creq/eq? client-psk (existing-stream "psk"))) (do (db/transact-kv db [(stream-msg iid msg client-psk original-msg-bytes)]) :ok) (not existing-stream) :stream-missing :else :unauthorized))) (defn build-stream-model [location] (let [db (db/open-kv location)] (db/open-dbi db stream-agg-table) (db/open-dbi db stream-data-table) (reify stream/StreamModel (register-stream! [_, pk, msg, original-msg-bytes] (register-stream! db pk msg original-msg-bytes)) (configure-stream! [_, pk, msg, original-msg-bytes] (configure-stream! db pk msg original-msg-bytes)) (get-messages! [_, pk, msg, original-msg-bytes] (get-messages! db pk msg original-msg-bytes)) (put-message! [_, pk, msg, original-msg-bytes] (put-message! db pk msg original-msg-bytes))))) |
Changes to test/streamful/protocol_test.clj.
︙ | ︙ | |||
21 22 23 24 25 26 27 | [clojure.test :refer :all] [streamful.client :as client] [streamful.crypto :as cr] [streamful.protocol :refer :all] [streamful.test-asserts :as ta] [streamful.test-cfg :as tcfg])) | | | | > > > | 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | [clojure.test :refer :all] [streamful.client :as client] [streamful.crypto :as cr] [streamful.protocol :refer :all] [streamful.test-asserts :as ta] [streamful.test-cfg :as tcfg])) (defn- reassemble-msg [{:strs [psk o]}] (-> o (cr/verified psk) cbor/decode)) (defn- as [client keys] (is (= {:result "ok"} (client/as-stream client keys)))) (defn- get-as [client keys id] (as client keys) (client/get-messages client {:id id})) (defn- get-m [m] (get m "m")) (tcfg/def-server-test protocol-test [client] (testing "ping" (let [msg-to-echo (cr/guid94)] |
︙ | ︙ | |||
64 65 66 67 68 69 70 | {:public-key public-signing-key :private-key private-signing-key2}))) (is (= {:response "unauthorized"} (client/register-stream client {:id "test1"})))) (testing "first stream must be called 'root'" | | | | > > | | | | < < < < < < < | | | | | | | | | | | > > > > > > | | | | | | | | | | | | | | | | | > > > > | | > > > > > > | | > | | > > > > > > > > > > > > > | 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 | {:public-key public-signing-key :private-key private-signing-key2}))) (is (= {:response "unauthorized"} (client/register-stream client {:id "test1"})))) (testing "first stream must be called 'root'" (as client keys) (is (= {:response "root not yet configured -- register that first"} (client/register-stream client {:id "test1"}))) (is (= {:response "ok"} (client/register-stream client {:id "root"})))) (testing "can't register multiple streams with the same name" (is (= {:response "already exists"} (client/register-stream client {:id "root"})))) (testing "can't register more than one stream" (is (= {:response "substreams aren't allowed here"} (client/register-stream client {:id "test1"})))) (testing "only stream owner can configure it" (as client keys2) (is (= {:response "unauthorized"} (client/configure-stream client {:id "root" :substreams-allowed? true})))) (ta/pending "unsigned configuration messages don't get to the model") (testing "if root allows substreams" (as client keys) (is (= {:response "ok"} (client/configure-stream client {:id "root" :substreams-allowed? true}))) (testing "a substream can be registered" (as client keys2) (is (= {:response "ok"} (client/register-stream client {:id "test1"}))) (testing "but not twice" (as client keys) (is (= {:response "already exists"} (client/register-stream client {:id "test1"})))) (testing "and not after reconfigured off" (is (= {:response "ok"} (client/configure-stream client {:id "root" :substreams-allowed? false}))) (is (= {:response "substreams aren't allowed here"} (client/register-stream client {:id "test2"})))) (testing "but not twice, even after reconfiguring" (is (= {:response "ok"} (client/configure-stream client {:id "root" :substreams-allowed? true}))) (as client keys) (is (= {:response "already exists"} (client/register-stream client {:id "test1"})))))) (let [expected-root-messages [{"c" "reg" "m" {"id" "root"}} {"c" "cfg" "m" {"id" "root", "params" {"substreams-allowed?" true}}} {"c" "cfg" "m" {"id" "root", "params" {"substreams-allowed?" false}}} {"c" "cfg" "m" {"id" "root", "params" {"substreams-allowed?" true}}}] expected-test1-messages [{"c" "reg", "m" {"id" "test1"}}]] (testing "retrieving stream messages" (let [{{root-messages "messages" :as r1} :response} (get-as client keys "root") {{test1-messages "messages" :as r2} :response} (get-as client keys2 "test1")] (testing "messages include configuration stuff" (is (= expected-root-messages (map get-m root-messages)) r1) (is (= expected-test1-messages (map get-m test1-messages)) r2)) (testing "we can validate and reassemble from original binaries" (is (= expected-root-messages (map reassemble-msg root-messages))) (is (= expected-test1-messages (map reassemble-msg test1-messages)))))) (testing "good error message when stream is missing" (is (= {:response {"status" "stream not found"}} (get-as client keys "missing-stream")))) (testing "streams are private by default, show as missing" (is (= {:response {"status" "stream not found"}} (get-as client keys2 "root")))) (testing "publish simple messages" (as client keys) (is (= {:response "ok"} (client/submit-message client {:id "root" :t "hello, world!"}))) (testing "can't publish to someone else's stream" (as client keys2) (is (= {:response "unauthorized"} (client/submit-message client {:id "root" :t "hello, friend!"}))) (is (= {:response "ok"} (client/submit-message client {:id "test1" :t "hello, friend!"})))) (testing "messages show up" (let [{{root-messages "messages"} :response} (get-as client keys "root") {{test1-messages "messages"} :response} (get-as client keys2 "test1")] (is (= (conj expected-root-messages {"c" "put" "m" {"id" "root", "params" {"t" "hello, world!"}}}) (map get-m root-messages))) (is (= (conj expected-test1-messages {"c" "put" "m" {"id" "test1", "params" {"t" "hello, friend!"}}}) (map get-m test1-messages))))) (ta/pending "unsigned put messages don't get to the model") (ta/pending "messages have server receipt timestamps")))) (ta/pending "stream access"))) |
Changes to test/streamful/server_test.clj.
︙ | ︙ | |||
21 22 23 24 25 26 27 28 29 30 31 32 33 34 | [defenv.core :refer [defenv] :as env] [streamful.client :as client] [streamful.crypto :as cr] [streamful.env :as senv] [streamful.io :as sio] [streamful.log-util :as lu] [streamful.server :refer :all] [streamful.test-net :as tnet] [streamful.test-protocol :as tp] [streamful.test-thread :as tthread] [taoensso.timbre :as log]) (:import (java.io ByteArrayInputStream InputStream OutputStream) (java.net Socket))) | > | 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | [defenv.core :refer [defenv] :as env] [streamful.client :as client] [streamful.crypto :as cr] [streamful.env :as senv] [streamful.io :as sio] [streamful.log-util :as lu] [streamful.server :refer :all] [streamful.test-asserts :as ta] [streamful.test-net :as tnet] [streamful.test-protocol :as tp] [streamful.test-thread :as tthread] [taoensso.timbre :as log]) (:import (java.io ByteArrayInputStream InputStream OutputStream) (java.net Socket))) |
︙ | ︙ | |||
220 221 222 223 224 225 226 | test-name (- (System/currentTimeMillis) start-time)))))) (deftest server-test (log/set-ns-min-level! #"io.netty" :info) (lu/test-log-level! (log/info (with-out-str (env/display-env)))) (run-server-test "socket server" start-socket-server!) | | > | 221 222 223 224 225 226 227 228 229 | test-name (- (System/currentTimeMillis) start-time)))))) (deftest server-test (log/set-ns-min-level! #"io.netty" :info) (lu/test-log-level! (log/info (with-out-str (env/display-env)))) (run-server-test "socket server" start-socket-server!) (run-server-test "netty server" start-netty-server!) (ta/pending "netty uses file buffers properly")) |
Changes to test/streamful/test_cfg.clj.
1 2 3 4 5 6 7 8 9 | (ns streamful.test-cfg (:require [clojure.test :refer :all] [streamful.core :as core] [streamful.log-util :as lu] [streamful.temp :as temp] [streamful.test-protocol :as tp]) (:import (java.io File))) (defmacro def-logcfg-test [n & body] | > | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | (ns streamful.test-cfg (:require [clojure.test :refer :all] [streamful.core :as core] [streamful.log-util :as lu] [streamful.temp :as temp] [streamful.test-protocol :as tp]) (:import (java.io File))) (defmacro def-logcfg-test [n & body] `(deftest ~n (lu/test-log-level! ~@body))) (defn- build-temp-protocol-handler [] (let [^File temp-dir (temp/session-tempdir!) ^File db-dir (File. temp-dir "db")] (core/build-protocol-handler (.getAbsolutePath db-dir) 1))) (defmacro def-server-test [n b & body] `(def-logcfg-test ~n (do (tp/with-server-and-client (~build-temp-protocol-handler) [~(first b)] ~@body) (temp/cleanup-all-tempdirs!)))) |
Changes to test/streamful/test_protocol.clj.
1 2 3 4 5 6 7 8 | (ns streamful.test-protocol (:require [clojure.test :refer :all] [streamful.client :as client] [streamful.server :as server] [streamful.test-net :as tnet] [streamful.test-thread :as tthread])) (defn start-server-and-wait [start-fn! port handler & | | > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | (ns streamful.test-protocol (:require [clojure.test :refer :all] [streamful.client :as client] [streamful.server :as server] [streamful.test-net :as tnet] [streamful.test-thread :as tthread])) (defn start-server-and-wait [start-fn! port handler & {:keys [on-start on-exit num-threads] :or {on-exit (constantly :default) on-start (constantly :default) num-threads 1}}] (let [server-running (atom false) on-start (fn [] (on-start) |
︙ | ︙ |