Streamful

Check-in [627799c4ba]
Login

Check-in [627799c4ba]

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Aggregators reloaded! And validated manually
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 627799c4bae22bb99a31eb05f62a08f9bf6b950045203c731dfb1f720a839ce6
User & Date: scstarkey 2025-03-13 13:14:17
Context
2025-03-13
13:29
Fetch a bunch of messages by SHA (to see a whole thread, etc) check-in: 45ef5a3872 user: scstarkey tags: trunk
13:14
Aggregators reloaded! And validated manually check-in: 627799c4ba user: scstarkey tags: trunk
2025-03-12
13:57
Breadcrumbs check-in: 147f08bb76 user: scstarkey tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/streamful/aggregates.clj.

15
16
17
18
19
20
21
22

23
24
25
26
27
28
29
30
31
32
33
34
35
36
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.aggregates
  (:require [clojure.walk :as walk]
            [sci.core :as sci]
            [streamful.stream :as stream])

  (:import (java.util.concurrent.locks ReentrantReadWriteLock)))

(defprotocol AggregationSystem
  (stage-aggregate [_ params]
    "Prepare a new aggregate for introduction into the system. Compiles
     and places the function bundle into a pending state.")
  (remove-aggregate [_ params]
    "Prepares an aggregate for removal from the system. Immediately removes
     from the active list and moves it into the cleanup list. Actual cleanup
     happens on commit.")
  (rollback [_]
    "Removes all pending aggregates.")
  (commit [_]
    "For each pending aggregate, applies it to all stream messages and then







|
>



|


|







15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.aggregates
  (:require [clojure.walk :as walk]
            [sci.core :as sci]
            [streamful.stream :as stream]
            [taoensso.timbre :as log])
  (:import (java.util.concurrent.locks ReentrantReadWriteLock)))

(defprotocol AggregationSystem
  (stage-aggregator [_ params]
    "Prepare a new aggregate for introduction into the system. Compiles
     and places the function bundle into a pending state.")
  (remove-aggregator [_ params]
    "Prepares an aggregate for removal from the system. Immediately removes
     from the active list and moves it into the cleanup list. Actual cleanup
     happens on commit.")
  (rollback [_]
    "Removes all pending aggregates.")
  (commit [_]
    "For each pending aggregate, applies it to all stream messages and then
46
47
48
49
50
51
52
53
54
55
56
57



58
59
60
61
62
63
64
  '[fn fn* -> ->> update conj and or inc dec assoc dissoc])
(def ^:private agg-opts {:allow agg-allowed-symbols
                         :namespaces {
                                      ;;'clojure.core {'println println}
                                      }})
(defn- compile-str [s] (sci/eval-string s agg-opts))

(defn- stage-aggregate! [aggregators t n s a c]
  (let [s (compile-str s)
        a (compile-str a)
        c (compile-str c)]
    (swap! aggregators assoc-in [:pending t n] {:s s, :a a, :c c})))




(defmacro with-read! [& body]
  `(try
     (-> agg-lock .readLock .lock)
     ~@body
     (finally (-> agg-lock .readLock .unlock))))








|



|
>
>
>







47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
  '[fn fn* -> ->> update conj and or inc dec assoc dissoc])
;; Options passed to `sci/eval-string` by `compile-str`.
;; `:allow` is set to `agg-allowed-symbols` (in SCI this option whitelists the
;; symbols evaluated code may use). `:namespaces` is left empty: the println
;; binding inside it is commented out.
(def ^:private agg-opts {:allow agg-allowed-symbols
                         :namespaces {
                                      ;;'clojure.core {'println println}
                                      }})
;; Evaluates the source string `s` with `sci/eval-string` under `agg-opts`
;; and returns whatever that evaluation produces.
(defn- compile-str [s] (sci/eval-string s agg-opts))

;; Compiles the three source strings `s`, `a` and `c` (in that order) and
;; stores the results as {:s .. :a .. :c ..} in the `aggregators` atom under
;; the path [status t n]. Compilation finishes before the atom is touched.
(defn- install-aggregator! [aggregators status t n s a c]
  (let [compiled-fns (zipmap [:s :a :c]
                             (mapv compile-str [s a c]))]
    (swap! aggregators assoc-in [status t n] compiled-fns)))

;; Installs the aggregator `n` of type `t` under the :pending status by
;; delegating to `install-aggregator!`, which compiles `s`, `a` and `c`.
(defn- stage-aggregator! [aggregators t n s a c]
  (install-aggregator! aggregators :pending t n s a c))

(defmacro with-read!
  "Runs `body` while holding the read lock of `agg-lock` and returns the
   value of the last body form. The lock is released in a `finally`, so it
   is given back even when `body` throws.

   The lock is acquired *before* entering `try`: if acquisition itself
   fails, the `finally` must not run, otherwise it would try to unlock a
   lock this thread does not hold and mask the original failure."
  [& body]
  `(do
     (-> agg-lock .readLock .lock)
     (try
       ~@body
       (finally (-> agg-lock .readLock .unlock)))))

131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
















153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
          (partial apply-cleanup! stream-model prev-agg-fns)))

      (process-streams!
        stream-model
        (partial apply-aggregate! stream-model agg-fns))

      (move-aggregate! aggregators "msg" k :pending :active)
      (stream/move-aggregate! stream-model "msg" k :pending :active))

    (doseq [[k agg-fns]
            (get-in @aggregators [:cleanup "msg"])]
      (process-streams!
        stream-model
        (partial apply-cleanup! stream-model agg-fns))

      (swap! aggregators update-in [:cleanup "msg"] dissoc k)
      (stream/move-aggregate! stream-model "msg" k :cleanup nil))))

(defn- execute! [aggregators stream-model msg]
  (with-read!
    (doseq [[_ agg-fns] (get-in @aggregators [:active "msg"])]
      (apply-aggregate! stream-model agg-fns msg))))

















(defn base-aggregation [stream-model]
  (let [aggregators (atom {})]
    ;;todo LOAD AGGREGATES
    ;;todo for now let the failures just stick around and warn on startup
    ;;todo that they exist -- the only aggregates that should be in the
    ;;todo db on startup are active ones
    (reify AggregationSystem
      (stage-aggregate [_ {:keys [t n s a c]}]
        (stage-aggregate! aggregators t n s a c))
      (remove-aggregate [_ {:keys [t n]}]
        (move-aggregate! aggregators t n :active :cleanup))
      (rollback [_] (with-write! (swap! aggregators dissoc :pending :cleanup)))
      (commit [_] (commit! stream-model aggregators))
      (execute [_ msg] (execute! aggregators stream-model msg)))))







|








|





>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>



<
<
|
<

|
|
|




135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175


176

177
178
179
180
181
182
183
184
          (partial apply-cleanup! stream-model prev-agg-fns)))

      (process-streams!
        stream-model
        (partial apply-aggregate! stream-model agg-fns))

      (move-aggregate! aggregators "msg" k :pending :active)
      (stream/move-aggregator! stream-model "msg" k :pending :active))

    (doseq [[k agg-fns]
            (get-in @aggregators [:cleanup "msg"])]
      (process-streams!
        stream-model
        (partial apply-cleanup! stream-model agg-fns))

      (swap! aggregators update-in [:cleanup "msg"] dissoc k)
      (stream/move-aggregator! stream-model "msg" k :cleanup nil))))

