Streamful

Check-in [c9198979db]
Login

Check-in [c9198979db]

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment: Publishing arbitrary messages to a stream, with a Netty bugfix. At this point we hit a Netty-only bug in our decoder: we didn't properly handle the case where Netty doesn't offer enough bytes, so we were forced to learn how to use the ReplayingDecoder. Finding this was really only possible because we tested with the client kept open.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: c9198979db9436c059a43b7ecfe93e9299cbfa7983b4ee17b8f645db100c76bc
User & Date: scstarkey 2025-02-12 13:52:42
Context
2025-02-12
14:01
Fixed linting error from not running my own process check-in: 4e2f46ed71 user: scstarkey tags: trunk
13:52
Publishing arbitrary messages to a stream, with a Netty bugfix. At this point we hit a Netty-only bug in our decoder: we didn't properly handle the case where Netty doesn't offer enough bytes, so we were forced to learn how to use the ReplayingDecoder. Finding this was really only possible because we tested with the client kept open. check-in: c9198979db user: scstarkey tags: trunk
2025-02-11
11:52
A bit cleaner check-in: b1bff87b2b user: scstarkey tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/streamful/client.clj.

197
198
199
200
201
202
203
204



205
206
207
208
209
          (send-cmd! {:c "reg" :m {:id id}} private-key)))

      (configure-stream [_ {:keys [id] :as params}]
        (let [{:keys [private-key]} @session]
          (send-cmd! {:c "cfg" :m {:id id, :params (dissoc params :id)}}
                     private-key)))

      (submit-message [_ {}]



        )

      (close [_] (close!))

      (is-open? [_] (is-open?)))))







|
>
>
>
|




197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
          (send-cmd! {:c "reg" :m {:id id}} private-key)))

      (configure-stream [_ {:keys [id] :as params}]
        ;; Sends a "cfg" command for stream `id`; every other entry of
        ;; `params` travels as the configuration payload. The session's
        ;; private key is handed to `send-cmd!` along with the command.
        (let [private-key (:private-key @session)
              payload {:id id, :params (dissoc params :id)}]
          (send-cmd! {:c "cfg" :m payload} private-key)))

      (submit-message [_ {:keys [id] :as params}]
        ;; Sends a "put" command for stream `id`; every other entry of
        ;; `params` travels as the message payload. The session's private
        ;; key is handed to `send-cmd!` along with the command.
        (let [private-key (:private-key @session)
              payload {:id id, :params (dissoc params :id)}]
          (send-cmd! {:c "put" :m payload} private-key)))

      ;; Delegates to the zero-argument `close!` that is in scope here.
      (close [_] (close!))

      ;; Delegates to the zero-argument `is-open?` that is in scope here
      ;; (presumably a local helper rather than this method — confirm).
      (is-open? [_] (is-open?)))))

Changes to src/streamful/core.clj.

62
63
64
65
66
67
68
69

70
71
72
73
74
75
76
  (server/start-netty-server!
    @senv/port
    handler
    server-started
    server-stopped
    :timeout @senv/timeout
    :accept-timeout @senv/accept-timeout
    :file-buffer-bytes @senv/file-buffer-bytes))


(defn- start-socket! [handler server-started server-stopped]
  (server/start-socket-server!
    @senv/port
    handler
    server-started
    server-stopped







|
>







62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
  (server/start-netty-server!
    @senv/port
    handler
    server-started
    server-stopped
    :timeout @senv/timeout
    :accept-timeout @senv/accept-timeout
    :file-buffer-bytes @senv/file-buffer-bytes
    :file-channel-chunk-size @senv/file-channel-chunk-size))

(defn- start-socket! [handler server-started server-stopped]
  (server/start-socket-server!
    @senv/port
    handler
    server-started
    server-stopped
92
93
94
95
96
97
98
99


100
101
102
103
104
105
106
      (stream/register-stream! db-stream-model pk msg original-msg-bytes))
    (configure-stream! [_, pk, msg, original-msg-bytes]
      (stream/configure-stream! db-stream-model pk msg original-msg-bytes))
    (get-messages! [_, pk, {{:strs [id]} "m" :as msg}, original-msg-bytes]
      (if (= "COPYRIGHT" id)
        {:status :ok
         :messages [{"m" copyright-text}]}
        (stream/get-messages! db-stream-model pk msg original-msg-bytes)))))



