Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | A TON of thinking about how aggregates could work |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA3-256: |
ea4d3ed020c368cdc830501053641afc |
User & Date: | scstarkey 2025-02-25 14:04:39 |
Context
2025-02-26
| ||
11:18 | Removed unnecessary complexity check-in: c756216233 user: scstarkey tags: trunk | |
2025-02-25
| ||
14:04 | A TON of thinking about how aggregates could work check-in: ea4d3ed020 user: scstarkey tags: trunk | |
2025-02-22
| ||
16:40 | Using lilliput now check-in: e3105c353c user: scstarkey tags: trunk | |
Changes
Changes to README.md.
︙ | ︙ | |||
84 85 86 87 88 89 90 91 92 93 94 | ``` In fact, if you want to make sure you haven't failed to stage something properly, you can use `./commit`, which will also check to make sure you haven't forgotten to stage any file adds/deletes. It will also ensure the podman image build works properly! ## License See `resources/copyright.txt` for the terms under which you can share this program. | > > > | 84 85 86 87 88 89 90 91 92 93 94 95 96 97 | ``` In fact, if you want to make sure you haven't failed to stage something properly, you can use `./commit`, which will also check to make sure you haven't forgotten to stage any file adds/deletes. It will also ensure the podman image build works properly! You can use `SKIP_PODMAN=true ./commit` to do the same thing but instead of building the podman image it will run `lein precommit` ## License See `resources/copyright.txt` for the terms under which you can share this program. |
Changes to commit.
︙ | ︙ | |||
19 20 21 22 23 24 25 26 27 | # UNSTAGED=$(fossil status --missing --extra | grep -E '(MISSING|EXTRA)') if [[ "" != "${UNSTAGED}" ]]; then echo "Unstaged files found: ${UNSTAGED}" exit 1 fi # shellcheck disable=SC2068 | > > > > > > | | 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | # UNSTAGED=$(fossil status --missing --extra | grep -E '(MISSING|EXTRA)') if [[ "" != "${UNSTAGED}" ]]; then echo "Unstaged files found: ${UNSTAGED}" exit 1 fi if [ "${SKIP_PODMAN}" != "true" ]; then bin/build-podman || exit 1 else lein precommit || exit 1 fi # shellcheck disable=SC2068 fossil commit $@ |
Changes to src/streamful/stream_datalevin.clj.
︙ | ︙ | |||
164 165 166 167 168 169 170 | original-msg-bytes] (let [manifest (get-db-manifest db) {:strs [iid] :as existing-stream} (and manifest (manifest sid))] (cond (and existing-stream (creq/eq? client-psk (existing-stream "psk"))) (db/with-transaction-kv [tx db] (let [msg (stream-put-msg iid msg client-psk original-msg-bytes)] | < < | | 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 | original-msg-bytes] (let [manifest (get-db-manifest db) {:strs [iid] :as existing-stream} (and manifest (manifest sid))] (cond (and existing-stream (creq/eq? client-psk (existing-stream "psk"))) (db/with-transaction-kv [tx db] (let [msg (stream-put-msg iid msg client-psk original-msg-bytes)] ;;todo consider running all appropriate aggregates immediately (db/transact-kv tx [msg]) :ok)) (not existing-stream) :stream-missing :else :unauthorized))) |
︙ | ︙ |
Changes to test/streamful/protocol_test.clj.
︙ | ︙ | |||
26 27 28 29 30 31 32 | [streamful.test-asserts :as ta] [streamful.test-cfg :as tcfg] [streamful.test-crypto :as tcrypto] [streamful.test-protocol :as tp] [streamful.test-protocol :refer :all] [streamful.time :as time] [streamful.transport :as transport]) | | > | 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | [streamful.test-asserts :as ta] [streamful.test-cfg :as tcfg] [streamful.test-crypto :as tcrypto] [streamful.test-protocol :as tp] [streamful.test-protocol :refer :all] [streamful.time :as time] [streamful.transport :as transport]) (:import (java.io ByteArrayInputStream) (java.nio ByteBuffer))) ; This protocol is inspired by ; <https://github.com/nostr-protocol/nips/blob/master/01.md> ; except for a few minor differences. Since we are signing an object ; which has been CBOR encoded, we can simply sign the original message ; itself instead of forcing a specific sequence. The signature ; is wrapped using libsodium's envelope, instead of a custom detached |
︙ | ︙ | |||
220 221 222 223 224 225 226 | (testing "publish simple messages" (as client keys1) (is (= {:response "ok"} (client/submit-message client {:sid "root" :t "hello, world!"}))) | | > | | | | | | > | | | | | > > | < < < | | | | | > | | | | < | | | > > > | > > | | | > > > | 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 | (testing "publish simple messages" (as client keys1) (is (= {:response "ok"} (client/submit-message client {:sid "root" :t "hello, world!"}))) (let [{{root-messages "messages"} :response} (get-as client keys1 "root")] (testing "can't publish to someone else's stream" (as client keys2) (is (= {:response "unauthorized"} (client/submit-message client {:sid "root" :t "hello, friend!"})))) (let [reply-to-id ((last root-messages) "id")] (is (= {:response "ok"} (client/submit-message client {:sid "test1" :t "hello, friend!" :tid reply-to-id :rid reply-to-id}))) (testing "messages show up" (let [{{test1-messages "messages"} :response} (get-as client keys2 "test1")] (is (= (conj expected-root-messages {"c" "put" "m" {"sid" "root", "params" {"t" "hello, world!"}} "k" root-k "ct" 19}) (map get-m root-messages))) (is (= (conj expected-test1-messages {"c" "put" "m" {"sid" "test1", "params" {"t" "hello, friend!" "tid" reply-to-id "rid" reply-to-id}} "k" test1-k "ct" 22}) (map get-m test1-messages))) ;; this is used in the below sample message #_(clojure.pprint/pprint (last test1-messages)) )))))))) (testing "authentication required" (anon client) (let [expected-response {:response "authentication required"}] (testing "registration" (is (= expected-response (client/register-stream client {:sid "test1"})))) |
︙ | ︙ | |||
497 498 499 500 501 502 503 504 505 | (is (= {:response "ok"} (client/register-stream client {:sid "root"}))) (is (= 1 (-> (get-messages-from client "root") :response (get "messages") count))))))) (deftest aggregates-test | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | > | 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 | (is (= {:response "ok"} (client/register-stream client {:sid "root"}))) (is (= 1 (-> (get-messages-from client "root") :response (get "messages") count))))))) (comment (let [sample-msg ;; this is the structure of a message as it exists today ;; when an aggregate is invoked, this is the shape of the message ;; that will be passed to it { ;; when received from server, ms since epoch "srt" 1740490760938, ;; public signing key of publisher "psk" [124, 34, 124, 11, -29, 1, 40, 107, -80, 44, -4, -5, -17, 85, -72, 34, 27, -128, -59, -124, -83, -81, 9, -59, 28, -87, 41, 68, 106, -82, 45, 95], ;; base64-encoded SHA-128 hash of the contents of the message below "id" "eP+wTB5uTGVxC09ENy9bvRuHBSbBOqZi7ASbYITc/rw=", ;; the full contents of the message that was originally ;; CBOR-encoded and signed using libsodium "m" { ;; when created on the client, ms since epoch ;; this would normally be a bigger number, but the above test has ;; time hard-coded for consistency of test assertions "ct" 22, ;; command -- what should subscribers do with this message? ;; `put` just means "put it in your subscription" "c" "put", ;; base64-encoded form of `:psk` above for verification of internal ;; consistency to prevent forgery "k" "fCJ8C+MBKGuwLPz771W4IhuAxYStrwnFHKkpRGquLV8=", ;; message itself "m" ;; arguments to the command {"params" { ;; what I am saying "t" "hello, friend!", ;; in reply to this message "rid" "1HSTQiUbbFLpJw3rn7nRyXGqNvYyIf7Alg1iwMP7mpw=", ;; which lives in this thread (in this case they are the same) ;; makes me wonder -- should we simply encode a complete path? ;; that way every ancestor gets "credit" for messages that live ;; below them? "tid" "1HSTQiUbbFLpJw3rn7nRyXGqNvYyIf7Alg1iwMP7mpw="}, ;; ID of the stream that the message belongs on "sid" "test1"}}, ;; the original CBOR-encoded, signed (libsodium) message in bytes ;; can be reassembled and verified by a client to ensure ;; the server didn't tamper with them ;; see `reassemble-original` above "o" [-83, -76, -84, -108, -105, 38, -21, 65, 23, -114, -27, -106, 117, -105, -125, -54, 122, 112, -54, -4, -44, -43, -59, -58, 4, -77, -39, 56, 89, -65, 20, 5, 25, -100, -28, -49, -47, 51, 46, 76, 80, -117, 41, 29, 86, -57, 64, 33, -125, -57, 108, 119, 123, 83, -116, -27, -9, 63, 84, -43, -3, 43, 117, 14, -92, 97, 99, 99, 112, 117, 116, 97, 109, -94, 99, 115, 105, 100, 101, 116, 101, 115, 116, 49, 102, 112, 97, 114, 97, 109, 115, -93, 97, 116, 110, 104, 101, 108, 108, 111, 44, 32, 102, 114, 105, 101, 110, 100, 33, 99, 116, 105, 100, 120, 44, 49, 72, 83, 84, 81, 105, 85, 98, 98, 70, 76, 112, 74, 119, 51, 114, 110, 55, 110, 82, 121, 88, 71, 113, 78, 118, 89, 121, 73, 102, 55, 65, 108, 103, 49, 105, 119, 77, 80, 55, 109, 112, 119, 61, 99, 114, 105, 100, 120, 44, 49, 72, 83, 84, 81, 105, 85, 98, 98, 70, 76, 112, 74, 119, 51, 114, 110, 55, 110, 82, 121, 88, 71, 113, 78, 118, 89, 121, 73, 102, 55, 65, 108, 103, 49, 105, 119, 77, 80, 55, 109, 112, 119, 61, 98, 99, 116, 22, 97, 107, 120, 44, 102, 67, 74, 56, 67, 43, 77, 66, 75, 71, 117, 119, 76, 80, 122, 55, 55, 49, 87, 52, 73, 104, 117, 65, 120, 89, 83, 116, 114, 119, 110, 70, 72, 75, 107, 112, 82, 71, 113, 117, 76, 86, 56, 61]}] ;; a proof that the byte array above converts cleanly ;; you could run this in a repl! (-> sample-msg ;; crypto library only works with bytebuffers (update "o" (fn [byte-vec] (ByteBuffer/wrap (byte-array byte-vec)))) ;; our code assumes this will be a byte array (update "psk" byte-array) ;; only pass in the keys we need (select-keys ["psk" "id" "o"]) ;; performs all the validations and disassemblies reassemble-original ;; should be identical to the outer `"m"` above clojure.pprint/pprint)) ) (deftest aggregates-test (testing "register aggregate" (let [ ;; used to find the message that contains the aggregate value _aggregate-selector-fn (fn [ ;; message to add to aggregate msg] ;; id of the aggregated message (->> msg :m :m :params :tid)) ;; aggregates threads / replies _thread-aggregator-fn (fn [ ;; aggregate container agg ;; message to add to aggregate msg] ;; the new value of the aggregate to be placed in the db (-> agg (update :thm (fn [thread-messages] (conj (or thread-messages []) (:id msg)))) (update :thc (fn [thc] (inc (or thc 0))))))] #_(-> {} (_thread-aggregator-fn {:id "abc"}) (_thread-aggregator-fn {:id "def"}) pr-str clojure.pprint/pprint) (ta/pending "message aggregate")) (ta/pending "stream aggregate") (ta/pending "server aggregate")) (ta/pending "replace") (ta/pending "unregister")) (deftest server-side-subscriptions-test (testing "subscribe" (testing "same server" (ta/pending "updates destination properly")) |
︙ | ︙ |