;; Applies every aggregator registered under [:active "msg"] to `msg`,
;; inside `with-read!`. Only the aggregator function bundles are needed, so
;; the names (map keys) are ignored.
(defn- execute! [aggregators stream-model msg]
  (with-read!
    (run! (fn [agg-fns] (apply-aggregate! stream-model agg-fns msg))
          (vals (get-in @aggregators [:active "msg"])))))

;; Reads the aggregators known to `stream-model` (a nested map of
;; status -> type -> name -> {:a :s :c}) and installs every :active one into
;; the `aggregators` atom. Entries under any other status are not installed;
;; they are reported through `log/error`.
(defn load-aggregators! [stream-model aggregators]
  (log/info "Loading aggregators")
  (doseq [[status by-type] (stream/get-aggregators stream-model)
          [t by-name] by-type
          [n {:keys [a s c]} :as agg] by-name]
    (if-not (= :active status)
      ;; Anything that is not :active is left alone and only reported.
      (log/error "Unexpected status -- you may want to redo!"
                 {:status status
                  :t t
                  :agg agg})
      (do
        (log/infof "  %s %s" t n)
        (install-aggregator! aggregators :active t n s a c))))
  (log/info "Aggregators loaded"))

;; Builds an `AggregationSystem` on top of `stream-model`: creates the
;; in-memory `aggregators` atom, fills it through `load-aggregators!`, and
;; returns a reified implementation that closes over both.
(defn base-aggregation [stream-model]
  (let [aggregators (atom {})]


    (load-aggregators! stream-model aggregators)

    (reify AggregationSystem
      ;; Compiles s/a/c and stores the bundle under :pending.
      (stage-aggregator [_ {:keys [t n s a c]}]
        (stage-aggregator! aggregators t n s a c))
      ;; Moves aggregator `n` of type `t` from :active to :cleanup.
      (remove-aggregator [_ {:keys [t n]}]
        (move-aggregate! aggregators t n :active :cleanup))
      ;; Drops every :pending and :cleanup entry from the atom.
      ;; NOTE(review): `with-write!` is defined outside this view; presumably
      ;; the write-lock counterpart of `with-read!` -- confirm.
      (rollback [_] (with-write! (swap! aggregators dissoc :pending :cleanup)))
      (commit [_] (commit! stream-model aggregators))
      (execute [_ msg] (execute! aggregators stream-model msg)))))