(defn build-protocol-handler [db-loc session-timeout]
  (let [db-stream-model (sdl/build-stream-model db-loc)
        stream-model (wrap-stream-model db-stream-model)]
    (proto/protocol-handler
      {:stream-model stream-model
       :session-timeout (-> session-timeout







|
>
>







93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
      (stream/register-stream! db-stream-model pk msg original-msg-bytes))
    ;; Pass-through: forwards the call unchanged to the wrapped
    ;; `db-stream-model`.
    (configure-stream! [_, pk, msg, original-msg-bytes]
      (stream/configure-stream! db-stream-model pk msg original-msg-bytes))
    (get-messages! [_, pk, {{:strs [id]} "m" :as msg}, original-msg-bytes]
      ;; The stream id "COPYRIGHT" is answered locally with a single
      ;; message holding `copyright-text`; every other id is forwarded to
      ;; the wrapped `db-stream-model`.
      (if-not (= "COPYRIGHT" id)
        (stream/get-messages! db-stream-model pk msg original-msg-bytes)
        {:status :ok
         :messages [{"m" copyright-text}]}))
    ;; Pass-through: forwards the call unchanged to the wrapped
    ;; `db-stream-model`.
    (put-message! [_, pk, msg, original-msg-bytes]
      (stream/put-message! db-stream-model pk msg original-msg-bytes))))

(defn build-protocol-handler [db-loc session-timeout]
  (let [db-stream-model (sdl/build-stream-model db-loc)
        stream-model (wrap-stream-model db-stream-model)]
    (proto/protocol-handler
      {:stream-model stream-model
       :session-timeout (-> session-timeout

Changes to src/streamful/env.clj.

79
80
81
82
83
84
85






86
87
88
89
90
91
92
        :tfn parse-long)

(defenv file-buffer-bytes
        :doc "At what size do we cache bytes to a file? (netty-only, bytes)"
        :default (str 1048576)
        :masked? false
        :tfn parse-long)







(defenv protocol
        :doc "Server protocol. Allowed modes: streamful, echo"
        :default "streamful"
        :masked? false
        :tfn keyword)








>
>
>
>
>
>







79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
        :tfn parse-long)

;; Size threshold, in bytes, for caching an incoming payload in a file
;; (netty server only, per the :doc string). The default is the string form
;; of 1048576; `:tfn parse-long` presumably converts the configured string
;; to a long — confirm against the `defenv` macro.
(defenv file-buffer-bytes
        :doc "At what size do we cache bytes to a file? (netty-only, bytes)"
        :default (str 1048576)
        :masked? false
        :tfn parse-long)

;; Number of bytes read into memory at a time when a payload is written to
;; the file buffer (netty server only, per the :doc string). The default is
;; the string form of 1048576; `:tfn parse-long` presumably converts the
;; configured string to a long — confirm against the `defenv` macro.
(defenv file-channel-chunk-size
        :doc "Bytes to read into memory at a time with file buffer (netty-only)"
        :default (str 1048576)
        :masked? false
        :tfn parse-long)

;; Which server protocol to run. The :doc string lists "streamful" and
;; "echo" as the allowed modes; `:tfn keyword` presumably turns the
;; configured string into a keyword — confirm against the `defenv` macro.
(defenv protocol
        :doc "Server protocol. Allowed modes: streamful, echo"
        :default "streamful"
        :masked? false
        :tfn keyword)

Changes to src/streamful/protocol.clj.

44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62








63
64
65
66
67
68
69
      (do (log/warn "Failed to register stream" result)
          "failed"))))

(defn configure-stream! [pk stream-model msg original]
  (let [result (stream/configure-stream! stream-model pk msg original)]
    (case result
      :ok "ok"
      :access-denied "access denied"
      (do (log/warn "Failed to configure stream" result)
          "failed"))))

(defn get-messages [pk stream-model cmd original]
  (let [{:keys [status messages] :as result}
        (stream/get-messages! stream-model pk cmd original)]
    (case status
      :ok {"status" "ok", "messages" messages}
      :stream-missing {"status" "stream not found"}
      (do (log/warn "Failed to get messages" result)
          "failed"))))









(defn- handle-message
  ([signed? pk req msg]
   (try
     (let [original (.readAllBytes msg)
           msg (if signed? (cr/verified original pk) original)]
       (->> (cbor/decode msg)







|











>
>
>
>
>
>
>
>







44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
      (do (log/warn "Failed to register stream" result)
          "failed"))))

(defn configure-stream!
  "Asks `stream-model` to configure a stream on behalf of `pk` and turns the
  model's result into the reply string: `:ok` -> \"ok\", `:unauthorized` ->
  \"unauthorized\". Any other result is logged as a warning and reported as
  \"failed\"."
  [pk stream-model msg original]
  (let [outcome (stream/configure-stream! stream-model pk msg original)]
    (condp = outcome
      :ok "ok"
      :unauthorized "unauthorized"
      (do (log/warn "Failed to configure stream" outcome)
          "failed"))))

(defn get-messages
  "Asks `stream-model` for the messages requested by `cmd` on behalf of `pk`
  and builds the reply from the `:status` of the model's result:

  - `:ok`             -> map with \"status\" \"ok\" and the \"messages\"
  - `:stream-missing` -> map with \"status\" \"stream not found\"
  - anything else     -> logged as a warning; the plain string \"failed\"
                         is returned (note: not a map)."
  [pk stream-model cmd original]
  (let [{:keys [status messages] :as reply}
        (stream/get-messages! stream-model pk cmd original)]
    (condp = status
      :ok {"status" "ok", "messages" messages}
      :stream-missing {"status" "stream not found"}
      (do (log/warn "Failed to get messages" reply)
          "failed"))))

(defn- put-message!
  "Asks `stream-model` to store the message in `cmd` on behalf of `pk` and
  turns the model's result into the reply string: `:ok` -> \"ok\",
  `:unauthorized` -> \"unauthorized\". Any other result is logged as a
  warning and reported as \"failed\"."
  [pk stream-model cmd original]
  (let [result (stream/put-message! stream-model pk cmd original)]
    (case result
      :ok "ok"
      :unauthorized "unauthorized"
      ;; The warning names this operation so put failures are not mistaken
      ;; for configure failures in the log.
      (do (log/warn "Failed to put message" result)
          "failed"))))

(defn- handle-message
  ([signed? pk req msg]
   (try
     (let [original (.readAllBytes msg)
           msg (if signed? (cr/verified original pk) original)]
       (->> (cbor/decode msg)
80
81
82
83
84
85
86

87
88
89
90
91
92
93
         (throw e)))))

  ([{:keys [stream-model pk original] {:keys [remote-ip session-id]} :req}
    {:strs [c m] :as cmd}]
   (case c
     "ping" m
     "get" (get-messages pk stream-model cmd original)

     "reg" (register-stream! pk stream-model cmd original)
     "cfg" (configure-stream! pk stream-model cmd original)
     (throw (ex-info "Unknown command"
                     {:cmd cmd
                      :session-id session-id
                      :remote-ip remote-ip})))))








>







88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
         (throw e)))))

  ([{:keys [stream-model pk original] {:keys [remote-ip session-id]} :req}
    {:strs [c m] :as cmd}]
   (case c
     "ping" m
     "get" (get-messages pk stream-model cmd original)
     "put" (put-message! pk stream-model cmd original)
     "reg" (register-stream! pk stream-model cmd original)
     "cfg" (configure-stream! pk stream-model cmd original)
     (throw (ex-info "Unknown command"
                     {:cmd cmd
                      :session-id session-id
                      :remote-ip remote-ip})))))

Changes to src/streamful/server.clj.

19
20
21
22
23
24
25


26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
(ns streamful.server
  (:require [clojure.java.io :as io]
            [clojure.string :as str]
            [streamful.crypto :as cr]
            [streamful.io :as sio]
            [streamful.temp :as temp]
            [taoensso.timbre :as log])


  (:import (java.io ByteArrayInputStream File FileInputStream FileOutputStream
                    InputStream OutputStream RandomAccessFile)
           (java.net ServerSocket Socket SocketTimeoutException)
           (java.nio ByteBuffer ByteOrder CharBuffer)
           (java.nio.channels FileChannel FileChannel$MapMode)
           (java.util Scanner)
           (java.util.concurrent Executors ThreadFactory)))

(log/set-ns-min-level! #"io.netty.*" :info)

(import (io.netty.bootstrap ServerBootstrap)
        (io.netty.buffer ByteBuf ByteBufAllocator ByteBufUtil
                         CompositeByteBuf Unpooled)
        (io.netty.channel ChannelHandlerContext ChannelInitializer
                          ChannelOption SimpleChannelInboundHandler)
        (io.netty.channel.nio NioEventLoopGroup)
        (io.netty.channel.socket.nio NioServerSocketChannel)
        (io.netty.handler.codec MessageToMessageDecoder
                                MessageToMessageEncoder)
        (io.netty.handler.timeout ReadTimeoutHandler))

;;;;;;;;;;;;;;
;;; COMMON ;;;
;;;;;;;;;;;;;;








>
>
|
















|







19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
(ns streamful.server
  (:require [clojure.java.io :as io]
            [clojure.string :as str]
            [streamful.crypto :as cr]
            [streamful.io :as sio]
            [streamful.temp :as temp]
            [taoensso.timbre :as log])
  (:import (io.netty.handler.codec ReplayingDecoder)
           (io.netty.util Signal)
           (java.io ByteArrayInputStream ByteArrayOutputStream File FileInputStream FileOutputStream
                    InputStream OutputStream RandomAccessFile)
           (java.net ServerSocket Socket SocketTimeoutException)
           (java.nio ByteBuffer ByteOrder CharBuffer)
           (java.nio.channels FileChannel FileChannel$MapMode)
           (java.util Scanner)
           (java.util.concurrent Executors ThreadFactory)))

(log/set-ns-min-level! #"io.netty.*" :info)

(import (io.netty.bootstrap ServerBootstrap)
        (io.netty.buffer ByteBuf ByteBufAllocator ByteBufUtil
                         CompositeByteBuf Unpooled)
        (io.netty.channel ChannelHandlerContext ChannelInitializer
                          ChannelOption SimpleChannelInboundHandler)
        (io.netty.channel.nio NioEventLoopGroup)
        (io.netty.channel.socket.nio NioServerSocketChannel)
        (io.netty.handler.codec ByteToMessageDecoder
                                MessageToMessageEncoder)
        (io.netty.handler.timeout ReadTimeoutHandler))

;;;;;;;;;;;;;;
;;; COMMON ;;;
;;;;;;;;;;;;;;

255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281


282
283
284
285
286
287




288
289

290
291
292
293
294
295
296
297
298
299
300

301
302
303
304
305
306
307
308
309
310
311
312


313

314

315

316
317
318
319
320
321
322
    (fn []
      (log/info "Stopping server")
      (reset! running false))))

