Streamful

Check-in [6bdd6c82f3]
Login

Check-in [6bdd6c82f3]

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:List streams
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 6bdd6c82f3cea76f02e9789ac7c45b921112d5827ce49a2f7dcd949c3a0c3561
User & Date: scstarkey 2025-02-21 13:38:12
Context
2025-02-22
15:07
More fine grained test coverage around commands check-in: c2e62da61c user: scstarkey tags: trunk
2025-02-21
13:38
List streams check-in: 6bdd6c82f3 user: scstarkey tags: trunk
12:58
A few more assertions check-in: 8bb7b6b78f user: scstarkey tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/streamful/client.clj.

96
97
98
99
100
101
102

103
104
105

106
107
108
109
110
111
112
     The client-keys must be a map containing :public-key and :private-key
     used for signing.")
  (claim-server [_, ^String claim-key]
    "Claims a server. Thereafter, whoever uses the same keys
    (you need to use `as` before calling this function)
    may register the `root` stream.")
  (get-messages [_ params] "Fetch messages from a stream")

  (register-stream [_ params] "Register a new stream")
  (configure-stream [_ params] "Configure an existing stream")
  (submit-message [_ params] "Submit a message to a stream")

  (close [_] "Close connection to the server")
  (is-open? [_] "Is the connection currently open?"))