Changes to src/streamful/core.clj.

113
114
115
116
117
118
119


120
121
122
123
124
125
126
127
128
      (stream/get-messages-for-aggregation db-stream-model sid start-at))
    (get-message-by-sha [_ sha]
      (stream/get-message-by-sha db-stream-model sha))
    (put-message! [_, pk, msg, original-msg-bytes]
      (stream/put-message! db-stream-model pk msg original-msg-bytes))
    (put-aggregate! [_ iid agg-value]
      (stream/put-aggregate! db-stream-model iid agg-value))


    (move-aggregate! [_ t k old-status new-status]
      (stream/move-aggregate! db-stream-model t k old-status new-status))))

(defn- notify-claim-key [notifier claim-key]
  (notifier
    (format
      (str "** This server is not claimed. Only the server owner can "
           "configure it. Claim the server and become its owner "
           "using this key: '%s' **")







>
>
|
|







113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
      (stream/get-messages-for-aggregation db-stream-model sid start-at))
    (get-message-by-sha [_ sha]
      (stream/get-message-by-sha db-stream-model sha))
    (put-message! [_, pk, msg, original-msg-bytes]
      (stream/put-message! db-stream-model pk msg original-msg-bytes))
    (put-aggregate! [_ iid agg-value]
      (stream/put-aggregate! db-stream-model iid agg-value))
    (get-aggregators [_]
      (stream/get-aggregators db-stream-model))
    (move-aggregator! [_ t k old-status new-status]
      (stream/move-aggregator! db-stream-model t k old-status new-status))))