;;;;;;;;;;;;;;;;;;;;
;;; NETTY SERVER ;;;
;;;;;;;;;;;;;;;;;;;;

(defn- read-line-from-buf! [^ByteBuf msg]
  (let [i (atom 0)
        _ (loop []
            (let [c (.getByte msg @i)]
              (when (not= 10 c)
                (swap! i inc)
                (recur))))
        buf (.retainedSlice msg 0 @i)]
    (try
      (.toString buf cr/encoding)
      (finally (.release buf)))))

(defn- add-to-composite! [^CompositeByteBuf composite, ^ByteBuf buf]
  (.addComponent composite buf)
  (let [prev-idx (.writerIndex composite)
        addition (.readableBytes buf)]
    (.writerIndex composite (+ prev-idx addition))))

(defn- buf->temp-file-stream [bytes-to-read, ^ByteBuf msg]


  (let [^File f (:temp-file (temp/temp-file! ".bin"))
        sliced (.retainedSlice msg 0 bytes-to-read)
        ^ByteBuffer nio-buffer (.nioBuffer sliced)]
    (try
      (with-open [fo (RandomAccessFile. f "rw")
                  ^FileChannel fc (.getChannel fo)]




        (.write fc nio-buffer 0))
      (finally (.release sliced)))

    (FileInputStream. f)))

(defn- buf->byte-stream [bytes-to-read msg]
  (let [arr (byte-array bytes-to-read)]
    (.getBytes msg 0 arr)
    (ByteArrayInputStream. arr)))

(defn- decoder [{:keys [session-id
                        current-mode
                        bytes-to-read
                        file-buffer-bytes]}]

  (proxy [MessageToMessageDecoder] []
    (decode [^ChannelHandlerContext ctx, ^ByteBuf msg, out]
      (try
        (log/trace session-id "decode" @current-mode @bytes-to-read)
        (let [read-result
              (case @current-mode
                :cmd
                (read-line-from-buf! msg)

                :bytes
                (if (> @bytes-to-read file-buffer-bytes)
                  (buf->temp-file-stream @bytes-to-read msg)


                  (buf->byte-stream @bytes-to-read msg)))]

          (log/trace session-id "decoded" (loggable-msg read-result))

          (.add out read-result))

        (catch Throwable t
          (log/error t)
          (.close ctx))))))

(defn- stream->temp->buf! [^long num-bytes, ^InputStream in]
  (let [^File temp-file (:temp-file (temp/temp-file! ".bin"))]
    (with-open [file-out (FileOutputStream. temp-file)]







<

|
|
|
|
|
|
<
<
<
<







|
>
>

|
<
<
|
|
>
>
>
>
|
<
>


|
|
|





|
>
|










|
>
>

>
|
>
|
>







257
258
259
260
261
262
263

264
265
266
267
268
269
270




271
272
273
274
275
276
277
278
279
280
281
282


283
284
285
286
287
288
289

290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
    (fn []
      (log/info "Stopping server")
      (reset! running false))))

;;;;;;;;;;;;;;;;;;;;
;;; NETTY SERVER ;;;
;;;;;;;;;;;;;;;;;;;;

(defn- read-line-from-buf!
  "Reads bytes from `msg` one at a time until a line feed (byte 10) is read,
  and returns the bytes before it decoded with `cr/encoding`. The line feed
  is consumed from `msg` but is not part of the returned string."
  [^ByteBuf msg]
  (let [line-bytes (ByteArrayOutputStream.)]
    (loop []
      (let [next-byte (.readByte msg)]
        (if (not= 10 next-byte)
          (do (.write line-bytes next-byte)
              (recur))
          (.toString line-bytes cr/encoding))))))





(defn- add-to-composite!
  "Appends `buf` to `composite` as a component, then moves the composite's
  writer index forward by the number of readable bytes in `buf`."
  [^CompositeByteBuf composite, ^ByteBuf buf]
  (.addComponent composite buf)
  (.writerIndex composite
                (+ (.writerIndex composite) (.readableBytes buf))))

;; Copies `bytes-to-read` bytes from `msg` into a temp file obtained from
;; `temp/temp-file!` and returns a FileInputStream over that file. The copy
;; goes through one reusable byte array of `file-channel-chunk-size` bytes,
;; so at most one chunk is held by this function at a time.
;; NOTE(review): if `.readBytes` throws part-way (for example when the
;; enclosing decoder has to be re-run for lack of bytes), the temp file
;; created here is not removed by this function — confirm whether
;; `streamful.temp` cleans it up.
(defn- buf->temp-file-stream [^long file-channel-chunk-size
                              ^long bytes-to-read,
                              ^ByteBuf msg]
  (let [^File f (:temp-file (temp/temp-file! ".bin"))
        ^bytes buf (byte-array file-channel-chunk-size)]


    ;; `with-open` closes the channel and the RandomAccessFile when the copy
    ;; finishes or throws.
    (with-open [fo (RandomAccessFile. f "rw")
                ^FileChannel fc (.getChannel fo)]
      (loop [bytes-left bytes-to-read]
        (when (pos? bytes-left)
          ;; The final chunk may be smaller than the chunk buffer.
          (let [this-chunk-size (min bytes-left file-channel-chunk-size)]
            (.readBytes msg buf 0 this-chunk-size)
            (.write fc (ByteBuffer/wrap buf 0 this-chunk-size))

            (recur (- bytes-left this-chunk-size))))))
    (FileInputStream. f)))

(defn- buf->byte-stream
  "Reads `bytes-to-read` bytes from `msg` into a new byte array and returns
  a ByteArrayInputStream over that array."
  [^long bytes-to-read, ^ByteBuf msg]
  (let [^bytes payload (byte-array bytes-to-read)]
    (.readBytes msg payload 0 bytes-to-read)
    (ByteArrayInputStream. payload)))

;; Builds the inbound decoder for one connection as a proxy of Netty's
;; ReplayingDecoder. `current-mode` and `bytes-to-read` are dereferenced on
;; every `decode` call, so they are expected to be reference types that
;; hold the mode (:cmd or :bytes) and the size of the next payload.
;;   :cmd   -> one line is read from the buffer and emitted as a string
;;   :bytes -> `@bytes-to-read` bytes are read and emitted as an
;;             InputStream; payloads larger than `file-buffer-bytes` go
;;             through a temp file, smaller ones stay in memory.
(defn- decoder [{:keys [session-id
                        current-mode
                        bytes-to-read
                        file-buffer-bytes
                        file-channel-chunk-size]}]
  (proxy [ReplayingDecoder] []
    (decode [^ChannelHandlerContext ctx, ^ByteBuf msg, out]
      (try
        (log/trace session-id "decode" @current-mode @bytes-to-read)
        (let [read-result
              (case @current-mode
                :cmd
                (read-line-from-buf! msg)

                :bytes
                (if (> @bytes-to-read file-buffer-bytes)
                  (buf->temp-file-stream file-channel-chunk-size
                                         @bytes-to-read
                                         msg)
                  (buf->byte-stream @bytes-to-read msg)))]
          (when read-result
            (log/trace session-id "decoded" (loggable-msg read-result))
            ;; Mark everything read so far as consumed before handing the
            ;; decoded value to the next handler.
            (proxy-super checkpoint)
            (.add out read-result)))
        ;; A Netty Signal must reach the ReplayingDecoder untouched, so it
        ;; is rethrown before the catch-all below can swallow it
        ;; (presumably this is the "not enough bytes yet" replay signal —
        ;; confirm against the Netty docs).
        (catch Signal s (throw s))
        ;; Any other failure is logged and the connection is closed.
        (catch Throwable t
          (log/error t)
          (.close ctx))))))

(defn- stream->temp->buf! [^long num-bytes, ^InputStream in]
  (let [^File temp-file (:temp-file (temp/temp-file! ".bin"))]
    (with-open [file-out (FileOutputStream. temp-file)]
410
411
412
413
414
415
416
417

418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434

435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451




452
453
454
455
456
457

458
459
460

461
462

463
464
465
466
467

468
469
470
471
472
473
474
475
476
477
478
479
480
481
                     @msg-count
                     new-mode
                     response)
          (when (instance? InputStream msg) (.close msg))
          (if out (.writeAndFlush ctx out) (.close ctx))))
      (exceptionCaught [_ctx t] (log/error t)))))

