Streamful

Check-in [ea4d3ed020]
Login

Check-in [ea4d3ed020]

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:A TON of thinking about how aggregates could work
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: ea4d3ed020c368cdc830501053641afcae1a52bd15aee795f97c8023358a59fc
User & Date: scstarkey 2025-02-25 14:04:39
Context
2025-02-26
11:18
Removed unnecessary complexity check-in: c756216233 user: scstarkey tags: trunk
2025-02-25
14:04
A TON of thinking about how aggregates could work check-in: ea4d3ed020 user: scstarkey tags: trunk
2025-02-22
16:40
Using lilliput now check-in: e3105c353c user: scstarkey tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to README.md.

84
85
86
87
88
89
90



91
92
93
94
```

In fact, if you want to make sure you haven't failed to stage something
properly, you can use `./commit`, which will also check to make sure
you haven't forgotten to stage any file adds/deletes. It will also
ensure the podman image build works properly!




## License

See `resources/copyright.txt` for the terms under which you can share
this program.







>
>
>




84
85
86
87
88
89
90
91
92
93
94
95
96
97
```

In fact, if you want to make sure you haven't failed to stage something
properly, you can use `./commit`, which will also check to make sure
you haven't forgotten to stage any file adds/deletes. It will also
ensure the podman image build works properly!

You can use `SKIP_PODMAN=true ./commit` to do the same thing but
instead of building the podman image it will run `lein precommit`

## License

See `resources/copyright.txt` for the terms under which you can share
this program.

Changes to commit.

19
20
21
22
23
24
25
26






27
28
#

UNSTAGED=$(fossil status --missing --extra | grep -E '(MISSING|EXTRA)')
if [[ "" != "${UNSTAGED}" ]]; then
  echo "Unstaged files found: ${UNSTAGED}"
  exit 1
fi







# shellcheck disable=SC2068
bin/build-podman && fossil commit $@








>
>
>
>
>
>

|
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#

UNSTAGED=$(fossil status --missing --extra | grep -E '(MISSING|EXTRA)')
if [[ "" != "${UNSTAGED}" ]]; then
  echo "Unstaged files found: ${UNSTAGED}"
  exit 1
fi

if [ "${SKIP_PODMAN}" != "true" ]; then
  bin/build-podman || exit 1
else
  lein precommit || exit 1
fi

# shellcheck disable=SC2068
fossil commit $@

Changes to src/streamful/stream_datalevin.clj.

164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
                    original-msg-bytes]
  (let [manifest (get-db-manifest db)
        {:strs [iid] :as existing-stream} (and manifest (manifest sid))]
    (cond
      (and existing-stream (creq/eq? client-psk (existing-stream "psk")))
      (db/with-transaction-kv [tx db]
        (let [msg (stream-put-msg iid msg client-psk original-msg-bytes)]
          ;;todo also add a message that adds this message
          ;;todo to the aggregates-need-to-run list for this message's
          ;;todo stream and its parent stream(s)
          (db/transact-kv tx [msg])
          :ok))

      (not existing-stream) :stream-missing

      :else :unauthorized)))








<
<
|







164
165
166
167
168
169
170


171
172
173
174
175
176
177
178
                    original-msg-bytes]
  (let [manifest (get-db-manifest db)
        {:strs [iid] :as existing-stream} (and manifest (manifest sid))]
    (cond
      (and existing-stream (creq/eq? client-psk (existing-stream "psk")))
      (db/with-transaction-kv [tx db]
        (let [msg (stream-put-msg iid msg client-psk original-msg-bytes)]


          ;;todo consider running all appropriate aggregates immediately
          (db/transact-kv tx [msg])
          :ok))

      (not existing-stream) :stream-missing

      :else :unauthorized)))

Changes to test/streamful/protocol_test.clj.

26
27
28
29
30
31
32
33