(defn- notify-claim-key [notifier claim-key]
  (notifier
    (format
      (str "** This server is not claimed. Only the server owner can "
           "configure it. Claim the server and become its owner "
           "using this key: '%s' **")

Changes to src/streamful/protocol.clj.

82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
(defn- register-aggregate! [agg-system
                            stream-model
                            psk
                            msg
                            original
                            params]
  (update-aggregate! agg-system stream-model psk msg original
    (agg/stage-aggregate agg-system params)))

(defn- unregister-aggregate! [agg-system
                              stream-model
                              psk
                              msg
                              original
                              params]
  (update-aggregate! agg-system stream-model psk msg original
    (agg/remove-aggregate agg-system params)))

(defn-authd configure-stream! [psk
                               stream-model
                               agg-system
                               msg
                               original]
  (let [params (-> msg (get "m") (get "params"))







|








|







82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
;; Handles registration of an aggregator: hands the request context
;; (agg-system, stream-model, psk, msg, original) to `update-aggregate!`
;; together with a call that stages `params` via `agg/stage-aggregator`.
;; NOTE(review): `update-aggregate!` is defined outside this view; whether it
;; receives the staged result or (if it is a macro) the unevaluated form
;; should be confirmed there.
(defn- register-aggregate! [agg-system
                            stream-model
                            psk
                            msg
                            original
                            params]
  (update-aggregate! agg-system stream-model psk msg original
    (agg/stage-aggregator agg-system params)))

;; Handles removal of an aggregator: hands the request context
;; (agg-system, stream-model, psk, msg, original) to `update-aggregate!`
;; together with a call that removes `params` via `agg/remove-aggregator`.
;; NOTE(review): `update-aggregate!` is defined outside this view; whether it
;; receives the removal result or (if it is a macro) the unevaluated form
;; should be confirmed there.
(defn- unregister-aggregate! [agg-system
                              stream-model
                              psk
                              msg
                              original
                              params]
  (update-aggregate! agg-system stream-model psk msg original
    (agg/remove-aggregator agg-system params)))

(defn-authd configure-stream! [psk
                               stream-model
                               agg-system
                               msg
                               original]
  (let [params (-> msg (get "m") (get "params"))

Changes to src/streamful/stream.clj.

30
31
32
33
34
35
36

37
    May cause state to change in the backend as we keep track of
    who has subscribed to our stream.")
  (get-messages-for-aggregation [_ sid start-at])
  (get-message-by-sha [_ sha])
  (put-message! [_, ^bytes pk, msg, ^bytes original-msg-bytes]
    "Put a message into a stream. Must be part of a signed request.")
  (put-aggregate! [_ iid agg-value])

  (move-aggregate! [_ t k old-status new-status]))







>
|
30
31
32
33
34
35
36
37
38
    May cause state to change in the backend as we keep track of
    who has subscribed to our stream.")
  (get-messages-for-aggregation [_ sid start-at])
  (get-message-by-sha [_ sha])
  (put-message! [_, ^bytes pk, msg, ^bytes original-msg-bytes]
    "Put a message into a stream. Must be part of a signed request.")
  (put-aggregate! [_ iid agg-value])
  (get-aggregators [_])
  (move-aggregator! [_ t k old-status new-status]))

Changes to src/streamful/stream_datalevin.clj.

251
252
253
254
255
256
257
258
259

260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
        lk (streamk iid (System/currentTimeMillis))
        r [:open-closed (or start-at (streamk iid 0)) lk]]
    ;;todo increase batch size
    (db/get-first-n db stream-data-table 5 r :string :data false)))

(defn get-message-by-sha [db sha]
  (let [msg-id (db/get-value db stream-mapping-table sha :string :string)]
    (when msg-id
      (db/get-value db stream-data-table msg-id :string :data false))))


(defn put-aggregate! [db iid agg-value]
  (db/with-transaction-kv [tx db]
    (let [[iid msg] (db/get-value db stream-data-table iid :string :data false)]
      (if msg
        (let [replacement
              [:put
               stream-data-table
               iid
               (assoc msg "agg" agg-value)
               :string
               :data]]
          (db/transact-kv tx [replacement]))
        (log/error "FAILED to replace aggregate")))))

(defn move-aggregate! [db t k old-status new-status]
  (db/with-transaction-kv [tx db]
    (let [existing-aggregates (or (get-db-aggregates tx) {})

          aggregate (get-in existing-aggregates [old-status t k])

          new-aggregates
          (update-in existing-aggregates [old-status t] dissoc k)







|
|
>















|







251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
        lk (streamk iid (System/currentTimeMillis))
        r [:open-closed (or start-at (streamk iid 0)) lk]]
    ;;todo increase batch size
    (db/get-first-n db stream-data-table 5 r :string :data false)))

;; Looks up the message id mapped to `sha` in `stream-mapping-table`, then
;; fetches that id from `stream-data-table`. When no mapping exists the miss
;; is logged and nothing is fetched.
(defn get-message-by-sha [db sha]
  (if-let [msg-id (db/get-value db stream-mapping-table sha :string :string)]
    (db/get-value db stream-data-table msg-id :string :data false)
    (log/errorf "Message missing: '%s'" sha)))

(defn put-aggregate!
  "Attaches `agg-value` to the message stored at `iid` in `stream-data-table`
   by writing the message back with its \"agg\" key replaced.

   The lookup and the write both go through the transaction handle `tx`, so
   the read-modify-write happens inside a single transaction (the same way
   `move-aggregator!` reads through `tx`). When no message is stored at
   `iid`, nothing is written and the failure is logged together with the id
   that was requested."
  [db iid agg-value]
  (db/with-transaction-kv [tx db]
    ;; The lookup yields a [key value] pair; keep the stored key under its
    ;; own name so the requested `iid` stays available for the error log.
    (let [[stored-iid msg]
          (db/get-value tx stream-data-table iid :string :data false)]
      (if msg
        (let [replacement
              [:put
               stream-data-table
               stored-iid
               (assoc msg "agg" agg-value)
               :string
               :data]]
          (db/transact-kv tx [replacement]))
        (log/error "FAILED to replace aggregate" {:iid iid})))))

(defn move-aggregator! [db t k old-status new-status]
  (db/with-transaction-kv [tx db]
    (let [existing-aggregates (or (get-db-aggregates tx) {})

          aggregate (get-in existing-aggregates [old-status t k])

          new-aggregates
          (update-in existing-aggregates [old-status t] dissoc k)
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315

316
317
      (has-owner? [_] (get-db-owner db))
      (claim! [_ pk] (claim-server! db pk))
      (register-stream! [_, pk, msg, original-msg-bytes]
        (register-stream! db pk msg original-msg-bytes))
      (configure-stream! [_, pk, msg, original-msg-bytes]
        (configure-stream! db pk msg original-msg-bytes))
      (list-streams [_, pk, msg] (list-streams db pk msg))
      (list-streams-for-aggregation [_]
        (-> db get-db-manifest keys))
      (get-messages! [_, pk, msg, original-msg-bytes]
        (get-messages! db pk msg original-msg-bytes))
      (get-messages-for-aggregation [_ sid start-at]
        (get-messages-for-aggregation db sid start-at))
      (get-message-by-sha [_ sha] (get-message-by-sha db sha))
      (put-message! [_, pk, msg, original-msg-bytes]
        (put-message! db pk msg original-msg-bytes))
      (put-aggregate! [_ iid agg-value]
        (put-aggregate! db iid agg-value))

      (move-aggregate! [_ t k old-status new-status]
        (move-aggregate! db t k old-status new-status)))))







|
<









>
|
|
299
300
301
302
303
304
305
306

307
308
309
310
311
312
313
314
315
316
317
318
      (has-owner? [_] (get-db-owner db))
      (claim! [_ pk] (claim-server! db pk))
      (register-stream! [_, pk, msg, original-msg-bytes]
        (register-stream! db pk msg original-msg-bytes))
      (configure-stream! [_, pk, msg, original-msg-bytes]
        (configure-stream! db pk msg original-msg-bytes))
      (list-streams [_, pk, msg] (list-streams db pk msg))
      (list-streams-for-aggregation [_] (-> db get-db-manifest keys))

      (get-messages! [_, pk, msg, original-msg-bytes]
        (get-messages! db pk msg original-msg-bytes))
      (get-messages-for-aggregation [_ sid start-at]
        (get-messages-for-aggregation db sid start-at))
      (get-message-by-sha [_ sha] (get-message-by-sha db sha))
      (put-message! [_, pk, msg, original-msg-bytes]
        (put-message! db pk msg original-msg-bytes))
      (put-aggregate! [_ iid agg-value]
        (put-aggregate! db iid agg-value))
      (get-aggregators [_] (get-db-aggregates db))
      (move-aggregator! [_ t k old-status new-status]
        (move-aggregator! db t k old-status new-status)))))

Changes to test/streamful/aggregates_test.clj.

20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  (:require [clojure.test :refer :all]
            [streamful.aggregates :as agg]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.server.netty :as server]
            [streamful.server.netty]
            [streamful.test-aggregates :as tagg]
            [streamful.test-asserts :as ta]
            [streamful.test-cfg :as tcfg]
            [streamful.test-protocol :as tp]))

(tcfg/def-configd-server-test reload-aggregates-test
  [out
   server/start-netty-server!
   {}]







<







20
21
22
23
24
25
26

27
28
29
30
31
32
33
  (:require [clojure.test :refer :all]
            [streamful.aggregates :as agg]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.server.netty :as server]
            [streamful.server.netty]
            [streamful.test-aggregates :as tagg]

            [streamful.test-cfg :as tcfg]
            [streamful.test-protocol :as tp]))