(defn- channel-initializer [handler timeout file-buffer-bytes]

  (proxy [ChannelInitializer] []
    (initChannel [socketChannel]
      (let [session-id (cr/guid)
            remote-ip (.remoteAddress socketChannel)
            current-mode (ref :cmd)
            bytes-to-read (ref nil)
            msg-count (ref 0)

            props
            {:session-id session-id
             :remote-ip remote-ip
             :current-mode current-mode
             :bytes-to-read bytes-to-read
             :handler handler
             :timeout timeout
             :msg-count msg-count
             :file-buffer-bytes file-buffer-bytes}]

        (log/trace session-id "initChannel")

        (-> socketChannel
            .pipeline
            (.addLast
              "readTimeoutHandler"
              (ReadTimeoutHandler. (int timeout)))
            (.addLast "encoder" (encoder props))
            (.addLast "decoder" (decoder props))
            (.addLast "mainHandler" (channel-handler props)))))))

(defn start-netty-server!
  "The `handler` that you pass must follow the contract specified in
   `handle-client-msg!`

   `timeout` is the time to wait for bytes from a client
   `accept-timeout` is the time to wait for a socket connection





  See the `streamful.server.server-test` for more."
  [^Integer port, handler, on-start, on-exit, &
   {:keys [^Integer timeout
           ^Integer accept-timeout,
           ^Long file-buffer-bytes]

    :or {timeout 60000
         accept-timeout 5000
         file-buffer-bytes 1024}}]

  (let [server-group (NioEventLoopGroup.)
        client-group (NioEventLoopGroup.)

        server (-> (ServerBootstrap.)
                   (.group server-group client-group)
                   (.channel NioServerSocketChannel)
                   (.childHandler
                     (channel-initializer handler timeout file-buffer-bytes))

                   (.childOption ChannelOption/SO_KEEPALIVE true)
                   (.option ChannelOption/CONNECT_TIMEOUT_MILLIS
                            (int accept-timeout)))
        f (-> server (.bind port) .sync)]

    (when (.isSuccess f) (on-start))

    (fn []
      (log/info "Stopping server")
      (try
        (.shutdownGracefully server-group)
        (.shutdownGracefully client-group)
        (-> f .channel .closeFuture .sync)
        (finally (on-exit))))))







|
>
















|
>

















>
>
>
>





|
>


|
>


>
|
|
|
|
|
>
|
|
|











417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
                     @msg-count
                     new-mode
                     response)
          (when (instance? InputStream msg) (.close msg))
          (if out (.writeAndFlush ctx out) (.close ctx))))
      (exceptionCaught [_ctx t] (log/error t)))))

(defn- channel-initializer
  "Returns a ChannelInitializer that sets up the pipeline of every accepted
  connection.

  Each connection gets its own session id and its own mutable state (refs
  for the current decode mode, the number of bytes to read next, and a
  message counter). That state, together with `handler` and the tuning
  values, is passed as one `props` map to the encoder, decoder and main
  handler.

  `timeout` is the read timeout in milliseconds."
  [handler timeout file-buffer-bytes file-channel-chunk-size]
  (proxy [ChannelInitializer] []
    (initChannel [socketChannel]
      (let [session-id (cr/guid)
            remote-ip (.remoteAddress socketChannel)
            current-mode (ref :cmd)
            bytes-to-read (ref nil)
            msg-count (ref 0)

            props
            {:session-id session-id
             :remote-ip remote-ip
             :current-mode current-mode
             :bytes-to-read bytes-to-read
             :handler handler
             :timeout timeout
             :msg-count msg-count
             :file-buffer-bytes file-buffer-bytes
             :file-channel-chunk-size file-channel-chunk-size}]
        (log/trace session-id "initChannel")

        (-> socketChannel
            .pipeline
            (.addLast
              "readTimeoutHandler"
              ;; The single-int constructor of ReadTimeoutHandler takes
              ;; SECONDS; the unit is spelled out so that a value such as
              ;; 60000 means one minute rather than many hours.
              (ReadTimeoutHandler.
                (long timeout)
                java.util.concurrent.TimeUnit/MILLISECONDS))
            (.addLast "encoder" (encoder props))
            (.addLast "decoder" (decoder props))
            (.addLast "mainHandler" (channel-handler props)))))))

(defn start-netty-server!
  "Starts a netty server on `port` and returns a zero-argument function
   that stops it.

   The `handler` that you pass must follow the contract specified in
   `handle-client-msg!`

   `on-start` is called with no arguments once the port is bound
   `on-exit` is called with no arguments when the returned stop function
             has finished (even if shutting down throws)

   `timeout` is the time to wait for bytes from a client
   `accept-timeout` is the time to wait for a socket connection
   `file-buffer-bytes` is the size an incoming message must exceed to be
                       streamed to a temp file
   `file-channel-chunk-size` is the size of a single chunk to read into memory
                             before writing to the aforementioned file buffer

   If binding the port fails, both event loop groups are shut down and the
   original exception is rethrown.

  See the `streamful.server.server-test` for more."
  [^Integer port, handler, on-start, on-exit, &
   {:keys [^Integer timeout
           ^Integer accept-timeout,
           ^Long file-buffer-bytes
           ^Long file-channel-chunk-size]
    :or {timeout 60000
         accept-timeout 5000
         file-buffer-bytes 1024
         file-channel-chunk-size 1024}}]
  (let [server-group (NioEventLoopGroup.)
        client-group (NioEventLoopGroup.)
        server
        (-> (ServerBootstrap.)
            (.group server-group client-group)
            (.channel NioServerSocketChannel)
            (.childHandler
              (channel-initializer
                handler timeout file-buffer-bytes file-channel-chunk-size))
            (.childOption ChannelOption/SO_KEEPALIVE true)
            (.option ChannelOption/CONNECT_TIMEOUT_MILLIS
                     (int accept-timeout)))
        ;; `.sync` rethrows when the bind fails (e.g. port already in use).
        ;; Without this cleanup the two event loop groups, and any threads
        ;; they have started, would be left running with no way for the
        ;; caller to stop them.
        f (try
            (-> server (.bind port) .sync)
            (catch Throwable t
              (.shutdownGracefully server-group)
              (.shutdownGracefully client-group)
              (throw t)))]

    (when (.isSuccess f) (on-start))

    (fn []
      (log/info "Stopping server")
      (try
        (.shutdownGracefully server-group)
        (.shutdownGracefully client-group)
        ;; Wait until the listening channel is actually closed.
        (-> f .channel .closeFuture .sync)
        (finally (on-exit))))))