(defn- configure-connection
  [^String host, ^Integer port, ^Integer timeout]
  (let [sock (ref nil)
        out-stream (ref nil)







>


|
>







96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
     The client-keys must be a map containing :public-key and :private-key
     used for signing.")
  (claim-server [_, ^String claim-key]
    "Claims a server. Thereafter, whoever uses the same keys
    (you need to use `as` before calling this function)
    may register the `root` stream.")
  (get-messages [_ params] "Fetch messages from a stream")
  (submit-message [_ params] "Submit a message to a stream")
  (register-stream [_ params] "Register a new stream")
  (configure-stream [_ params] "Configure an existing stream")
  (list-streams [_ params] "See what streams are available for you to
    subscribe to on this server")
  (close [_] "Close connection to the server")
  (is-open? [_] "Is the connection currently open?"))

(defn- configure-connection
  [^String host, ^Integer port, ^Integer timeout]
  (let [sock (ref nil)
        out-stream (ref nil)
233
234
235
236
237
238
239






240
241
242
243
244
245
246

    (reify Client
      (ping [_ msg] (send-cmd! nil {:c "ping", :m msg}))

      (get-messages [_ {:keys [sid]}]
        (send-signed! {:c "get" :m {:sid sid}}))







      (claim-server [_ claim-key]
        (send-signed! {:c "claim" :m {:claim-key claim-key}}))

      (as [_ {:keys [public-key private-key public-signing-key] :as ks}]
        (cond ks
              (let [response (send-keys! public-key public-signing-key)
                    server-pk-str (process-key-response server-pk response)]







>
>
>
>
>
>







235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254

    (reify Client
      (ping [_ msg] (send-cmd! nil {:c "ping", :m msg}))

      (get-messages [_ {:keys [sid]}]
        (send-signed! {:c "get" :m {:sid sid}}))

      (submit-message [_ {:keys [sid] :as params}]
        (send-signed!
          {:c "put"
           :m {:sid sid,
               :params (dissoc params :sid)}}))

      (claim-server [_ claim-key]
        (send-signed! {:c "claim" :m {:claim-key claim-key}}))

      (as [_ {:keys [public-key private-key public-signing-key] :as ks}]
        (cond ks
              (let [response (send-keys! public-key public-signing-key)
                    server-pk-str (process-key-response server-pk response)]
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
        (send-signed! {:c "reg", :m {:sid sid}}))

      (configure-stream [_ {:keys [sid] :as params}]
        (send-signed!
          {:c "cfg"
           :m {:sid sid, :params (dissoc params :sid)}}))

      (submit-message [_ {:keys [sid] :as params}]
        (send-signed!
          {:c "put"
           :m {:sid sid,
               :params (dissoc params :sid)}}))

      (close [_] {:result (close!)})

      (is-open? [_] {:result (is-open?)}))))







|
|
<
<
<




278
279
280
281
282
283
284
285
286



287
288
289
290
        (send-signed! {:c "reg", :m {:sid sid}}))

      (configure-stream [_ {:keys [sid] :as params}]
        (send-signed!
          {:c "cfg"
           :m {:sid sid, :params (dissoc params :sid)}}))

      (list-streams [_ params]
        (send-signed! {:c "list" :m params}))




      (close [_] {:result (close!)})

      (is-open? [_] {:result (is-open?)}))))

Changes to src/streamful/core.clj.

93
94
95
96
97
98
99


100
101
102
103
104
105
106

(defn wrap-stream-model [db-stream-model]
  (reify stream/StreamModel
    (has-owner? [_] (stream/has-owner? db-stream-model))
    (claim! [_ pk] (stream/claim! db-stream-model pk))
    (register-stream! [_, pk, msg, original-msg-bytes]
      (stream/register-stream! db-stream-model pk msg original-msg-bytes))


    (configure-stream! [_, pk, msg, original-msg-bytes]
      (stream/configure-stream! db-stream-model pk msg original-msg-bytes))
    (get-messages! [_, pk, {{:strs [sid]} "m" :as msg}, original-msg-bytes]
      (if (= "COPYRIGHT" sid)
        {:status :ok
         :messages [{"m" copyright-text}]}
        (stream/get-messages! db-stream-model pk msg original-msg-bytes)))







>
>







93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108

(defn wrap-stream-model [db-stream-model]
  (reify stream/StreamModel
    (has-owner? [_] (stream/has-owner? db-stream-model))
    (claim! [_ pk] (stream/claim! db-stream-model pk))
    (register-stream! [_, pk, msg, original-msg-bytes]
      (stream/register-stream! db-stream-model pk msg original-msg-bytes))
    (list-streams [_, pk, msg]
      (stream/list-streams db-stream-model pk msg))
    (configure-stream! [_, pk, msg, original-msg-bytes]
      (stream/configure-stream! db-stream-model pk msg original-msg-bytes))
    (get-messages! [_, pk, {{:strs [sid]} "m" :as msg}, original-msg-bytes]
      (if (= "COPYRIGHT" sid)
        {:status :ok
         :messages [{"m" copyright-text}]}
        (stream/get-messages! db-stream-model pk msg original-msg-bytes)))

Changes to src/streamful/protocol.clj.

113
114
115
116
117
118
119

120
121
122
123
124
125
126
(defn- router [stream-model claim-key]
  (fn [{:keys [psk original] {:keys [remote-ip session-id]} :req}
       {:strs [c m] :as cmd}]
    (case c
      "ping" m
      "get" (get-messages psk stream-model cmd original)
      "put" (put-message! psk stream-model cmd original)

      "claim" (claim-server! psk stream-model claim-key cmd)
      "reg" (register-stream! psk stream-model cmd original)
      "cfg" (configure-stream! psk stream-model cmd original)
      (throw (ex-info "Unknown command"
                      {:cmd cmd
                       :session-id session-id
                       :remote-ip remote-ip})))))







>







113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
(defn- router [stream-model claim-key]
  (fn [{:keys [psk original] {:keys [remote-ip session-id]} :req}
       {:strs [c m] :as cmd}]
    (case c
      "ping" m
      "get" (get-messages psk stream-model cmd original)
      "put" (put-message! psk stream-model cmd original)
      "list" (stream/list-streams stream-model psk cmd)
      "claim" (claim-server! psk stream-model claim-key cmd)
      "reg" (register-stream! psk stream-model cmd original)
      "cfg" (configure-stream! psk stream-model cmd original)
      (throw (ex-info "Unknown command"
                      {:cmd cmd
                       :session-id session-id
                       :remote-ip remote-ip})))))

Changes to src/streamful/stream.clj.

19
20
21
22
23
24
25

26
27
28
29
30
31
(ns streamful.stream)

(defprotocol StreamModel
  (has-owner? [_])
  (claim! [_, ^bytes pk])
  (register-stream! [_, ^bytes pk, msg, ^bytes original-msg-bytes])
  (configure-stream! [_, ^bytes pk, msg, ^bytes original-msg-bytes])

  (get-messages! [_, ^bytes pk, msg, ^bytes original-msg-bytes]
    "Get messages from a stream. May be part of a signed request.
    May cause state to change in the backend as we keep track of
    who has subscribed to our stream.")
  (put-message! [_, ^bytes pk, msg, ^bytes original-msg-bytes]
    "Put a message into a stream. Must be part of a signed request."))







>






19
20
21
22
23
24
25
26
27
28
29
30
31
32
(ns streamful.stream)

(defprotocol StreamModel
  (has-owner? [_])
  (claim! [_, ^bytes pk])
  (register-stream! [_, ^bytes pk, msg, ^bytes original-msg-bytes])
  (configure-stream! [_, ^bytes pk, msg, ^bytes original-msg-bytes])
  (list-streams [_, ^bytes pk, msg])
  (get-messages! [_, ^bytes pk, msg, ^bytes original-msg-bytes]
    "Get messages from a stream. May be part of a signed request.
    May cause state to change in the backend as we keep track of
    who has subscribed to our stream.")
  (put-message! [_, ^bytes pk, msg, ^bytes original-msg-bytes]
    "Put a message into a stream. Must be part of a signed request."))

Changes to src/streamful/stream_datalevin.clj.

84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
          (let [internal-id (cr/guid)

                smsg
                (stream-put-msg internal-id msg client-psk original-msg-bytes)

                first-msg-time (-> smsg (nth 3) :t)]
            (db/transact-kv tx
                              [[:put stream-agg-table
                                :manifest
                                (merge manifest {sid
                                                 {"psk" client-psk
                                                  "iid" internal-id
                                                  "f" first-msg-time}})]
                               smsg])
            :ok)

          (not root-exists?) :root-stream-missing

          existing-stream :already-exists

          :else :substreams-not-allowed)))







|
|
|
|
|
|
|







84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
          (let [internal-id (cr/guid)

                smsg
                (stream-put-msg internal-id msg client-psk original-msg-bytes)

                first-msg-time (-> smsg (nth 3) :t)]
            (db/transact-kv tx
              [[:put stream-agg-table
                :manifest
                (merge manifest {sid
                                 {"psk" client-psk
                                  "iid" internal-id
                                  "f" first-msg-time}})]
               smsg])
            :ok)

          (not root-exists?) :root-stream-missing

          existing-stream :already-exists

          :else :substreams-not-allowed)))
128
129
130
131
132
133
134








135
136
137
138
139
140
141

          (not existing-stream) :stream-missing

          :else :unauthorized)))
    (catch Throwable t
      (log/error t)
      :database-error)))









(defn get-messages! [db
                     client-psk
                     {{:strs [sid]} "m"}
                     _original-msg-bytes]
  (let [manifest (get-db-manifest db)
        {:strs [iid f psk]







>
>
>
>
>
>
>
>







128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149

          (not existing-stream) :stream-missing

          :else :unauthorized)))
    (catch Throwable t
      (log/error t)
      :database-error)))

(defn- list-streams [db client-psk _msg]
  (let [manifest (get-db-manifest db)]
    (->> manifest
         (filter
           (fn [[_ {:strs [psk] {:strs [anon-allowed?]} "c"}]]
             (or anon-allowed? (creq/eq? client-psk psk))))
         (map (fn [[k]] k)))))

(defn get-messages! [db
                     client-psk
                     {{:strs [sid]} "m"}
                     _original-msg-bytes]
  (let [manifest (get-db-manifest db)
        {:strs [iid f psk]
181
182
183
184
185
186
187

188
189
190
191
    (reify stream/StreamModel
      (has-owner? [_] (get-db-owner db))
      (claim! [_ pk] (claim-server! db pk))
      (register-stream! [_, pk, msg, original-msg-bytes]
        (register-stream! db pk msg original-msg-bytes))
      (configure-stream! [_, pk, msg, original-msg-bytes]
        (configure-stream! db pk msg original-msg-bytes))

      (get-messages! [_, pk, msg, original-msg-bytes]
        (get-messages! db pk msg original-msg-bytes))
      (put-message! [_, pk, msg, original-msg-bytes]
        (put-message! db pk msg original-msg-bytes)))))







>




189
190
191
192
193
194
195
196
197
198
199
200
    (reify stream/StreamModel
      (has-owner? [_] (get-db-owner db))
      (claim! [_ pk] (claim-server! db pk))
      (register-stream! [_, pk, msg, original-msg-bytes]
        (register-stream! db pk msg original-msg-bytes))
      (configure-stream! [_, pk, msg, original-msg-bytes]
        (configure-stream! db pk msg original-msg-bytes))
      (list-streams [_, pk, msg] (list-streams db pk msg))
      (get-messages! [_, pk, msg, original-msg-bytes]
        (get-messages! db pk msg original-msg-bytes))
      (put-message! [_, pk, msg, original-msg-bytes]
        (put-message! db pk msg original-msg-bytes)))))

Changes to test/streamful/manual_testing.clj.

25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
            [taoensso.timbre :as log])
  (:import (java.util.concurrent Executors)))