(tcfg/def-configd-server-test reload-aggregates-test
  [out
   server/start-netty-server!
   {}]
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
          (tp/is-sent
            tp/ok-put-ptn
            client
            client-keys
            "root"
            {:t "reply1", :rid msg-id1})]

      (ta/pending "reloads aggregates that were previously installed"
        (restart-server!)
        (client/close client)

        (let [msg-id3
              (tp/is-sent
                tp/ok-put-ptn
                client
                client-keys
                "root"
                {:t "reply2", :rid msg-id2})

              expected
              [[{"ct" 5,
                 "c" "put",
                 "k" client-pk,
                 "m" {"params" {"t" "msg1"}, "sid" "root"}}
                {"rc" 1, "rm" [msg-id2]}]
               [{"ct" 6,
                 "c" "put",
                 "k" client-pk,
                 "m"
                 {"params"
                  {"t" "reply1",
                   "rid" msg-id1},
                  "sid" "root"}}
                {"rc" 1, "rm" [msg-id3]}]]
              actual (tp/get-msg-with-agg client "root")]
          #_(clojure.pprint/pprint
            {:e expected
             :a actual})
          (is (= expected actual)))))))

(tcfg/def-configd-server-test process-streams!-test
  [out
   server/start-netty-server!
   {}]
  (let [{:keys [new-client stream-model-fn]} out







|












|




|









<
<
<







69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103



104
105
106
107
108
109
110
          (tp/is-sent
            tp/ok-put-ptn
            client
            client-keys
            "root"
            {:t "reply1", :rid msg-id1})]

      (testing "reloads aggregates that were previously installed"
        (restart-server!)
        (client/close client)

        (let [msg-id3
              (tp/is-sent
                tp/ok-put-ptn
                client
                client-keys
                "root"
                {:t "reply2", :rid msg-id2})

              expected
              [[{"ct" 7,
                 "c" "put",
                 "k" client-pk,
                 "m" {"params" {"t" "msg1"}, "sid" "root"}}
                {"rc" 1, "rm" [msg-id2]}]
               [{"ct" 8,
                 "c" "put",
                 "k" client-pk,
                 "m"
                 {"params"
                  {"t" "reply1",
                   "rid" msg-id1},
                  "sid" "root"}}
                {"rc" 1, "rm" [msg-id3]}]]
              actual (tp/get-msg-with-agg client "root")]



          (is (= expected actual)))))))

