Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | List streams |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA3-256: |
6bdd6c82f3cea76f02e9789ac7c45b92 |
User & Date: | scstarkey 2025-02-21 13:38:12 |
Context
2025-02-22
| ||
15:07 | More fine grained test coverage around commands check-in: c2e62da61c user: scstarkey tags: trunk | |
2025-02-21
| ||
13:38 | List streams check-in: 6bdd6c82f3 user: scstarkey tags: trunk | |
12:58 | A few more assertions check-in: 8bb7b6b78f user: scstarkey tags: trunk | |
Changes
Changes to src/streamful/client.clj.
︙ | ︙ | |||
96 97 98 99 100 101 102 103 104 | The client-keys must be a map containing :public-key and :private-key used for signing.") (claim-server [_, ^String claim-key] "Claims a server. Thereafter, whoever uses the same keys (you need to use `as` before calling this function) may register the `root` stream.") (get-messages [_ params] "Fetch messages from a stream") (register-stream [_ params] "Register a new stream") (configure-stream [_ params] "Configure an existing stream") | > | > | 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 | The client-keys must be a map containing :public-key and :private-key used for signing.") (claim-server [_, ^String claim-key] "Claims a server. Thereafter, whoever uses the same keys (you need to use `as` before calling this function) may register the `root` stream.") (get-messages [_ params] "Fetch messages from a stream") (submit-message [_ params] "Submit a message to a stream") (register-stream [_ params] "Register a new stream") (configure-stream [_ params] "Configure an existing stream") (list-streams [_ params] "See what streams are available for you to subscribe to on this server") (close [_] "Close connection to the server") (is-open? [_] "Is the connection currently open?")) (defn- configure-connection [^String host, ^Integer port, ^Integer timeout] (let [sock (ref nil) out-stream (ref nil) |
︙ | ︙ | |||
233 234 235 236 237 238 239 240 241 242 243 244 245 246 | (reify Client (ping [_ msg] (send-cmd! nil {:c "ping", :m msg})) (get-messages [_ {:keys [sid]}] (send-signed! {:c "get" :m {:sid sid}})) (claim-server [_ claim-key] (send-signed! {:c "claim" :m {:claim-key claim-key}})) (as [_ {:keys [public-key private-key public-signing-key] :as ks}] (cond ks (let [response (send-keys! public-key public-signing-key) server-pk-str (process-key-response server-pk response)] | > > > > > > | 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 | (reify Client (ping [_ msg] (send-cmd! nil {:c "ping", :m msg})) (get-messages [_ {:keys [sid]}] (send-signed! {:c "get" :m {:sid sid}})) (submit-message [_ {:keys [sid] :as params}] (send-signed! {:c "put" :m {:sid sid, :params (dissoc params :sid)}})) (claim-server [_ claim-key] (send-signed! {:c "claim" :m {:claim-key claim-key}})) (as [_ {:keys [public-key private-key public-signing-key] :as ks}] (cond ks (let [response (send-keys! public-key public-signing-key) server-pk-str (process-key-response server-pk response)] |
︙ | ︙ | |||
270 271 272 273 274 275 276 | (send-signed! {:c "reg", :m {:sid sid}})) (configure-stream [_ {:keys [sid] :as params}] (send-signed! {:c "cfg" :m {:sid sid, :params (dissoc params :sid)}})) | | | < < < | 278 279 280 281 282 283 284 285 286 287 288 289 290 | (send-signed! {:c "reg", :m {:sid sid}})) (configure-stream [_ {:keys [sid] :as params}] (send-signed! {:c "cfg" :m {:sid sid, :params (dissoc params :sid)}})) (list-streams [_ params] (send-signed! {:c "list" :m params})) (close [_] {:result (close!)}) (is-open? [_] {:result (is-open?)})))) |
Changes to src/streamful/core.clj.
︙ | ︙ | |||
93 94 95 96 97 98 99 100 101 102 103 104 105 106 | (defn wrap-stream-model [db-stream-model] (reify stream/StreamModel (has-owner? [_] (stream/has-owner? db-stream-model)) (claim! [_ pk] (stream/claim! db-stream-model pk)) (register-stream! [_, pk, msg, original-msg-bytes] (stream/register-stream! db-stream-model pk msg original-msg-bytes)) (configure-stream! [_, pk, msg, original-msg-bytes] (stream/configure-stream! db-stream-model pk msg original-msg-bytes)) (get-messages! [_, pk, {{:strs [sid]} "m" :as msg}, original-msg-bytes] (if (= "COPYRIGHT" sid) {:status :ok :messages [{"m" copyright-text}]} (stream/get-messages! db-stream-model pk msg original-msg-bytes))) | > > | 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | (defn wrap-stream-model [db-stream-model] (reify stream/StreamModel (has-owner? [_] (stream/has-owner? db-stream-model)) (claim! [_ pk] (stream/claim! db-stream-model pk)) (register-stream! [_, pk, msg, original-msg-bytes] (stream/register-stream! db-stream-model pk msg original-msg-bytes)) (list-streams [_, pk, msg] (stream/list-streams db-stream-model pk msg)) (configure-stream! [_, pk, msg, original-msg-bytes] (stream/configure-stream! db-stream-model pk msg original-msg-bytes)) (get-messages! [_, pk, {{:strs [sid]} "m" :as msg}, original-msg-bytes] (if (= "COPYRIGHT" sid) {:status :ok :messages [{"m" copyright-text}]} (stream/get-messages! db-stream-model pk msg original-msg-bytes))) |
︙ | ︙ |
Changes to src/streamful/protocol.clj.
︙ | ︙ | |||
113 114 115 116 117 118 119 120 121 122 123 124 125 126 | (defn- router [stream-model claim-key] (fn [{:keys [psk original] {:keys [remote-ip session-id]} :req} {:strs [c m] :as cmd}] (case c "ping" m "get" (get-messages psk stream-model cmd original) "put" (put-message! psk stream-model cmd original) "claim" (claim-server! psk stream-model claim-key cmd) "reg" (register-stream! psk stream-model cmd original) "cfg" (configure-stream! psk stream-model cmd original) (throw (ex-info "Unknown command" {:cmd cmd :session-id session-id :remote-ip remote-ip}))))) | > | 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 | (defn- router [stream-model claim-key] (fn [{:keys [psk original] {:keys [remote-ip session-id]} :req} {:strs [c m] :as cmd}] (case c "ping" m "get" (get-messages psk stream-model cmd original) "put" (put-message! psk stream-model cmd original) "list" (stream/list-streams stream-model psk cmd) "claim" (claim-server! psk stream-model claim-key cmd) "reg" (register-stream! psk stream-model cmd original) "cfg" (configure-stream! psk stream-model cmd original) (throw (ex-info "Unknown command" {:cmd cmd :session-id session-id :remote-ip remote-ip}))))) |
︙ | ︙ |
Changes to src/streamful/stream.clj.
︙ | ︙ | |||
19 20 21 22 23 24 25 26 27 28 29 30 31 | (ns streamful.stream) (defprotocol StreamModel (has-owner? [_]) (claim! [_, ^bytes pk]) (register-stream! [_, ^bytes pk, msg, ^bytes original-msg-bytes]) (configure-stream! [_, ^bytes pk, msg, ^bytes original-msg-bytes]) (get-messages! [_, ^bytes pk, msg, ^bytes original-msg-bytes] "Get messages from a stream. May be part of a signed request. May cause state to change in the backend as we keep track of who has subscribed to our stream.") (put-message! [_, ^bytes pk, msg, ^bytes original-msg-bytes] "Put a message into a stream. Must be part of a signed request.")) | > | 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | (ns streamful.stream) (defprotocol StreamModel (has-owner? [_]) (claim! [_, ^bytes pk]) (register-stream! [_, ^bytes pk, msg, ^bytes original-msg-bytes]) (configure-stream! [_, ^bytes pk, msg, ^bytes original-msg-bytes]) (list-streams [_, ^bytes pk, msg]) (get-messages! [_, ^bytes pk, msg, ^bytes original-msg-bytes] "Get messages from a stream. May be part of a signed request. May cause state to change in the backend as we keep track of who has subscribed to our stream.") (put-message! [_, ^bytes pk, msg, ^bytes original-msg-bytes] "Put a message into a stream. Must be part of a signed request.")) |
Changes to src/streamful/stream_datalevin.clj.
︙ | ︙ | |||
84 85 86 87 88 89 90 | (let [internal-id (cr/guid) smsg (stream-put-msg internal-id msg client-psk original-msg-bytes) first-msg-time (-> smsg (nth 3) :t)] (db/transact-kv tx | | | | | | | | | 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | (let [internal-id (cr/guid) smsg (stream-put-msg internal-id msg client-psk original-msg-bytes) first-msg-time (-> smsg (nth 3) :t)] (db/transact-kv tx [[:put stream-agg-table :manifest (merge manifest {sid {"psk" client-psk "iid" internal-id "f" first-msg-time}})] smsg]) :ok) (not root-exists?) :root-stream-missing existing-stream :already-exists :else :substreams-not-allowed))) |
︙ | ︙ | |||
128 129 130 131 132 133 134 135 136 137 138 139 140 141 | (not existing-stream) :stream-missing :else :unauthorized))) (catch Throwable t (log/error t) :database-error))) (defn get-messages! [db client-psk {{:strs [sid]} "m"} _original-msg-bytes] (let [manifest (get-db-manifest db) {:strs [iid f psk] | > > > > > > > > | 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | (not existing-stream) :stream-missing :else :unauthorized))) (catch Throwable t (log/error t) :database-error))) (defn- list-streams [db client-psk _msg] (let [manifest (get-db-manifest db)] (->> manifest (filter (fn [[_ {:strs [psk] {:strs [anon-allowed?]} "c"}]] (or anon-allowed? (creq/eq? client-psk psk)))) (map (fn [[k]] k))))) (defn get-messages! [db client-psk {{:strs [sid]} "m"} _original-msg-bytes] (let [manifest (get-db-manifest db) {:strs [iid f psk] |
︙ | ︙ | |||
181 182 183 184 185 186 187 188 189 190 191 | (reify stream/StreamModel (has-owner? [_] (get-db-owner db)) (claim! [_ pk] (claim-server! db pk)) (register-stream! [_, pk, msg, original-msg-bytes] (register-stream! db pk msg original-msg-bytes)) (configure-stream! [_, pk, msg, original-msg-bytes] (configure-stream! db pk msg original-msg-bytes)) (get-messages! [_, pk, msg, original-msg-bytes] (get-messages! db pk msg original-msg-bytes)) (put-message! [_, pk, msg, original-msg-bytes] (put-message! db pk msg original-msg-bytes))))) | > | 189 190 191 192 193 194 195 196 197 198 199 200 | (reify stream/StreamModel (has-owner? [_] (get-db-owner db)) (claim! [_ pk] (claim-server! db pk)) (register-stream! [_, pk, msg, original-msg-bytes] (register-stream! db pk msg original-msg-bytes)) (configure-stream! [_, pk, msg, original-msg-bytes] (configure-stream! db pk msg original-msg-bytes)) (list-streams [_, pk, msg] (list-streams db pk msg)) (get-messages! [_, pk, msg, original-msg-bytes] (get-messages! db pk msg original-msg-bytes)) (put-message! [_, pk, msg, original-msg-bytes] (put-message! db pk msg original-msg-bytes))))) |
Changes to test/streamful/manual_testing.clj.
︙ | ︙ | |||
25 26 27 28 29 30 31 | [taoensso.timbre :as log]) (:import (java.util.concurrent Executors))) ;; this appears at server startup ;; eventually we will use a TXT record hosted using DNSSEC ;; to prove servers can be trusted (no SSL authorities needed!) | | | | 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | [taoensso.timbre :as log]) (:import (java.util.concurrent Executors))) ;; this appears at server startup ;; eventually we will use a TXT record hosted using DNSSEC ;; to prove servers can be trusted (no SSL authorities needed!) (def ^:private server-pk "9G4BzxYJSwGxC6a3FC3wqqa6Chk/cAV9lmgmyvyhgUA=") #_(def ^:private server-pk "J7wkkTsgL/javpyzaq+PdNhn7umlcGT4Xvds9jq16Xs=") (defn- test-a-bunch [] (with-open [contained-client (client/new-client "localhost" 1100 server-pk 1000)] (let [answers (->> (range 25) |
︙ | ︙ | |||
102 103 104 105 106 107 108 109 110 111 112 113 114 115 | (client/claim-server c "v0CA_kT8RQ_IrbEgF9SltA") (client/register-stream c {:sid "root"}) (client/configure-stream c {:sid "root" :substreams-allowed? true}) (client/configure-stream c {:sid "root" :anon-allowed? true}) (client/register-stream c {:sid "personal"}) (client/register-stream c {:sid "public"}) (client/configure-stream c {:sid "public" :anon-allowed? true}) (client/get-messages c {:sid "root"}) (client/get-messages c2 {:sid "root"}) (client/get-messages c {:sid "personal"}) (client/get-messages c2 {:sid "personal"}) (client/get-messages c {:sid "public"}) (client/get-messages c2 {:sid "public"}) | > > | 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 | (client/claim-server c "v0CA_kT8RQ_IrbEgF9SltA") (client/register-stream c {:sid "root"}) (client/configure-stream c {:sid "root" :substreams-allowed? true}) (client/configure-stream c {:sid "root" :anon-allowed? true}) (client/register-stream c {:sid "personal"}) (client/register-stream c {:sid "public"}) (client/configure-stream c {:sid "public" :anon-allowed? true}) (client/list-streams c nil) (client/get-messages c {:sid "root"}) (client/get-messages c2 {:sid "root"}) (client/get-messages c {:sid "personal"}) (client/get-messages c2 {:sid "personal"}) (client/get-messages c {:sid "public"}) (client/get-messages c2 {:sid "public"}) |
︙ | ︙ |
Changes to test/streamful/protocol_test.clj.
︙ | ︙ | |||
293 294 295 296 297 298 299 | (count x) (pos? x))) (is (as-> (get-as client keys1 "test1") x (:response x) (x "status") (= "stream not found" x)))) (ta/pending "approval required") | | > > > > > > > > > > > > > | 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 | (count x) (pos? x))) (is (as-> (get-as client keys1 "test1") x (:response x) (x "status") (= "stream not found" x)))) (ta/pending "approval required") (ta/pending "invitation only") (testing "listing streams" (testing "i see my stuff, too" (as client keys1) (is (= {:response ["root" "test-post-anon"]} (client/list-streams client {}))) (as client keys2) (is (= {:response ["root" "test1"]} (client/list-streams client {})))) (testing "public stuff only for anonymous folks" (anon client) (is (= {:response ["root"]} (client/list-streams client {})))))))) (defn- reassemble-from-handler [{:keys [psk original]}] (decode-signed-original psk original)) (deftest handle-message-test (let [messages-received (atom []) |
︙ | ︙ |