Changes to src/streamful/stream.clj.

1
2
3
4
5
6
7
8
9


(ns streamful.stream)

(defprotocol StreamModel
  (register-stream! [_, ^String pk, msg, ^bytes original-msg-bytes])
  (configure-stream! [_, ^String pk, msg, ^bytes original-msg-bytes])
  (get-messages! [_, ^String pk, msg, ^bytes original-msg-bytes]
    "Get messages from a stream. May be part of a signed request.
    Will likely cause state to change in the backend as we keep track of
    who has subscribed to our stream."))









|
|
>
>
1
2
3
4
5
6
7
8
9
10
11
(ns streamful.stream)

;; Contract implemented by a stream storage backend. Every operation takes
;; a String `pk` (presumably the caller's public key — confirm), the
;; message `msg`, and `original-msg-bytes`, the byte array the message was
;; taken from.
(defprotocol StreamModel
  ;; Registers a stream. The shape of the result is defined by the
  ;; implementation.
  (register-stream! [_, ^String pk, msg, ^bytes original-msg-bytes])
  ;; Changes the configuration of a stream. The shape of the result is
  ;; defined by the implementation.
  (configure-stream! [_, ^String pk, msg, ^bytes original-msg-bytes])
  (get-messages! [_, ^String pk, msg, ^bytes original-msg-bytes]
    "Get messages from a stream. May be part of a signed request.
    May cause state to change in the backend as we keep track of
    who has subscribed to our stream.")
  (put-message! [_, ^String pk, msg, ^bytes original-msg-bytes]
    "Put a message into a stream. Must be part of a signed request."))

Changes to src/streamful/stream_datalevin.clj.

8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83



84
85
86
87
88
89
90
91
92
93

















94
95
96
97
98
99
100
101
102
103
104


(def ^:private stream-agg-table "stream-aggregates")
(def ^:private stream-data-table "stream-data")

(defn- get-db-manifest [db] (db/get-value db stream-agg-table :manifest))

(defn- streamk [iid t] (str iid "|" t))

(defn- stream-msg [iid msg pk original-msg-bytes]
  (let [t (System/currentTimeMillis)]
    [:put stream-data-table
     (streamk iid t) {"m" msg, "t" t, "psk" pk, "o" original-msg-bytes}
     :string :data]))

;; todo allow substreams of substreams
;; for now all substreams are implicitly of the root stream
(defn- register-stream! [db pk {{:strs [id]} "m" :as msg} original-msg-bytes]



  (try
    (let [manifest (get-db-manifest db)

          {{:strs [substreams-allowed?]} "c" :as root-stream}
          (and manifest (manifest "root"))

          existing-stream (and manifest (manifest id))
          is-root? (= id "root")
          root-exists? (not (nil? root-stream))]
      (cond
        (or (and substreams-allowed? (not existing-stream))
            (and is-root? (not root-exists?)))
        (let [internal-id (cr/guid)
              smsg (stream-msg internal-id msg pk original-msg-bytes)
              first-msg-time (-> smsg (nth 3) :t)]
          (db/transact-kv db
            [[:put stream-agg-table
              :manifest
              (merge manifest {id
                               {"psk" pk
                                "iid" internal-id
                                "f" first-msg-time}})]
             smsg])
          :ok)

        (not root-exists?) :root-stream-missing

        existing-stream :already-exists

        :else :substreams-not-allowed))
    (catch Throwable t
      (log/error t)
      :database-error)))

(defn- configure-stream! [db
                          pk
                          {{:strs [id params]} "m" :as msg}
                          original-msg-bytes]
  (try
    (let [manifest (get-db-manifest db)
          {:strs [iid cfg] :as existing-stream} (and manifest (manifest id))]
      (cond
        (and existing-stream (creq/eq? pk (existing-stream "psk")))
        (do
          (db/transact-kv db
            [[:put stream-agg-table
              :manifest
              (merge manifest
                     {id (assoc existing-stream "c" (merge cfg params))})]
             (stream-msg iid msg pk original-msg-bytes)])
          :ok)

        (not existing-stream) :stream-missing

        :else :access-denied))
    (catch Throwable t
      (log/error t)
      :database-error)))

(defn get-messages! [db client-psk {{:strs [id]} "m"} _original-msg-bytes]



  (let [manifest (get-db-manifest db)
        {:strs [iid f psk]} (and manifest (manifest id))
        fk (streamk iid f)
        lk (streamk iid (System/currentTimeMillis))
        r [:closed fk lk]]
    (if (creq/eq? psk client-psk)
      {:status :ok
       :messages (db/get-first-n db stream-data-table 50 r :string :data true)}
      {:status :stream-missing})))


















(defn build-stream-model [location]
  (let [db (db/open-kv location)]
    (db/open-dbi db stream-agg-table)
    (db/open-dbi db stream-data-table)
    (reify stream/StreamModel
      (register-stream! [_, pk, msg, original-msg-bytes]
        (register-stream! db pk msg original-msg-bytes))
      (configure-stream! [_, pk, msg, original-msg-bytes]
        (configure-stream! db pk msg original-msg-bytes))
      (get-messages! [_, pk, msg, original-msg-bytes]
        (get-messages! db pk msg original-msg-bytes)))))









|


|




|
>
>
>













|





|















|






|






|




|




|
>
>
>










>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>










|
>
>
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
;; Table that holds the :manifest entry (see `get-db-manifest`).
(def ^:private stream-agg-table "stream-aggregates")
;; Table that holds the individual stream messages (see `stream-msg`).
(def ^:private stream-data-table "stream-data")

(defn- get-db-manifest
  "Returns the value stored under :manifest in the stream aggregate table
  of `db`."
  [db]
  (db/get-value db stream-agg-table :manifest))

(defn- streamk
  "Builds the key of a stream message: `iid` and `t` joined by a \"|\"."
  [iid t]
  (str iid \| t))

(defn- stream-msg
  "Builds the :put operation vector that stores one message in the stream
  data table. The record is keyed by `iid` plus the current time in
  milliseconds and holds the message (\"m\"), that time (\"t\"), the
  client's key (\"psk\") and the original message bytes (\"o\")."
  [iid msg client-psk original-msg-bytes]
  (let [now (System/currentTimeMillis)
        record {"m" msg
                "t" now
                "psk" client-psk
                "o" original-msg-bytes}]
    [:put stream-data-table (streamk iid now) record :string :data]))

;; todo allow substreams of substreams
;; for now all substreams are implicitly of the root stream
(defn- register-stream!
  "Registers the stream named by the \"id\" of the message's \"m\" entry,
  owned by `client-psk`.

  Registration succeeds when either
  - the root stream's config allows substreams and `id` is not registered
    yet, or
  - `id` is \"root\" and no root stream exists yet.

  On success the manifest gains an entry for `id` holding the owner's key
  (\"psk\"), a new internal id (\"iid\") and the time of the first message
  (\"f\"); the registration message itself is stored as the stream's first
  message in the same transaction.

  Returns :ok, :root-stream-missing, :already-exists,
  :substreams-not-allowed, or :database-error when anything throws."
  [db
   client-psk
   {{:strs [id]} "m" :as msg}
   original-msg-bytes]
  (try
    (let [manifest (get-db-manifest db)

          {{:strs [substreams-allowed?]} "c" :as root-stream}
          (and manifest (manifest "root"))

          existing-stream (and manifest (manifest id))
          is-root? (= id "root")
          root-exists? (not (nil? root-stream))]
      (cond
        (or (and substreams-allowed? (not existing-stream))
            (and is-root? (not root-exists?)))
        (let [internal-id (cr/guid)
              smsg (stream-msg internal-id msg client-psk original-msg-bytes)
              ;; The record built by `stream-msg` uses string keys, so the
              ;; timestamp lives under "t" (looking up :t always gave nil).
              first-msg-time (get (nth smsg 3) "t")]
          (db/transact-kv db
            [[:put stream-agg-table
              :manifest
              (merge manifest {id
                               {"psk" client-psk
                                "iid" internal-id
                                "f" first-msg-time}})]
             smsg])
          :ok)

        (not root-exists?) :root-stream-missing

        existing-stream :already-exists

        :else :substreams-not-allowed))
    (catch Throwable t
      (log/error t)
      :database-error)))

(defn- configure-stream!
  "Merges the \"params\" of a configuration message into the stored config of
  the stream named by \"id\".

  Only the client whose key matches the stream's stored \"psk\" may configure
  it. On success one db/transact-kv call writes the updated manifest and
  appends the configuration message to the stream.

  Returns :ok, :stream-missing or :unauthorized. Any Throwable is logged and
  reported as :database-error."
  [db
   client-psk
   {{:strs [id params]} "m" :as msg}
   original-msg-bytes]
  (try
    (let [manifest (get-db-manifest db)
          ;; The config is stored (and read by register-stream!) under "c".
          ;; Binding it from a non-existent "cfg" key made every call replace
          ;; the whole config instead of merging into it.
          {:strs [iid] cfg "c" :as existing-stream}
          (and manifest (manifest id))]
      (cond
        (and existing-stream (creq/eq? client-psk (existing-stream "psk")))
        (do
          (db/transact-kv db
            [[:put stream-agg-table
              :manifest
              (assoc manifest
                     id (assoc existing-stream "c" (merge cfg params)))]
             (stream-msg iid msg client-psk original-msg-bytes)])
          :ok)

        (not existing-stream) :stream-missing

        :else :unauthorized))
    (catch Throwable t
      (log/error t)
      :database-error)))

(defn get-messages!
  "Returns messages of the stream named by the \"id\" field of the message's
  \"m\" map.

  When `client-psk` matches the stream's stored \"psk\" the result is
  {:status :ok, :messages ...}, where the messages come from
  db/get-first-n over the closed key range from the stream's first-message
  key up to the key for the current time; at most 50 are requested.
  Otherwise -- including when the stream is not in the manifest -- the
  result is {:status :stream-missing}."
  [db
   client-psk
   {{:strs [id]} "m"}
   _original-msg-bytes]
  (let [manifest (get-db-manifest db)
        {:strs [iid f psk]} (and manifest (manifest id))]
    (if (creq/eq? psk client-psk)
      ;; Range bounds are only needed once the caller is authorized.
      (let [fk (streamk iid f)
            lk (streamk iid (System/currentTimeMillis))
            r [:closed fk lk]]
        {:status :ok
         ;; NOTE(review): the trailing `true` is presumably an
         ;; "ignore key / values only" flag -- confirm against db/get-first-n.
         :messages (db/get-first-n db stream-data-table 50 r :string :data true)})
      {:status :stream-missing})))

(defn put-message!
  "Appends a message to the stream named by the \"id\" field of the message's
  \"m\" map.

  Returns :stream-missing when the manifest has no such stream,
  :unauthorized when `client-psk` does not match the stream's stored
  \"psk\", and :ok after the message record has been written.
  Exceptions from the database layer are not caught here."
  [db
   client-psk
   {{:strs [id]} "m" :as msg}
   original-msg-bytes]
  (let [manifest (get-db-manifest db)
        target (and manifest (manifest id))]
    (if-not target
      :stream-missing
      (if (creq/eq? client-psk (target "psk"))
        (let [op (stream-msg (get target "iid")
                             msg
                             client-psk
                             original-msg-bytes)]
          (db/transact-kv db [op])
          :ok)
        :unauthorized))))

(defn build-stream-model
  "Opens the key-value store at `location`, opens the stream-aggregates and
  stream-data tables in it, and returns a stream/StreamModel whose methods
  delegate to the functions of the same name in this namespace."
  [location]
  (let [db (doto (db/open-kv location)
             (db/open-dbi stream-agg-table)
             (db/open-dbi stream-data-table))]
    (reify stream/StreamModel
      (register-stream! [_ psk message raw-bytes]
        (register-stream! db psk message raw-bytes))

      (configure-stream! [_ psk message raw-bytes]
        (configure-stream! db psk message raw-bytes))

      (get-messages! [_ psk message raw-bytes]
        (get-messages! db psk message raw-bytes))

      (put-message! [_ psk message raw-bytes]
        (put-message! db psk message raw-bytes)))))

Changes to test/streamful/protocol_test.clj.

21
22
23
24
25
26
27
28
29
30
31
32



33
34
35
36
37
38
39
            [clojure.test :refer :all]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.protocol :refer :all]
            [streamful.test-asserts :as ta]
            [streamful.test-cfg :as tcfg]))

;; Passes a stored record's original bytes ("o") and key ("psk") to
;; cr/verified and CBOR-decodes whatever that returns.
(defn- reassemble-msg [{:strs [psk o] :as m}]
  (-> o (cr/verified psk) cbor/decode))

;; Asserts that client/as-stream with `keys` answers {:result "ok"}, then
;; returns the result of client/get-messages for stream `id`.
(defn- get-as [client keys id]
  (is (= {:result "ok"} (client/as-stream client keys)))



  (client/get-messages client {:id id}))

;; Returns the value under the "m" key of a stored message record.
(defn- get-m [m] (get m "m"))

(tcfg/def-server-test protocol-test [client]
  (testing "ping"
    (let [msg-to-echo (cr/guid94)]







|


|
|
>
>
>







21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
            [clojure.test :refer :all]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.protocol :refer :all]
            [streamful.test-asserts :as ta]
            [streamful.test-cfg :as tcfg]))

(defn- reassemble-msg
  "Passes a stored record's original bytes (\"o\") and key (\"psk\") to
  cr/verified and CBOR-decodes whatever that returns."
  [{signer-key "psk", signed-bytes "o"}]
  (cbor/decode (cr/verified signed-bytes signer-key)))

(defn- as
  "Calls client/as-stream on `client` with `signing-keys` and asserts that
  the reply is {:result \"ok\"}."
  [client signing-keys]
  (is (= {:result "ok"}
         (client/as-stream client signing-keys))))

(defn- get-as
  "Runs the `as` assertion for `signing-keys`, then returns the result of
  client/get-messages for stream `id`."
  [client signing-keys id]
  (as client signing-keys)
  (client/get-messages client {:id id}))

(defn- get-m
  "Returns the value under the \"m\" key of a stored message record."
  [record]
  (get record "m"))

(tcfg/def-server-test protocol-test [client]
  (testing "ping"
    (let [msg-to-echo (cr/guid94)]
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91


92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142






143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164




165
166
167






168
169

170
171













172
                 {:public-key public-signing-key
                  :private-key private-signing-key2})))

        (is (= {:response "unauthorized"}
               (client/register-stream client {:id "test1"}))))

      (testing "first stream must be called 'root'"
        (is (= {:result "ok"} (client/as-stream client keys)))

        (is (= {:response "root not yet configured -- register that first"}
               (client/register-stream client {:id "test1"})))

        (is (= {:response "ok"} (client/register-stream client {:id "root"}))))

      (testing "can't register multiple streams with the same name"
        (is (= {:response "already exists"}
               (client/register-stream client {:id "root"}))))

      (testing "can't register more than one stream"
        (is (= {:response "substreams aren't allowed here"}
               (client/register-stream client {:id "test1"}))))

      (testing "only stream owner can configure it"
        (is (= {:result "ok"} (client/as-stream client keys2)))
        (is (= {:response "access denied"}
               (client/configure-stream client {:id "root"
                                                :substreams-allowed? true}))))



      (testing "if root allows substreams"
        (is (= {:result "ok"} (client/as-stream client keys)))
        (is (= {:response "ok"}
               (client/configure-stream client {:id "root"
                                                :substreams-allowed? true})))
        (testing "a substream can be registered"
          (is (= {:result "ok"} (client/as-stream client keys2)))
          (is (= {:response "ok"}
                 (client/register-stream client {:id "test1"})))

          (testing "but not twice"
            (is (= {:result "ok"} (client/as-stream client keys)))
            (is (= {:response "already exists"}
                   (client/register-stream client {:id "test1"}))))

          (testing "and not after reconfigured off"
            (is (= {:response "ok"}
                   (client/configure-stream client
                                            {:id "root"
                                             :substreams-allowed? false})))
            (is (= {:response "substreams aren't allowed here"}
                   (client/register-stream client {:id "test2"}))))

          (testing "but not twice, even after reconfiguring"
            (is (= {:response "ok"}
                   (client/configure-stream client
                                            {:id "root"
                                             :substreams-allowed? true})))
            (is (= {:result "ok"} (client/as-stream client keys)))
            (is (= {:response "already exists"}
                   (client/register-stream client {:id "test1"}))))))

      (testing "retrieving stream messages"
        (let [{{root-messages "messages" :as r1} :response}
              (get-as client keys "root")

              {{test1-messages "messages" :as r2} :response}
              (get-as client keys2 "test1")

              expected-root-messages
              [{"c" "reg"
                "m" {"id" "root"}}
               {"c" "cfg"
                "m" {"id" "root", "params" {"substreams-allowed?" true}}}
               {"c" "cfg"
                "m" {"id" "root", "params" {"substreams-allowed?" false}}}
               {"c" "cfg"
                "m" {"id" "root", "params" {"substreams-allowed?" true}}}]

              expected-test1-messages
              [{"c" "reg", "m" {"id" "test1"}}]]







          (testing "messages include configuration stuff"
            (is (= expected-root-messages
                   (map get-m root-messages)) r1)
            (is (= expected-test1-messages
                   (map get-m test1-messages)) r2))

          (testing "we can validate and reassemble from original binaries"
            (is (= expected-root-messages
                   (map reassemble-msg root-messages)))
            (is (= expected-test1-messages
                   (map reassemble-msg test1-messages)))))))

    (testing "good error message when stream is missing"
      (is (= {:response {"status" "stream not found"}}
             (get-as client keys "missing-stream"))))

    (testing "streams are private by default, show as missing"
      (is (= {:response {"status" "stream not found"}}
             (get-as client keys2 "root"))))

    (ta/pending "publish simple messages"





      ;; todo send some messages to stream
      ;; todo fail to send messages if using wrong keys






      ;; todo retrieve all messages
      ;; todo if stream has messages that aren't signed properly reject them

      )














    (ta/pending "stream access")))







|















|
|



>
>

|




|




|
















|



<
<
<
<
<
<
<
|
|
|
|
|
|
|
|
|

|
|
>
>
>
>
>
>

|
|
|
|
|

|
|
|
|
|

|
|
|

|
|
|

|
>
>
>
>

|
|
>
>
>
>
>
>
|
|
>
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>

67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128







129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
                 {:public-key public-signing-key
                  :private-key private-signing-key2})))

        (is (= {:response "unauthorized"}
               (client/register-stream client {:id "test1"}))))

      (testing "first stream must be called 'root'"
        (as client keys)

        (is (= {:response "root not yet configured -- register that first"}
               (client/register-stream client {:id "test1"})))

        (is (= {:response "ok"} (client/register-stream client {:id "root"}))))

      (testing "can't register multiple streams with the same name"
        (is (= {:response "already exists"}
               (client/register-stream client {:id "root"}))))

      (testing "can't register more than one stream"
        (is (= {:response "substreams aren't allowed here"}
               (client/register-stream client {:id "test1"}))))

      (testing "only stream owner can configure it"
        (as client keys2)
        (is (= {:response "unauthorized"}
               (client/configure-stream client {:id "root"
                                                :substreams-allowed? true}))))

      (ta/pending "unsigned configuration messages don't get to the model")

      (testing "if root allows substreams"
        (as client keys)
        (is (= {:response "ok"}
               (client/configure-stream client {:id "root"
                                                :substreams-allowed? true})))
        (testing "a substream can be registered"
          (as client keys2)
          (is (= {:response "ok"}
                 (client/register-stream client {:id "test1"})))

          (testing "but not twice"
            (as client keys)
            (is (= {:response "already exists"}
                   (client/register-stream client {:id "test1"}))))

          (testing "and not after reconfigured off"
            (is (= {:response "ok"}
                   (client/configure-stream client
                                            {:id "root"
                                             :substreams-allowed? false})))
            (is (= {:response "substreams aren't allowed here"}
                   (client/register-stream client {:id "test2"}))))

          (testing "but not twice, even after reconfiguring"
            (is (= {:response "ok"}
                   (client/configure-stream client
                                            {:id "root"
                                             :substreams-allowed? true})))
            (as client keys)
            (is (= {:response "already exists"}
                   (client/register-stream client {:id "test1"}))))))








      (let [expected-root-messages
            [{"c" "reg"
              "m" {"id" "root"}}
             {"c" "cfg"
              "m" {"id" "root", "params" {"substreams-allowed?" true}}}
             {"c" "cfg"
              "m" {"id" "root", "params" {"substreams-allowed?" false}}}
             {"c" "cfg"
              "m" {"id" "root", "params" {"substreams-allowed?" true}}}]

            expected-test1-messages
            [{"c" "reg", "m" {"id" "test1"}}]]
        (testing "retrieving stream messages"
          (let [{{root-messages "messages" :as r1} :response}
                (get-as client keys "root")

                {{test1-messages "messages" :as r2} :response}
                (get-as client keys2 "test1")]

            (testing "messages include configuration stuff"
              (is (= expected-root-messages
                     (map get-m root-messages)) r1)
              (is (= expected-test1-messages
                     (map get-m test1-messages)) r2))

            (testing "we can validate and reassemble from original binaries"
              (is (= expected-root-messages
                     (map reassemble-msg root-messages)))
              (is (= expected-test1-messages
                     (map reassemble-msg test1-messages))))))

        (testing "good error message when stream is missing"
          (is (= {:response {"status" "stream not found"}}
                 (get-as client keys "missing-stream"))))

        (testing "streams are private by default, show as missing"
          (is (= {:response {"status" "stream not found"}}
                 (get-as client keys2 "root"))))

        (testing "publish simple messages"
          (as client keys)
          (is (= {:response "ok"}
                 (client/submit-message client
                                        {:id "root" :t "hello, world!"})))

          (testing "can't publish to someone else's stream"
            (as client keys2)
            (is (= {:response "unauthorized"}
                   (client/submit-message client
                                          {:id "root" :t "hello, friend!"})))
            (is (= {:response "ok"}
                   (client/submit-message client
                                          {:id "test1" :t "hello, friend!"}))))
          (testing "messages show up"
            (let [{{root-messages "messages"} :response}
                  (get-as client keys "root")

                  {{test1-messages "messages"} :response}
                  (get-as client keys2 "test1")]
              (is (= (conj
                       expected-root-messages
                       {"c" "put"
                        "m" {"id" "root", "params" {"t" "hello, world!"}}})
                     (map get-m root-messages)))
              (is (= (conj
                       expected-test1-messages
                       {"c" "put"
                        "m" {"id" "test1", "params" {"t" "hello, friend!"}}})
                     (map get-m test1-messages)))))
          (ta/pending "unsigned put messages don't get to the model")
          (ta/pending "messages have server receipt timestamps"))))
    (ta/pending "stream access")))