(tcfg/def-configd-server-test process-streams!-test
  [out
   server/start-netty-server!
   {}]
  (let [{:keys [new-client stream-model-fn]} out

Changes to test/streamful/manual_testing.clj.

18
19
20
21
22
23
24

25
26
27
28
29
30
31

(ns streamful.manual-testing
  (:require [clojure.java.io :as io]
            [clojure.test :refer :all]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.io :as sio]

            [taoensso.timbre :as log])
  (:import (java.util.concurrent Executors)))

(def ^:private manual-testing-thread-pool (Executors/newCachedThreadPool))

;; this appears at server startup
;; eventually we will use a TXT record hosted using DNSSEC







>







18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

(ns streamful.manual-testing
  (:require [clojure.java.io :as io]
            [clojure.pprint :as pprint]
            [clojure.test :refer :all]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.io :as sio]
            [streamful.test-aggregates :as tagg]
            [taoensso.timbre :as log])
  (:import (java.util.concurrent Executors)))

;; Namespace-private executor created with `Executors/newCachedThreadPool`.
(def ^:private manual-testing-thread-pool (Executors/newCachedThreadPool))

;; this appears at server startup
;; eventually we will use a TXT record hosted using DNSSEC
78
79
80
81
82
83
84






























85
86
87
88
89
90
91
    (as client k)
    (let [f (fn [sid] (some-> (client/get-messages client {:sid sid})
                              :response
                              (get "messages")
                              count))]
      (map f ["root" "public" "personal"]))))