;; this appears at server startup
;; eventually we will use a TXT record hosted using DNSSEC
;; to prove servers can be trusted (no SSL authorities needed!)

#_(def ^:private server-pk "9G4BzxYJSwGxC6a3FC3wqqa6Chk/cAV9lmgmyvyhgUA=")

(def ^:private server-pk "J7wkkTsgL/javpyzaq+PdNhn7umlcGT4Xvds9jq16Xs=")

(defn- test-a-bunch []
  (with-open [contained-client
              (client/new-client "localhost" 1100 server-pk 1000)]
    (let [answers
          (->>
            (range 25)







|

|







25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
            [taoensso.timbre :as log])
  (:import (java.util.concurrent Executors)))

;; this appears at server startup
;; eventually we will use a TXT record hosted using DNSSEC
;; to prove servers can be trusted (no SSL authorities needed!)

(def ^:private server-pk "9G4BzxYJSwGxC6a3FC3wqqa6Chk/cAV9lmgmyvyhgUA=")

#_(def ^:private server-pk "J7wkkTsgL/javpyzaq+PdNhn7umlcGT4Xvds9jq16Xs=")

(defn- test-a-bunch []
  (with-open [contained-client
              (client/new-client "localhost" 1100 server-pk 1000)]
    (let [answers
          (->>
            (range 25)
102
103
104
105
106
107
108


109
110
111
112
113
114
115
  (client/claim-server c "v0CA_kT8RQ_IrbEgF9SltA")
  (client/register-stream c {:sid "root"})
  (client/configure-stream c {:sid "root" :substreams-allowed? true})
  (client/configure-stream c {:sid "root" :anon-allowed? true})
  (client/register-stream c {:sid "personal"})
  (client/register-stream c {:sid "public"})
  (client/configure-stream c {:sid "public" :anon-allowed? true})



  (client/get-messages c {:sid "root"})
  (client/get-messages c2 {:sid "root"})
  (client/get-messages c {:sid "personal"})
  (client/get-messages c2 {:sid "personal"})
  (client/get-messages c {:sid "public"})
  (client/get-messages c2 {:sid "public"})







>
>







102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
  (client/claim-server c "v0CA_kT8RQ_IrbEgF9SltA")
  (client/register-stream c {:sid "root"})
  (client/configure-stream c {:sid "root" :substreams-allowed? true})
  (client/configure-stream c {:sid "root" :anon-allowed? true})
  (client/register-stream c {:sid "personal"})
  (client/register-stream c {:sid "public"})
  (client/configure-stream c {:sid "public" :anon-allowed? true})

  (client/list-streams c nil)

  (client/get-messages c {:sid "root"})
  (client/get-messages c2 {:sid "root"})
  (client/get-messages c {:sid "personal"})
  (client/get-messages c2 {:sid "personal"})
  (client/get-messages c {:sid "public"})
  (client/get-messages c2 {:sid "public"})

Changes to test/streamful/protocol_test.clj.

293
294
295
296
297
298
299
300













301
302
303
304
305
306
307
                  (count x)
                  (pos? x)))
        (is (as-> (get-as client keys1 "test1") x
                  (:response x)
                  (x "status")
                  (= "stream not found" x))))
      (ta/pending "approval required")
      (ta/pending "invitation only"))))














(defn- reassemble-from-handler [{:keys [psk original]}]
  (decode-signed-original psk original))

(deftest handle-message-test
  (let [messages-received (atom [])








|
>
>
>
>
>
>
>
>
>
>
>
>
>







293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
                  (count x)
                  (pos? x)))
        (is (as-> (get-as client keys1 "test1") x
                  (:response x)
                  (x "status")
                  (= "stream not found" x))))
      (ta/pending "approval required")
      (ta/pending "invitation only")

      (testing "listing streams"
        (testing "i see my stuff, too"
          (as client keys1)
          (is (= {:response ["root" "test-post-anon"]}
                 (client/list-streams client {})))
          (as client keys2)
          (is (= {:response ["root" "test1"]}
                 (client/list-streams client {}))))
        (testing "public stuff only for anonymous folks"
          (anon client)
          (is (=  {:response ["root"]}
                  (client/list-streams client {}))))))))

(defn- reassemble-from-handler [{:keys [psk original]}]
  (decode-signed-original psk original))

(deftest handle-message-test
  (let [messages-received (atom [])