Changes to test/streamful/server_test.clj.

21
22
23
24
25
26
27

28
29
30
31
32
33
34
            [defenv.core :refer [defenv] :as env]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.env :as senv]
            [streamful.io :as sio]
            [streamful.log-util :as lu]
            [streamful.server :refer :all]

            [streamful.test-net :as tnet]
            [streamful.test-protocol :as tp]
            [streamful.test-thread :as tthread]
            [taoensso.timbre :as log])
  (:import (java.io ByteArrayInputStream InputStream OutputStream)
           (java.net Socket)))








>







21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
            [defenv.core :refer [defenv] :as env]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.env :as senv]
            [streamful.io :as sio]
            [streamful.log-util :as lu]
            [streamful.server :refer :all]
            [streamful.test-asserts :as ta]
            [streamful.test-net :as tnet]
            [streamful.test-protocol :as tp]
            [streamful.test-thread :as tthread]
            [taoensso.timbre :as log])
  (:import (java.io ByteArrayInputStream InputStream OutputStream)
           (java.net Socket)))

220
221
222
223
224
225
226
227

                   test-name
                   (- (System/currentTimeMillis) start-time))))))

;; Runs the shared server test against both server implementations.
(deftest server-test
  ;; Raise the minimum log level for io.netty namespaces to :info.
  (log/set-ns-min-level! #"io.netty" :info)
  ;; Log the displayed environment once before the runs.
  (lu/test-log-level! (log/info (with-out-str (env/display-env))))
  (run-server-test "socket server" start-socket-server!)
  (run-server-test "netty server" start-netty-server!))








|
>
221
222
223
224
225
226
227
228
229
                   test-name
                   (- (System/currentTimeMillis) start-time))))))