(comment
  (log/set-min-level! :error)

  (def c (client/new-client "localhost" 1100 server-pk 1000))
  (def c (client/new-client "localhost" 1100 "BAD" 1000))
  (def c2 (client/new-client "localhost" 1100 server-pk 1000))








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
    (as client k)
    (let [f (fn [sid] (some-> (client/get-messages client {:sid sid})
                              :response
                              (get "messages")
                              count))]
      (map f ["root" "public" "personal"]))))

(defn- add-aggregates!
  "Registers the \"replies\" and \"threads\" aggregates (both of type \"msg\")
   through client `c`, in that order, pretty-printing the result of each
   `client/add-aggregate` call. Returns nil.

   The selector / aggregator / cleanup sources come from
   `streamful.test-aggregates`. Printing goes through the `pprint` alias
   required in the ns form rather than relying on `clojure.pprint` having
   been loaded by some other namespace."
  [c]
  (doseq [spec [{:t "msg"
                 :n "replies"
                 :s tagg/reply-aggregate-selector-fn
                 :a tagg/reply-aggregator-fn
                 :c tagg/cleanup-reply-fn}
                {:t "msg"
                 :n "threads"
                 :s tagg/thread-aggregate-selector-fn
                 :a tagg/thread-aggregator-fn
                 :c tagg/cleanup-thread-fn}]]
    (pprint/pprint (client/add-aggregate c spec))))

(defn- show-thread
  "Fetches the messages of stream `sid` through client `c` and returns a lazy
   seq of summaries ({:id :cmd :params :agg :k}) for the messages whose id
   equals `msg-id`.

   The response is unpacked nil-safely (`some->` / `get`, like the counting
   helper earlier in this namespace), so a request that yields no :response
   produces an empty result instead of a NullPointerException."
  [c sid msg-id]
  (->> (some-> (client/get-messages c {:sid sid})
               :response
               (get "messages"))
       (map
         ;; The command is bound as `cmd` so it does not shadow the client `c`.
         (fn [{:strs [id agg] {cmd "c", k "k", {:strs [params]} "m"} "m" :as _raw}]
           {:id id, :cmd cmd, :params params, :agg agg, :k k
            ;;:raw _raw
            }))
       (filter (fn [{:keys [id]}] (= msg-id id)))))