34
35
36
37
38
39
40
            [streamful.test-asserts :as ta]
            [streamful.test-cfg :as tcfg]
            [streamful.test-crypto :as tcrypto]
            [streamful.test-protocol :as tp]
            [streamful.test-protocol :refer :all]
            [streamful.time :as time]
            [streamful.transport :as transport])
  (:import (java.io ByteArrayInputStream)))


; This protocol is inspired by
; <https://github.com/nostr-protocol/nips/blob/master/01.md>
; except for a few minor differences. Since we are signing an object
; which has been CBOR encoded, we can simply sign the original message
; itself instead of forcing a specific sequence. The signature
; is wrapped using libsodium's envelope, instead of a custom detached







|
>







26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
            [streamful.test-asserts :as ta]
            [streamful.test-cfg :as tcfg]
            [streamful.test-crypto :as tcrypto]
            [streamful.test-protocol :as tp]
            [streamful.test-protocol :refer :all]
            [streamful.time :as time]
            [streamful.transport :as transport])
  (:import (java.io ByteArrayInputStream)
           (java.nio ByteBuffer)))

; This protocol is inspired by
; <https://github.com/nostr-protocol/nips/blob/master/01.md>
; except for a few minor differences. Since we are signing an object
; which has been CBOR encoded, we can simply sign the original message
; itself instead of forcing a specific sequence. The signature
; is wrapped using libsodium's envelope, instead of a custom detached
220
221
222
223
224
225
226
227

228
229
230
231
232
233

234
235
236
237
238


239
240
241
242
243
244
245
246
247

248
249
250
251
252
253
254
255



256


257
258
259



260
261
262
263
264
265
266

            (testing "publish simple messages"
              (as client keys1)
              (is (= {:response "ok"}
                     (client/submit-message
                       client
                       {:sid "root" :t "hello, world!"})))


              (testing "can't publish to someone else's stream"
                (as client keys2)
                (is (= {:response "unauthorized"}
                       (client/submit-message
                         client
                         {:sid "root" :t "hello, friend!"})))

                (is (= {:response "ok"}
                       (client/submit-message
                         client
                         {:sid "test1" :t "hello, friend!"}))))



              (testing "messages show up"
                (let [{{root-messages "messages"} :response}
                      (get-as client keys1 "root")

                      {{test1-messages "messages"} :response}
                      (get-as client keys2 "test1")]
                  (is (= (conj
                           expected-root-messages
                           {"c" "put"

                            "m" {"sid" "root", "params" {"t" "hello, world!"}}
                            "k" root-k
                            "ct" 19})
                         (map get-m root-messages)))

                  (is (= (conj
                           expected-test1-messages
                           {"c" "put"



                            "m" {"sid" "test1", "params" {"t" "hello, friend!"}}


                            "k" test1-k
                            "ct" 21})
                         (map get-m test1-messages)))))))))




      (testing "authentication required"
        (anon client)
        (let [expected-response {:response "authentication required"}]
          (testing "registration"
            (is (= expected-response
                   (client/register-stream client {:sid "test1"}))))







|
>
|
|
|
|
|
|
>
|
|
|
|
|
>
>
|
<
<
<
|
|
|
|
|
>
|
|
|
|
<
|
|
|
>
>
>
|
>
>
|
|
|
>
>
>







221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244



245
246
247
248
249
250
251
252
253
254

255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276

            (testing "publish simple messages"
              (as client keys1)
              (is (= {:response "ok"}
                     (client/submit-message
                       client
                       {:sid "root" :t "hello, world!"})))
              (let [{{root-messages "messages"} :response}
                    (get-as client keys1 "root")]
                (testing "can't publish to someone else's stream"
                  (as client keys2)
                  (is (= {:response "unauthorized"}
                         (client/submit-message
                           client
                           {:sid "root" :t "hello, friend!"}))))
                (let [reply-to-id ((last root-messages) "id")]
                  (is (= {:response "ok"}
                         (client/submit-message
                           client
                           {:sid "test1"
                            :t "hello, friend!"
                            :tid reply-to-id
                            :rid reply-to-id})))
                  (testing "messages show up"



                    (let [{{test1-messages "messages"} :response}
                          (get-as client keys2 "test1")]
                      (is (= (conj
                               expected-root-messages
                               {"c" "put"
                                "m"
                                {"sid" "root", "params" {"t" "hello, world!"}}
                                "k" root-k
                                "ct" 19})
                             (map get-m root-messages)))

                      (is (= (conj
                               expected-test1-messages
                               {"c" "put"
                                "m"
                                {"sid" "test1",
                                 "params"
                                 {"t" "hello, friend!"
                                  "tid" reply-to-id
                                  "rid" reply-to-id}}
                                "k" test1-k
                                "ct" 22})
                             (map get-m test1-messages)))
                      ;; this is used in the below sample message
                      #_(clojure.pprint/pprint (last test1-messages))
                      ))))))))

      (testing "authentication required"
        (anon client)
        (let [expected-response {:response "authentication required"}]
          (testing "registration"
            (is (= expected-response
                   (client/register-stream client {:sid "test1"}))))
497
498
499
500
501
502
503
504




































































































505




































506

507
508
509
510
511
512
513
        (is (= {:response "ok"}
               (client/register-stream client {:sid "root"})))
        (is (= 1
               (-> (get-messages-from client "root")
                   :response
                   (get "messages")
                   count)))))))





































































