;; Runs the shared server test against both server implementations and
;; records one pending item for netty file-buffer handling.
(deftest server-test
  ;; Raise the minimum log level for io.netty namespaces to :info.
  (log/set-ns-min-level! #"io.netty" :info)
  ;; Log the displayed environment once before the runs.
  (lu/test-log-level! (log/info (with-out-str (env/display-env))))
  (run-server-test "socket server" start-socket-server!)
  (run-server-test "netty server" start-netty-server!)
  (ta/pending "netty uses file buffers properly"))

Changes to test/streamful/test_cfg.clj.

1
2
3
4
5
6
7
8
9

10
11
12
13
14
15
16
17
18
19
20
21
22
23

(ns streamful.test-cfg
  (:require [clojure.test :refer :all]
            [streamful.core :as core]
            [streamful.log-util :as lu]
            [streamful.temp :as temp]
            [streamful.test-protocol :as tp])
  (:import (java.io File)))

;; Defines a clojure.test test named `n` whose body runs inside
;; lu/test-log-level!.
(defmacro def-logcfg-test [n & body]

  `(deftest ~n (lu/test-log-level! ~@body)))

;; Builds a protocol handler backed by a "db" directory inside a fresh
;; session temp dir; the second argument to core/build-protocol-handler is 1.
(defn- build-temp-protocol-handler []
  (let [^File temp-dir (temp/session-tempdir!)
        ^File db-dir (File. temp-dir "db")]
    (core/build-protocol-handler (.getAbsolutePath db-dir) 1)))

;; Defines a test named `n` that runs `body` inside
;; tp/with-server-and-client, using a temp-dir-backed protocol handler and
;; binding the first symbol of `b`, then cleans up all temp dirs.
(defmacro def-server-test
  [n b & body]
  `(def-logcfg-test ~n
     (do
       ;; The private builder fn is unquoted so its value, not its symbol,
       ;; ends up in the expansion.
       (tp/with-server-and-client (~build-temp-protocol-handler) [~(first b)]
         ~@body)
       (temp/cleanup-all-tempdirs!))))










>
|













>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
(ns streamful.test-cfg
  (:require [clojure.test :refer :all]
            [streamful.core :as core]
            [streamful.log-util :as lu]
            [streamful.temp :as temp]
            [streamful.test-protocol :as tp])
  (:import (java.io File)))

(defmacro def-logcfg-test
  "Defines a clojure.test test named `test-name` whose `forms` run inside
  lu/test-log-level!."
  [test-name & forms]
  `(deftest ~test-name
     (lu/test-log-level!
       ~@forms)))

(defn- build-temp-protocol-handler
  "Builds a protocol handler backed by a \"db\" directory inside a fresh
  session temp dir; the second argument to core/build-protocol-handler is 1."
  []
  (let [^File session-dir (temp/session-tempdir!)
        db-path (.getAbsolutePath (File. session-dir "db"))]
    (core/build-protocol-handler db-path 1)))

(defmacro def-server-test
  "Defines a test named `n` that runs `body` inside
  tp/with-server-and-client, using a temp-dir-backed protocol handler and
  binding the first symbol of `b`.

  Temp directories are removed in a `finally`, so they are cleaned up even
  when the body (or server/client setup) throws instead of being leaked."
  [n b & body]
  `(def-logcfg-test ~n
     (try
       ;; The private builder fn is unquoted so its value, not its symbol,
       ;; ends up in the expansion.
       (tp/with-server-and-client (~build-temp-protocol-handler) [~(first b)]
         ~@body)
       (finally
         (temp/cleanup-all-tempdirs!)))))

Changes to test/streamful/test_protocol.clj.

1
2
3
4
5
6
7
8
9


10
11
12
13
14
15
16
(ns streamful.test-protocol
  (:require [clojure.test :refer :all]
            [streamful.client :as client]
            [streamful.server :as server]
            [streamful.test-net :as tnet]
            [streamful.test-thread :as tthread]))

(defn start-server-and-wait [start-fn! port handler &
                             {:keys [on-start on-exit num-threads]


                              :or {on-exit (constantly :default)
                                   on-start (constantly :default)
                                   num-threads 1}}]
  (let [server-running (atom false)
        on-start
        (fn []
          (on-start)








|
>
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
(ns streamful.test-protocol
  (:require [clojure.test :refer :all]
            [streamful.client :as client]
            [streamful.server :as server]
            [streamful.test-net :as tnet]
            [streamful.test-thread :as tthread]))

(defn start-server-and-wait [start-fn! port handler &
                             {:keys [on-start
                                     on-exit
                                     num-threads]
                              :or {on-exit (constantly :default)
                                   on-start (constantly :default)
                                   num-threads 1}}]
  (let [server-running (atom false)
        on-start
        (fn []
          (on-start)