(comment
  (log/set-min-level! :error)

  (def c (client/new-client "localhost" 1100 server-pk 1000))
  (def c (client/new-client "localhost" 1100 "BAD" 1000))
  (def c2 (client/new-client "localhost" 1100 server-pk 1000))

105
106
107
108
109
110
111
112
113
114
115
116
117
118


119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135


















136
137
138
139
140
141
142
  (as c (assoc k :private-key (:private-key (new-keys))))

  (as c k)
  (as c nil)
  (as c2 k)
  (as c2 nil)

  (client/claim-server c "v0CA_kT8RQ_IrbEgF9SltA")
  (client/register-stream c {:sid "root"})
  (client/configure-stream c {:sid "root" :substreams-allowed? true})
  (client/configure-stream c {:sid "root" :anon-allowed? true})
  (client/register-stream c {:sid "personal"})
  (client/register-stream c {:sid "public"})
  (client/configure-stream c {:sid "public" :anon-allowed? true})



  (client/list-streams c nil)

  (time (client/get-messages c {:sid "personal"}))
  ;; -> 5-10ms

  (client/get-messages c {:sid "root"})
  (client/get-messages c2 {:sid "root"})
  (client/get-messages c {:sid "personal"})
  (client/get-messages c2 {:sid "personal"})
  (client/get-messages c {:sid "public"})
  (client/get-messages c2 {:sid "public"})

  (client/submit-message c {:sid "root", :t "hello, world!"})
  (client/submit-message c {:sid "personal", :t "my secrets are mine"})
  (client/submit-message c {:sid "public", :t "hello, friends!"})



















  (do (as c (new-keys)) (client/get-messages c {:sid "root"}))
  ;=> {:response {"status" "stream not found"}}

  (client/is-open? c)

  (time (doseq [x (range 25)] (println (client/ping c (str "go: " x)))))








|






>
>

















>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
  (as c (assoc k :private-key (:private-key (new-keys))))

  (as c k)
  (as c nil)
  (as c2 k)
  (as c2 nil)

  (client/claim-server c "pOqF4YMsRke94fsA7vaH3A")
  (client/register-stream c {:sid "root"})
  (client/configure-stream c {:sid "root" :substreams-allowed? true})
  (client/configure-stream c {:sid "root" :anon-allowed? true})
  (client/register-stream c {:sid "personal"})
  (client/register-stream c {:sid "public"})
  (client/configure-stream c {:sid "public" :anon-allowed? true})

  (add-aggregates! c)

  (client/list-streams c nil)

  (time (client/get-messages c {:sid "personal"}))
  ;; -> 5-10ms

  (client/get-messages c {:sid "root"})
  (client/get-messages c2 {:sid "root"})
  (client/get-messages c {:sid "personal"})
  (client/get-messages c2 {:sid "personal"})
  (client/get-messages c {:sid "public"})
  (client/get-messages c2 {:sid "public"})

  (client/submit-message c {:sid "root", :t "hello, world!"})
  (client/submit-message c {:sid "personal", :t "my secrets are mine"})
  (client/submit-message c {:sid "public", :t "hello, friends!"})

  (def msg-id1
    (as-> (client/get-messages c {:sid "root"}) x
          (:response x)
          (x "messages")
          (map
            (fn [{:strs [id] {:strs [c] {:strs [params]} "m"} "m"}]
              {:id id, :cmd c, :params params}) x)
          (filter (fn [{:keys [cmd]}] (= "put" cmd)) x)
          (first x)
          (:id x)))

  (show-thread c "root" msg-id1)

  (client/submit-message c {:sid "public"
                            :t "hello, to you!"
                            :rid msg-id1
                            :tid msg-id1})

  (do (as c (new-keys)) (client/get-messages c {:sid "root"}))
  ;=> {:response {"status" "stream not found"}}

  (client/is-open? c)

  (time (doseq [x (range 25)] (println (client/ping c (str "go: " x)))))