Many hyperlinks are disabled. Use anonymous login to enable hyperlinks.
Overview
Comment: | Aggregators reloaded! And validated manually |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA3-256: | 627799c4bae22bb99a31eb05f62a08f9 |
User & Date: | scstarkey 2025-03-13 13:14:17 |
Context
2025-03-13
| ||
13:29 | Fetch a bunch of messages by SHA (to see a whole thread, etc) check-in: 45ef5a3872 user: scstarkey tags: trunk | |
13:14 | Aggregators reloaded! And validated manually check-in: 627799c4ba user: scstarkey tags: trunk | |
2025-03-12
| ||
13:57 | Breadcrumbs check-in: 147f08bb76 user: scstarkey tags: trunk | |
Changes
Changes to src/streamful/aggregates.clj.
︙ | ︙ | |||
15 16 17 18 19 20 21 | ; License along with streamful. ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; (ns streamful.aggregates (:require [clojure.walk :as walk] [sci.core :as sci] | | > | | | 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | ; License along with streamful. ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; (ns streamful.aggregates (:require [clojure.walk :as walk] [sci.core :as sci] [streamful.stream :as stream] [taoensso.timbre :as log]) (:import (java.util.concurrent.locks ReentrantReadWriteLock))) (defprotocol AggregationSystem (stage-aggregator [_ params] "Prepare a new aggregate for introduction into the system. Compiles and places the function bundle into a pending state.") (remove-aggregator [_ params] "Prepares an aggregate for removal from the system. Immediately removes from the active list and moves it into the cleanup list. Actual cleanup happens on commit.") (rollback [_] "Removes all pending aggregates.") (commit [_] "For each pending aggregate, applies it to all stream messages and then |
︙ | ︙ | |||
46 47 48 49 50 51 52 | '[fn fn* -> ->> update conj and or inc dec assoc dissoc]) (def ^:private agg-opts {:allow agg-allowed-symbols :namespaces { ;;'clojure.core {'println println} }}) (defn- compile-str [s] (sci/eval-string s agg-opts)) | | | > > > | 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | '[fn fn* -> ->> update conj and or inc dec assoc dissoc]) (def ^:private agg-opts {:allow agg-allowed-symbols :namespaces { ;;'clojure.core {'println println} }}) (defn- compile-str [s] (sci/eval-string s agg-opts)) (defn- install-aggregator! [aggregators status t n s a c] (let [s (compile-str s) a (compile-str a) c (compile-str c)] (swap! aggregators assoc-in [status t n] {:s s, :a a, :c c}))) (defn- stage-aggregator! [aggregators t n s a c] (install-aggregator! aggregators :pending t n s a c)) (defmacro with-read! [& body] `(try (-> agg-lock .readLock .lock) ~@body (finally (-> agg-lock .readLock .unlock)))) |
︙ | ︙ | |||
131 132 133 134 135 136 137 | (partial apply-cleanup! stream-model prev-agg-fns))) (process-streams! stream-model (partial apply-aggregate! stream-model agg-fns)) (move-aggregate! aggregators "msg" k :pending :active) | | | > > > > > > > > > > > > > > > > < < | < | | | | 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | (partial apply-cleanup! stream-model prev-agg-fns))) (process-streams! stream-model (partial apply-aggregate! stream-model agg-fns)) (move-aggregate! aggregators "msg" k :pending :active) (stream/move-aggregator! stream-model "msg" k :pending :active)) (doseq [[k agg-fns] (get-in @aggregators [:cleanup "msg"])] (process-streams! stream-model (partial apply-cleanup! stream-model agg-fns)) (swap! aggregators update-in [:cleanup "msg"] dissoc k) (stream/move-aggregator! stream-model "msg" k :cleanup nil)))) (defn- execute! [aggregators stream-model msg] (with-read! (doseq [[_ agg-fns] (get-in @aggregators [:active "msg"])] (apply-aggregate! stream-model agg-fns msg)))) (defn load-aggregators! [stream-model aggregators] (log/info "Loading aggregators") (let [db-aggregators (stream/get-aggregators stream-model)] (doseq [status (keys db-aggregators) t (keys (status db-aggregators)) [n {:keys [a s c]} :as agg] (get-in db-aggregators [status t])] (if (= :active status) (do (log/infof " %s %s" t n) (install-aggregator! aggregators :active t n s a c)) (log/error "Unexpected status -- you may want to redo!" {:status status :t t :agg agg})))) (log/info "Aggregators loaded")) (defn base-aggregation [stream-model] (let [aggregators (atom {})] (load-aggregators! stream-model aggregators) (reify AggregationSystem (stage-aggregator [_ {:keys [t n s a c]}] (stage-aggregator! aggregators t n s a c)) (remove-aggregator [_ {:keys [t n]}] (move-aggregate! aggregators t n :active :cleanup)) (rollback [_] (with-write! 
(swap! aggregators dissoc :pending :cleanup))) (commit [_] (commit! stream-model aggregators)) (execute [_ msg] (execute! aggregators stream-model msg))))) |
Changes to src/streamful/core.clj.
︙ | ︙ | |||
113 114 115 116 117 118 119 | (stream/get-messages-for-aggregation db-stream-model sid start-at)) (get-message-by-sha [_ sha] (stream/get-message-by-sha db-stream-model sha)) (put-message! [_, pk, msg, original-msg-bytes] (stream/put-message! db-stream-model pk msg original-msg-bytes)) (put-aggregate! [_ iid agg-value] (stream/put-aggregate! db-stream-model iid agg-value)) | > > | | | 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | (stream/get-messages-for-aggregation db-stream-model sid start-at)) (get-message-by-sha [_ sha] (stream/get-message-by-sha db-stream-model sha)) (put-message! [_, pk, msg, original-msg-bytes] (stream/put-message! db-stream-model pk msg original-msg-bytes)) (put-aggregate! [_ iid agg-value] (stream/put-aggregate! db-stream-model iid agg-value)) (get-aggregators [_] (stream/get-aggregators db-stream-model)) (move-aggregator! [_ t k old-status new-status] (stream/move-aggregator! db-stream-model t k old-status new-status)))) (defn- notify-claim-key [notifier claim-key] (notifier (format (str "** This server is not claimed. Only the server owner can " "configure it. Claim the server and become its owner " "using this key: '%s' **") |
︙ | ︙ |
Changes to src/streamful/protocol.clj.
︙ | ︙ | |||
82 83 84 85 86 87 88 | (defn- register-aggregate! [agg-system stream-model psk msg original params] (update-aggregate! agg-system stream-model psk msg original | | | | 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | (defn- register-aggregate! [agg-system stream-model psk msg original params] (update-aggregate! agg-system stream-model psk msg original (agg/stage-aggregator agg-system params))) (defn- unregister-aggregate! [agg-system stream-model psk msg original params] (update-aggregate! agg-system stream-model psk msg original (agg/remove-aggregator agg-system params))) (defn-authd configure-stream! [psk stream-model agg-system msg original] (let [params (-> msg (get "m") (get "params")) |
︙ | ︙ |
Changes to src/streamful/stream.clj.
︙ | ︙ | |||
30 31 32 33 34 35 36 | May cause state to change in the backend as we keep track of who has subscribed to our stream.") (get-messages-for-aggregation [_ sid start-at]) (get-message-by-sha [_ sha]) (put-message! [_, ^bytes pk, msg, ^bytes original-msg-bytes] "Put a message into a stream. Must be part of a signed request.") (put-aggregate! [_ iid agg-value]) | > | | 30 31 32 33 34 35 36 37 38 | May cause state to change in the backend as we keep track of who has subscribed to our stream.") (get-messages-for-aggregation [_ sid start-at]) (get-message-by-sha [_ sha]) (put-message! [_, ^bytes pk, msg, ^bytes original-msg-bytes] "Put a message into a stream. Must be part of a signed request.") (put-aggregate! [_ iid agg-value]) (get-aggregators [_]) (move-aggregator! [_ t k old-status new-status])) |
Changes to src/streamful/stream_datalevin.clj.
︙ | ︙ | |||
251 252 253 254 255 256 257 | lk (streamk iid (System/currentTimeMillis)) r [:open-closed (or start-at (streamk iid 0)) lk]] ;;todo increase batch size (db/get-first-n db stream-data-table 5 r :string :data false))) (defn get-message-by-sha [db sha] (let [msg-id (db/get-value db stream-mapping-table sha :string :string)] | | | > | | 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 | lk (streamk iid (System/currentTimeMillis)) r [:open-closed (or start-at (streamk iid 0)) lk]] ;;todo increase batch size (db/get-first-n db stream-data-table 5 r :string :data false))) (defn get-message-by-sha [db sha] (let [msg-id (db/get-value db stream-mapping-table sha :string :string)] (if msg-id (db/get-value db stream-data-table msg-id :string :data false) (log/errorf "Message missing: '%s'" sha)))) (defn put-aggregate! [db iid agg-value] (db/with-transaction-kv [tx db] (let [[iid msg] (db/get-value db stream-data-table iid :string :data false)] (if msg (let [replacement [:put stream-data-table iid (assoc msg "agg" agg-value) :string :data]] (db/transact-kv tx [replacement])) (log/error "FAILED to replace aggregate"))))) (defn move-aggregator! [db t k old-status new-status] (db/with-transaction-kv [tx db] (let [existing-aggregates (or (get-db-aggregates tx) {}) aggregate (get-in existing-aggregates [old-status t k]) new-aggregates (update-in existing-aggregates [old-status t] dissoc k) |
︙ | ︙ | |||
298 299 300 301 302 303 304 | (has-owner? [_] (get-db-owner db)) (claim! [_ pk] (claim-server! db pk)) (register-stream! [_, pk, msg, original-msg-bytes] (register-stream! db pk msg original-msg-bytes)) (configure-stream! [_, pk, msg, original-msg-bytes] (configure-stream! db pk msg original-msg-bytes)) (list-streams [_, pk, msg] (list-streams db pk msg)) | | < > | | | 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 | (has-owner? [_] (get-db-owner db)) (claim! [_ pk] (claim-server! db pk)) (register-stream! [_, pk, msg, original-msg-bytes] (register-stream! db pk msg original-msg-bytes)) (configure-stream! [_, pk, msg, original-msg-bytes] (configure-stream! db pk msg original-msg-bytes)) (list-streams [_, pk, msg] (list-streams db pk msg)) (list-streams-for-aggregation [_] (-> db get-db-manifest keys)) (get-messages! [_, pk, msg, original-msg-bytes] (get-messages! db pk msg original-msg-bytes)) (get-messages-for-aggregation [_ sid start-at] (get-messages-for-aggregation db sid start-at)) (get-message-by-sha [_ sha] (get-message-by-sha db sha)) (put-message! [_, pk, msg, original-msg-bytes] (put-message! db pk msg original-msg-bytes)) (put-aggregate! [_ iid agg-value] (put-aggregate! db iid agg-value)) (get-aggregators [_] (get-db-aggregates db)) (move-aggregator! [_ t k old-status new-status] (move-aggregator! db t k old-status new-status))))) |
Changes to test/streamful/aggregates_test.clj.
︙ | ︙ | |||
20 21 22 23 24 25 26 | (:require [clojure.test :refer :all] [streamful.aggregates :as agg] [streamful.client :as client] [streamful.crypto :as cr] [streamful.server.netty :as server] [streamful.server.netty] [streamful.test-aggregates :as tagg] | < | 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | (:require [clojure.test :refer :all] [streamful.aggregates :as agg] [streamful.client :as client] [streamful.crypto :as cr] [streamful.server.netty :as server] [streamful.server.netty] [streamful.test-aggregates :as tagg] [streamful.test-cfg :as tcfg] [streamful.test-protocol :as tp])) (tcfg/def-configd-server-test reload-aggregates-test [out server/start-netty-server! {}] |
︙ | ︙ | |||
70 71 72 73 74 75 76 | (tp/is-sent tp/ok-put-ptn client client-keys "root" {:t "reply1", :rid msg-id1})] | | | | < < < | 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 | (tp/is-sent tp/ok-put-ptn client client-keys "root" {:t "reply1", :rid msg-id1})] (testing "reloads aggregates that were previously installed" (restart-server!) (client/close client) (let [msg-id3 (tp/is-sent tp/ok-put-ptn client client-keys "root" {:t "reply2", :rid msg-id2}) expected [[{"ct" 7, "c" "put", "k" client-pk, "m" {"params" {"t" "msg1"}, "sid" "root"}} {"rc" 1, "rm" [msg-id2]}] [{"ct" 8, "c" "put", "k" client-pk, "m" {"params" {"t" "reply1", "rid" msg-id1}, "sid" "root"}} {"rc" 1, "rm" [msg-id3]}]] actual (tp/get-msg-with-agg client "root")] (is (= expected actual))))))) (tcfg/def-configd-server-test process-streams!-test [out server/start-netty-server! {}] (let [{:keys [new-client stream-model-fn]} out |
︙ | ︙ |
Changes to test/streamful/manual_testing.clj.
︙ | ︙ | |||
18 19 20 21 22 23 24 25 26 27 28 29 30 31 | (ns streamful.manual-testing (:require [clojure.java.io :as io] [clojure.test :refer :all] [streamful.client :as client] [streamful.crypto :as cr] [streamful.io :as sio] [taoensso.timbre :as log]) (:import (java.util.concurrent Executors))) (def ^:private manual-testing-thread-pool (Executors/newCachedThreadPool)) ;; this appears at server startup ;; eventually we will use a TXT record hosted using DNSSEC | > | 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | (ns streamful.manual-testing (:require [clojure.java.io :as io] [clojure.test :refer :all] [streamful.client :as client] [streamful.crypto :as cr] [streamful.io :as sio] [streamful.test-aggregates :as tagg] [taoensso.timbre :as log]) (:import (java.util.concurrent Executors))) (def ^:private manual-testing-thread-pool (Executors/newCachedThreadPool)) ;; this appears at server startup ;; eventually we will use a TXT record hosted using DNSSEC |
︙ | ︙ | |||
78 79 80 81 82 83 84 85 86 87 88 89 90 91 | (as client k) (let [f (fn [sid] (some-> (client/get-messages client {:sid sid}) :response (get "messages") count))] (map f ["root" "public" "personal"])))) (comment (log/set-min-level! :error) (def c (client/new-client "localhost" 1100 server-pk 1000)) (def c (client/new-client "localhost" 1100 "BAD" 1000)) (def c2 (client/new-client "localhost" 1100 server-pk 1000)) | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 | (as client k) (let [f (fn [sid] (some-> (client/get-messages client {:sid sid}) :response (get "messages") count))] (map f ["root" "public" "personal"])))) (defn- add-aggregates! [c] (clojure.pprint/pprint (client/add-aggregate c {:t "msg" :n "replies" :s tagg/reply-aggregate-selector-fn :a tagg/reply-aggregator-fn :c tagg/cleanup-reply-fn})) (clojure.pprint/pprint (client/add-aggregate c {:t "msg" :n "threads" :s tagg/thread-aggregate-selector-fn :a tagg/thread-aggregator-fn :c tagg/cleanup-thread-fn}))) (defn- show-thread [c sid msg-id] (as-> (client/get-messages c {:sid sid}) x (:response x) (x "messages") (map (fn [{:strs [id agg] {:strs [c k] {:strs [params]} "m"} "m" :as _raw}] {:id id, :cmd c, :params params, :agg agg, :k k ;;:raw raw }) x) (filter (fn [{:keys [id]}] (= msg-id id)) x))) (comment (log/set-min-level! :error) (def c (client/new-client "localhost" 1100 server-pk 1000)) (def c (client/new-client "localhost" 1100 "BAD" 1000)) (def c2 (client/new-client "localhost" 1100 server-pk 1000)) |
︙ | ︙ | |||
105 106 107 108 109 110 111 | (as c (assoc k :private-key (:private-key (new-keys)))) (as c k) (as c nil) (as c2 k) (as c2 nil) | | > > > > > > > > > > > > > > > > > > > > | 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 | (as c (assoc k :private-key (:private-key (new-keys)))) (as c k) (as c nil) (as c2 k) (as c2 nil) (client/claim-server c "pOqF4YMsRke94fsA7vaH3A") (client/register-stream c {:sid "root"}) (client/configure-stream c {:sid "root" :substreams-allowed? true}) (client/configure-stream c {:sid "root" :anon-allowed? true}) (client/register-stream c {:sid "personal"}) (client/register-stream c {:sid "public"}) (client/configure-stream c {:sid "public" :anon-allowed? true}) (add-aggregates! c) (client/list-streams c nil) (time (client/get-messages c {:sid "personal"})) ;; -> 5-10ms (client/get-messages c {:sid "root"}) (client/get-messages c2 {:sid "root"}) (client/get-messages c {:sid "personal"}) (client/get-messages c2 {:sid "personal"}) (client/get-messages c {:sid "public"}) (client/get-messages c2 {:sid "public"}) (client/submit-message c {:sid "root", :t "hello, world!"}) (client/submit-message c {:sid "personal", :t "my secrets are mine"}) (client/submit-message c {:sid "public", :t "hello, friends!"}) (def msg-id1 (as-> (client/get-messages c {:sid "root"}) x (:response x) (x "messages") (map (fn [{:strs [id] {:strs [c] {:strs [params]} "m"} "m"}] {:id id, :cmd c, :params params}) x) (filter (fn [{:keys [cmd]}] (= "put" cmd)) x) (first x) (:id x))) (show-thread c "root" msg-id1) (client/submit-message c {:sid "public" :t "hello, to you!" :rid msg-id1 :tid msg-id1}) (do (as c (new-keys)) (client/get-messages c {:sid "root"})) ;=> {:response {"status" "stream not found"}} (client/is-open? c) (time (doseq [x (range 25)] (println (client/ping c (str "go: " x))))) |
︙ | ︙ |