(deftest aggregates-test




































  (ta/pending "register")

  (ta/pending "replace")
  (ta/pending "unregister"))

(deftest server-side-subscriptions-test
  (testing "subscribe"
    (testing "same server"
      (ta/pending "updates destination properly"))








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>







507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
        (is (= {:response "ok"}
               (client/register-stream client {:sid "root"})))
        (is (= 1
               (-> (get-messages-from client "root")
                   :response
                   (get "messages")
                   count)))))))

(comment
  (let [sample-msg
        ;; this is the structure of a message as it exists today
        ;; when an aggregate is invoked, this is the shape of the message
        ;; that will be passed to it
        {
         ;; when received from server, ms since epoch
         "srt" 1740490760938,

         ;; public signing key of publisher
         "psk"
         [124, 34, 124, 11, -29, 1, 40, 107, -80, 44, -4, -5, -17, 85, -72, 34,
          27, -128, -59, -124, -83, -81, 9, -59, 28, -87, 41, 68, 106, -82, 45,
          95],

         ;; base64-encoded SHA-128 hash of the contents of the message below
         "id" "eP+wTB5uTGVxC09ENy9bvRuHBSbBOqZi7ASbYITc/rw=",

         ;; the full contents of the message that was originally
         ;; CBOR-encoded and signed using libsodium
         "m"
         {
          ;; when created on the client, ms since epoch
          ;; this would normally be a bigger number, but the above test has
          ;; time hard-coded for consistency of test assertions
          "ct" 22,

          ;; command -- what should subscribers do with this message?
          ;; `put` just means "put it in your subscription"
          "c" "put",

          ;; base64-encoded form of `:psk` above for verification of internal
          ;; consistency to prevent forgery
          "k" "fCJ8C+MBKGuwLPz771W4IhuAxYStrwnFHKkpRGquLV8=",

          ;; message itself
          "m"

          ;; arguments to the command
          {"params"
           {
            ;; what I am saying
            "t" "hello, friend!",

            ;; in reply to this message
            "rid" "1HSTQiUbbFLpJw3rn7nRyXGqNvYyIf7Alg1iwMP7mpw=",

            ;; which lives in this thread (in this case they are the same)
            ;; makes me wonder -- should we simply encode a complete path?
            ;; that way every ancestor gets "credit" for messages that live
            ;; below them?
            "tid" "1HSTQiUbbFLpJw3rn7nRyXGqNvYyIf7Alg1iwMP7mpw="},

           ;; ID of the stream that the message belongs on
           "sid" "test1"}},

         ;; the original CBOR-encoded, signed (libsodium) message in bytes
         ;; can be reassembled and verified by a client to ensure
         ;; the server didn't tamper with them
         ;; see `reassemble-original` above
         "o"
         [-83, -76, -84, -108, -105, 38, -21, 65, 23, -114, -27, -106, 117,
          -105, -125, -54, 122, 112, -54, -4, -44, -43, -59, -58, 4, -77, -39,
          56, 89, -65, 20, 5, 25, -100, -28, -49, -47, 51, 46, 76, 80, -117,
          41, 29, 86, -57, 64, 33, -125, -57, 108, 119, 123, 83, -116, -27, -9,
          63, 84, -43, -3, 43, 117, 14, -92, 97, 99, 99, 112, 117, 116, 97,
          109, -94, 99, 115, 105, 100, 101, 116, 101, 115, 116, 49, 102, 112,
          97, 114, 97, 109, 115, -93, 97, 116, 110, 104, 101, 108, 108, 111,
          44, 32, 102, 114, 105, 101, 110, 100, 33, 99, 116, 105, 100, 120, 44,
          49, 72, 83, 84, 81, 105, 85, 98, 98, 70, 76, 112, 74, 119, 51, 114,
          110, 55, 110, 82, 121, 88, 71, 113, 78, 118, 89, 121, 73, 102, 55,
          65, 108, 103, 49, 105, 119, 77, 80, 55, 109, 112, 119, 61, 99, 114,
          105, 100, 120, 44, 49, 72, 83, 84, 81, 105, 85, 98, 98, 70, 76, 112,
          74, 119, 51, 114, 110, 55, 110, 82, 121, 88, 71, 113, 78, 118, 89,
          121, 73, 102, 55, 65, 108, 103, 49, 105, 119, 77, 80, 55, 109, 112,
          119, 61, 98, 99, 116, 22, 97, 107, 120, 44, 102, 67, 74, 56, 67, 43,
          77, 66, 75, 71, 117, 119, 76, 80, 122, 55, 55, 49, 87, 52, 73, 104,
          117, 65, 120, 89, 83, 116, 114, 119, 110, 70, 72, 75, 107, 112, 82,
          71, 113, 117, 76, 86, 56, 61]}]

    ;; a proof that the byte array above converts cleanly
    ;; you could run this in a repl!
    (-> sample-msg

        ;; crypto library only works with bytebuffers
        (update "o" (fn [byte-vec] (ByteBuffer/wrap (byte-array byte-vec))))

        ;; our code assumes this will be a byte array
        (update "psk" byte-array)

        ;; only pass in the keys we need
        (select-keys ["psk" "id" "o"])

        ;; performs all the validations and disassemblies
        reassemble-original

        ;; should be identical to the outer `"m"` above
        clojure.pprint/pprint))
  )

(deftest aggregates-test
  (testing "register aggregate"
    (let [
          ;; used to find the message that contains the aggregate value
          _aggregate-selector-fn
          (fn [
               ;; message to add to aggregate
               msg]

            ;; id of the aggregated message
            (->> msg :m :m :params :tid))

          ;; aggregates threads / replies
          _thread-aggregator-fn
          (fn [
               ;; aggregate container
               agg

               ;; message to add to aggregate
               msg]

            ;; the new value of the aggregate to be placed in the db
            (-> agg
                (update
                  :thm
                  (fn [thread-messages]
                    (conj (or thread-messages []) (:id msg))))
                (update
                  :thc
                  (fn [thc]
                    (inc (or thc 0))))))]
      #_(-> {}
            (_thread-aggregator-fn {:id "abc"})
            (_thread-aggregator-fn {:id "def"})
            pr-str
            clojure.pprint/pprint)
      (ta/pending "message aggregate"))
    (ta/pending "stream aggregate")
    (ta/pending "server aggregate"))
  (ta/pending "replace")
  (ta/pending "unregister"))

(deftest server-side-subscriptions-test
  (testing "subscribe"
    (testing "same server"
      (ta/pending "updates destination properly"))