Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Proper encryption between client and server for authenticated messages |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA3-256: |
80fa8b626eb6f51ce305beb46cbfdbef |
User & Date: | scstarkey 2025-02-17 22:00:17 |
Context
2025-02-17
| ||
22:23 | Creating secure passphrase file on the client side, too check-in: e6f0cb846e user: scstarkey tags: trunk | |
22:00 | Proper encryption between client and server for authenticated messages check-in: 80fa8b626e user: scstarkey tags: trunk | |
16:02 | Bugfix -- anon-allowed no longer clobbers substreams-allowed check-in: f8d067ad91 user: scstarkey tags: trunk | |
Changes
Changes to src/streamful/client.clj.
︙ | ︙ | |||
14 15 16 17 18 19 20 | ; You should have received a copy of the GNU Affero General Public ; License along with streamful. ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; (ns streamful.client (:require [clj-cbor.core :as cbor] | < < < < < > < < < < < < < < < < < < < < < < < < < | 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | ; You should have received a copy of the GNU Affero General Public ; License along with streamful. ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; (ns streamful.client (:require [clj-cbor.core :as cbor] [streamful.crypto :as cr] [streamful.io :as sio] [streamful.time :as time] [streamful.transport :as transport] [taoensso.timbre :as log]) (:import (java.io OutputStream) (java.net Socket))) (defn send-string-and-receive! [^OutputStream out read-line! ^String message] (log/debug "send-string-and-receive!" message) (sio/send-string-streaming! out message) (let [result (read-line!)] (log/debug message "received" result) |
︙ | ︙ | |||
68 69 70 71 72 73 74 | read-line! read-bytes! ^bytes bytes-to-send] (let [bytes-to-read (send-bytes-and-receive! out read-line! bytes-to-send) bytes-to-read (Integer/parseInt bytes-to-read)] (read-bytes! bytes-to-read))) | > > > > | > > | | > > > > > > > | > > | | > | > > > | 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | read-line! read-bytes! ^bytes bytes-to-send] (let [bytes-to-read (send-bytes-and-receive! out read-line! bytes-to-send) bytes-to-read (Integer/parseInt bytes-to-read)] (read-bytes! bytes-to-read))) (defn sign-and-encrypt [private-signing-key client-tx msg] (let [signed (cr/sign msg private-signing-key) encrypted (cr/encrypt signed client-tx)] (transport/encode-msg encrypted))) (defn decrypt [client-rx {:strs [msg nonce] :as response}] (if nonce (let [decrypted (cr/decrypt nonce msg client-rx)] (cbor/decode decrypted)) response)) (defn send-cmd! [{:keys [private-signing-key client-tx client-rx]} out-stream read-line! read-bytes! msg] (let [msg (transport/encode-msg msg) msg (if private-signing-key (sign-and-encrypt private-signing-key client-tx msg) msg) byte-count (count msg) cmd-resp (send-string-and-receive! out-stream read-line! (format (if private-signing-key "signed %s" "send %s") byte-count)) result (if (= "GO" cmd-resp) (send-bytes-and-receive-bytes! out-stream read-line! read-bytes! msg) (throw (ex-info "Invalid command response" {:response cmd-resp}))) result (cbor/decode result)] (if private-signing-key (decrypt client-rx result) result))) (defprotocol Client (ping [_ msg] "Ping the server. Msg is what the server should echo back") (as-stream [_ client-keys] "Establishes a secure communications session between client and server. The client-keys must be a map containing :public-key and :private-key |
︙ | ︙ | |||
144 145 146 147 148 149 150 | (dosync (ref-set sock new-socket) (ref-set in-stream new-in) (ref-set out-stream new-out)) (@listener))) send-cmd! | | | > | | | | | | | > | | > > > > > > > > > > > | < < | | | < < < < < < | | | | | | | | | < | < | > > > > > | > > > | > > > | > | 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 | (dosync (ref-set sock new-socket) (ref-set in-stream new-in) (ref-set out-stream new-out)) (@listener))) send-cmd! (fn [{:keys [public-signing-key] :as session} msg] (try {:response (send-cmd! session @out-stream read-line! read-bytes! (merge msg {:ct (time/now)} (when public-signing-key {:k (cr/b64-str public-signing-key)})))} (catch Throwable t (log/error "Failed to send command" t) (close!) {:response :err :reason (.getMessage t)})))] {:is-open? is-open? :read-line! read-line! :send-cmd! (fn [client-keys msg] (when-not (is-open?) (open!)) (send-cmd! client-keys msg)) :send-keys! (fn [pk psk] (try (when-not (is-open?) (open!)) (let [result (send-string-and-receive! @out-stream read-line! (str "keys " (cr/b64-str pk) " " (cr/b64-str psk)))] {:response result}) (catch Throwable t (log/error "Failed to send key" t) (close!) {:response :err :reason (.getMessage t)}))) :close! close! :reg-listener! (fn [new-listener] (reset! listener new-listener))})) (defn- process-key-response [expected-server-pk {:keys [response]}] (let [[_ server-pk-str] (re-matches #"ok ([-A-Za-z0-9+/]+={0,3})" response)] (when server-pk-str (when-not (= expected-server-pk server-pk-str) (throw (ex-info "Server key not trusted!" {:expected expected-server-pk :actual server-pk-str})))) server-pk-str)) (defn new-client "Creates a client object that opens a server connection whenever it needs to and keeps it open as long as it can. If any error occurs, the connection is closed automatically." [^String host, ^Integer port, ^String server-pk, ^Integer timeout] (let [session (atom {}) {:keys [send-cmd! send-keys! close! is-open? reg-listener!]} (configure-connection host port timeout) send-signed! (fn [msg] (send-cmd! @session msg))] (reg-listener! (fn [] (let [{:keys [public-signing-key public-key]} @session] (when (and public-signing-key public-key) (let [result (send-keys! public-key public-signing-key)] (when-not (process-key-response server-pk result) (throw (ex-info "Failed to send public key" result)))))))) (reify Client (ping [_ msg] (send-cmd! nil {:c "ping", :m msg})) (get-messages [_ {:keys [sid]}] (send-signed! {:c "get" :m {:sid sid}})) (as-stream [_ {:keys [public-key private-key public-signing-key] :as ks}] (cond ks (let [response (send-keys! public-key public-signing-key) server-pk-str (process-key-response server-pk response)] (if server-pk-str (let [session-keypair (cr/client-session-keypair public-key private-key (cr/b64-str->bytes server-pk-str))] (reset! session (-> ks (merge session-keypair) (assoc :server-pk-str server-pk-str))) {:result "ok"}) {:result "failed", :response response})) (empty? @session) {:result "already anonymous"} :else (do (reset! session {}) |
︙ | ︙ |
Changes to src/streamful/core.clj.
︙ | ︙ | |||
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; (ns streamful.core (:gen-class) (:require [clojure.java.io :as io] [defenv.core :as env] [streamful.env :as senv] [streamful.protocol :as proto] [streamful.simple-test-handler :as sth] [streamful.server.netty :as netty] [streamful.server.socket :as socket] [streamful.slf4j :as slf4j] [streamful.stream :as stream] [streamful.stream-datalevin :as sdl] [streamful.temp :as temp] [taoensso.timbre :as log]) | > | > > > | 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; (ns streamful.core (:gen-class) (:require [clojure.java.io :as io] [defenv.core :as env] [streamful.crypto :as cr] [streamful.env :as senv] [streamful.protocol :as proto] [streamful.simple-test-handler :as sth] [streamful.server.netty :as netty] [streamful.server.socket :as socket] [streamful.slf4j :as slf4j] [streamful.stream :as stream] [streamful.stream-datalevin :as sdl] [streamful.temp :as temp] [taoensso.timbre :as log]) (:import (java.io File) (java.nio.file Files Paths) (java.nio.file.attribute FileAttribute PosixFilePermissions) (java.time Duration) (java.time.temporal ChronoUnit))) (def copyright-resource (io/resource "copyright.txt")) (def copyright-text (slurp copyright-resource)) (defn copyright [] |
︙ | ︙ | |||
98 99 100 101 102 103 104 | (if (= "COPYRIGHT" sid) {:status :ok :messages [{"m" copyright-text}]} (stream/get-messages! db-stream-model pk msg original-msg-bytes))) (put-message! [_, pk, msg, original-msg-bytes] (stream/put-message! db-stream-model pk msg original-msg-bytes)))) | | > | > > > > > > > > > > > > > > | > > > > > | 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 | (if (= "COPYRIGHT" sid) {:status :ok :messages [{"m" copyright-text}]} (stream/get-messages! db-stream-model pk msg original-msg-bytes))) (put-message! [_, pk, msg, original-msg-bytes] (stream/put-message! db-stream-model pk msg original-msg-bytes)))) (defn build-protocol-handler [server-keys db-loc session-timeout] (let [db-stream-model (sdl/build-stream-model db-loc) stream-model (wrap-stream-model db-stream-model)] (proto/protocol-handler {:server-keys server-keys :stream-model stream-model :session-timeout (-> session-timeout (Duration/of ChronoUnit/MINUTES) (.toMillis))}))) (defn prepare-server-keys! [^String key-file-name] (let [^File key-file (io/file key-file-name)] (when-not (.exists key-file) (let [permissions (PosixFilePermissions/fromString "rw-------") permissions (PosixFilePermissions/asFileAttribute permissions)] (log/infof "Generating server key file '%s'" key-file) (Files/createFile (.toPath key-file) (into-array FileAttribute [permissions])) (spit key-file (cr/passphrase)))) (-> key-file slurp cr/new-keys))) (defn -main "Main entrypoint for the Streamful application" [& _] (init "* Load the COPYRIGHT stream for more details *") (env/display-env) (temp/set-temp-dir-root! @senv/temp-dir-root) (temp/cleanup-all-tempdirs!) (log/set-min-level! @senv/log-level) (let [server-running (atom true) stop-server-atom (atom nil) start-fn! (if (= :socket @senv/server-mode) start-socket! start-netty!) server-keys (prepare-server-keys! @senv/server-key-file) stop-server (start-fn! (if (= :streamful @senv/protocol) (build-protocol-handler server-keys @senv/db-loc @senv/session-timeout) (sth/build-handler stop-server-atom @senv/timeout)) (fn [] (log/infof "You should be able to connect to port %s" @senv/port)) (fn [] (log/info "Server is stopping") (reset! server-running false)))] (log/infof "Server public key: '%s'" (cr/b64-str (:public-key server-keys))) (future (cleanup-loop server-running)) (reset! stop-server-atom stop-server) (while @server-running (Thread/sleep 1000)) (temp/cleanup-all-tempdirs!) (shutdown-agents) (log/info "Thanks for playing!"))) |
Changes to src/streamful/crypto.clj.
︙ | ︙ | |||
200 201 202 203 204 205 206 | (ckx/server-session-keys server-public-key (server-private-key) client-public-key)] {:server-tx (constantly server-tx) :server-rx (constantly server-rx)})) (defn client-session-keypair [client-public-key | | | | 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 | (ckx/server-session-keys server-public-key (server-private-key) client-public-key)] {:server-tx (constantly server-tx) :server-rx (constantly server-rx)})) (defn client-session-keypair [client-public-key client-private-key server-public-key] (let [{:keys [client-tx client-rx]} (ckx/client-session-keys client-public-key (client-private-key) server-public-key)] {:client-tx (constantly client-tx) :client-rx (constantly client-rx)})) ;; # Helpful utilities (defn guid |
︙ | ︙ |
Changes to src/streamful/env.clj.
︙ | ︙ | |||
13 14 15 16 17 18 19 | ; ; You should have received a copy of the GNU Affero General Public ; License along with streamful. ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; (ns streamful.env | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | > > > > > > > | 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | ; ; You should have received a copy of the GNU Affero General Public ; License along with streamful. ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; (ns streamful.env (:require [clojure.java.io :as io] [clojure.string :as str] [defenv.core :refer [defenv]]) (:import (java.time Duration) (java.time.temporal ChronoUnit))) (def default-server-threads (* (.availableProcessors (Runtime/getRuntime)) 10)) (defenv port :doc "Port to listen on" :default "1100" :masked? false :tfn parse-long) (defenv log-level :doc "Level to log messages at" :default "info" :masked? false :tfn (comp keyword str/lower-case)) (defenv timeout :doc "How long until we give up waiting for bytes (ms)" :default "60000" :masked? false :tfn parse-long) (defenv accept-timeout :doc "How long until we give up waiting for a socket connection (ms)" :default "5000" :masked? false :tfn parse-long) (defenv session-timeout :doc "How long until a connection loses its session data? (minutes)" :default "60" :masked? false :tfn parse-long) (defenv max-threads :doc "Maximum number of socket server threads" :default (str default-server-threads) :masked? false :tfn parse-long) (defenv temp-dir-root :doc "Root of temporary file storage" :default "/tmp/streamful" :masked? false) (defenv server-mode :doc "Can either be 'socket' or 'netty'" :default "netty" :masked? false :tfn keyword) (defenv temp-cleanup-period :doc "How old are temp files allowed to get? (seconds)" :default (-> (Duration/of 5 ChronoUnit/MINUTES) .toSeconds str) :masked? false :tfn parse-long) (defenv file-buffer-bytes :doc "At what size do we cache bytes to a file? (netty-only, bytes)" :default (str 1048576) :masked? false :tfn parse-long) (defenv file-channel-chunk-size :doc "Bytes to read into memory at a time with file buffer (netty-only)" :default (str 1048576) :masked? false :tfn parse-long) (defenv protocol :doc "Server protocol. Allowed modes: streamful, echo" :default "streamful" :masked? false :tfn keyword) (defenv db-loc :doc "Location of database" :default (->> ".streamful" (io/file (System/getenv "HOME")) .getAbsolutePath) :masked? false) (defenv server-key-file :doc "The file that contains the key that the server uses for comms" :default (->> ".streamful-server-key" (io/file (System/getenv "HOME")) .getAbsolutePath) :masked? false) |
Changes to src/streamful/protocol.clj.
︙ | ︙ | |||
17 18 19 20 21 22 23 24 25 26 27 28 29 | ; (ns streamful.protocol (:require [clj-cbor.core :as cbor] [clojure.core.cache.wrapped :as cache] [streamful.crypto :as cr] [streamful.stream :as stream] [taoensso.timbre :as log]) (:import (clojure.lang ExceptionInfo) (java.io ByteArrayInputStream))) (def ok-response "ok") (def unauthorized-response "unauthorized") | > | | | | | | < < < < < < < < < | > > > > > > > > | > | > > | > > > > > > > > > > > > > > > > > > > > > > | | < > | | | | | < > | | | | | | | | | | | < < < < < < < < < | < < > > > > > > > > > > > > > | > > > > | | < < < < | | | > > > > | | | > > | > | < > | > | > > | > > > | > > | | > > > | | | | | > | | | > > > > > > > > | 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 | ; (ns streamful.protocol (:require [clj-cbor.core :as cbor] [clojure.core.cache.wrapped :as cache] [streamful.crypto :as cr] [streamful.stream :as stream] [streamful.transport :as transport] [taoensso.timbre :as log]) (:import (clojure.lang ExceptionInfo) (java.io ByteArrayInputStream))) (def ok-response "ok") (def unauthorized-response "unauthorized") (def signature-required-response "authentication required") (def key-missing-response "key missing") (def already-exists-response "already exists") (def root-missing-response "root not yet configured -- register that first") (def stream-missing-response "stream not found") (def substreams-not-allowed-response "substreams aren't allowed here") (def failed-response "failed") (defmacro defn-authd [n a & b] `(defn ~n ~a (if ~(first a) (do ~@b) (do (log/debug (str "Attempted to " ~n " without a key")) ~signature-required-response)))) (defn-authd register-stream! [psk stream-model msg original] (let [result (stream/register-stream! stream-model psk msg original)] (case result :ok ok-response :root-stream-missing root-missing-response :already-exists already-exists-response :substreams-not-allowed substreams-not-allowed-response (do (log/warn "Failed to register stream" result) failed-response)))) (defn-authd configure-stream! [psk stream-model msg original] (let [result (stream/configure-stream! stream-model psk msg original)] (case result :ok ok-response :unauthorized unauthorized-response (do (log/warn "Failed to configure stream" result) failed-response)))) (defn-authd put-message! [psk stream-model cmd original] (let [result (stream/put-message! stream-model psk cmd original)] (case result :ok ok-response :unauthorized unauthorized-response (do (log/warn "Failed to configure stream" result) failed-response)))) (defn get-messages [psk stream-model cmd original] (let [{:keys [status messages] :as result} (stream/get-messages! stream-model psk cmd original)] (case status :ok {"status" ok-response, "messages" messages} :stream-missing {"status" stream-missing-response} (do (log/warn "Failed to get messages" result) failed-response)))) (defn- verified [{:keys [signed? psk server-rx]} original] (if signed? (when psk (let [{:strs [msg nonce]} (cbor/decode original) msg (cr/decrypt nonce msg server-rx)] {:original msg :msg (cr/verified msg psk)})) {:original original, :msg original})) (defn- encrypt-signed [{:keys [signed? psk server-tx]} response] (if (and signed? psk) (let [msg (cr/encrypt response server-tx)] (transport/encode-msg msg)) response)) (defn- router [stream-model] (fn [{:keys [psk original] {:keys [remote-ip session-id]} :req} {:strs [c m] :as cmd}] (case c "ping" m "get" (get-messages psk stream-model cmd original) "put" (put-message! psk stream-model cmd original) "reg" (register-stream! psk stream-model cmd original) "cfg" (configure-stream! psk stream-model cmd original) (throw (ex-info "Unknown command" {:cmd cmd :session-id session-id :remote-ip remote-ip}))))) (defn handle-message [router {:keys [signed? psk] :as req-state} {:keys [msg] :as req}] (->> (try (let [original (.readAllBytes msg) {:keys [original msg]} (verified req-state original)] (if msg (some->> (cbor/decode msg) (router (assoc req :psk (when signed? psk) :original original))) key-missing-response)) (catch ExceptionInfo e (if (= {:reason :verification-failed} (ex-data e)) (do (log/debug e {:psk (cr/b64-str psk)}) unauthorized-response) (throw e)))) cbor/encode (encrypt-signed req-state))) (defn- update-session-state! [session-state session-id req-state f] (let [new-state (f req-state)] (cache/evict session-state session-id) (cache/lookup-or-miss session-state session-id (constantly new-state)))) (defn with-session-keys [server-public-key server-private-key client-public-signing-key-str client-public-key-str m] (-> m (assoc :psk (cr/b64-str->bytes client-public-signing-key-str)) (merge (cr/server-session-keypair server-public-key server-private-key (cr/b64-str->bytes client-public-key-str))))) (defn- handle-cmd [update-state! {:keys [encoded-pk private-key public-key]} msg] (let [[_ bytes-to-receive] (re-matches #"send (\d+)" msg) [_ signed-bytes-to-receive] (re-matches #"signed (\d+)" msg) [_ pk-str psk-str] (re-matches #"keys ([-A-Za-z0-9+/]+={0,3}) ([-A-Za-z0-9+/]+={0,3})" msg) bytes-to-receive (or bytes-to-receive signed-bytes-to-receive) bytes-to-receive (when bytes-to-receive (Integer/parseInt bytes-to-receive))] (log/trace "CMD" bytes-to-receive pk-str psk-str) (if signed-bytes-to-receive (update-state! (fn [m] (assoc m :signed? true))) (update-state! (fn [m] (dissoc m :signed?)))) (cond bytes-to-receive [:bytes [bytes-to-receive "GO"]] (and pk-str psk-str) (do (update-state! (partial with-session-keys public-key private-key psk-str pk-str)) [:cmd (str ok-response " " encoded-pk)])))) (defn- handle-bytes [router req-state req] [:cmd (let [response (handle-message router req-state req)] [(count response) (ByteArrayInputStream. response)])]) (defn- send-error [mode reason] (case mode :bytes [:cmd (let [encoded (transport/encode-msg reason)] [(count encoded) (ByteArrayInputStream. encoded)])] :cmd [:cmd reason])) (defn- handler [server-keys router session-state {:keys [mode msg session-id] :as req}] (try (let [req-state (cache/lookup-or-miss session-state session-id (constantly {})) encoded-server-pk (cr/b64-str (:public-key server-keys)) key-info (assoc server-keys :encoded-pk encoded-server-pk)] (log/trace mode msg session-id req-state) (cond (= :cmd mode) (handle-cmd (partial update-session-state! session-state session-id req-state) key-info msg) (= :bytes mode) (handle-bytes router req-state req) :else (log/error "Invalid request" req))) (catch ExceptionInfo e (let [{:keys [reason] :as d} (ex-data e)] (when-not (= :decryption-failed reason) (log/error e)) (send-error mode (pr-str d)))) (catch Throwable t (log/error t) (send-error mode "Unexpected failure")))) (defn protocol-handler "The protocol is fairly simple. Connect to the server and keep the connection open. For every command you want to send, first CBOR encode a single object with at least one key. Then, send the string 'send {n}' where `n` is the length of the encoded object. The server will respond with 'GO'. If you got that, then send the encoded object directly. |
︙ | ︙ | |||
207 208 209 210 211 212 213 | Once you have executed `key`, then you can add one more parameter to your `sign` command: `send {n} signed` The assumption, then, is that the message you are about to send is cryptographically signed using the libsodium `crypto_sign` primitive. You can learn more at <https://libsodium.gitbook.io/doc> " | | > | > | | | 261 262 263 264 265 266 267 268 269 270 271 272 273 274 | Once you have executed `key`, then you can add one more parameter to your `sign` command: `send {n} signed` The assumption, then, is that the message you are about to send is cryptographically signed using the libsodium `crypto_sign` primitive. You can learn more at <https://libsodium.gitbook.io/doc> " [{:keys [^stream/StreamModel stream-model, session-timeout, server-keys] :or {session-timeout 1000}}] (let [router (router stream-model)] (partial handler server-keys router (cache/ttl-cache-factory {} :ttl session-timeout)))) |
Added src/streamful/transport.clj.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | ; ; This file is part of streamful. ; ; streamful is free software: you can redistribute it and/or modify it ; under the terms ofthe GNU Affero General Public License as published ; by the Free Software Foundation, either version 3 of the License, or ; (at your option) any later version. ; ; streamful is distributed in the hope that it will be useful, but ; WITHOUT ANY WARRANTY; without even the implied warranty of ; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ; GNU Affero General Public License for more details. ; ; You should have received a copy of the GNU Affero General Public ; License along with streamful. ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; (ns streamful.transport (:require [clj-cbor.core :as cbor] [clj-cbor.tags.content :as tags.content] [clj-cbor.tags.numbers :as tags.num] [clj-cbor.tags.text :as tags.text] [clj-cbor.tags.time :as tags.time] [clojure.walk :as walk])) (def ^:private write-handlers (merge tags.content/content-write-handlers tags.num/number-write-handlers tags.time/epoch-time-write-handlers tags.time/epoch-date-write-handlers tags.text/text-write-handlers)) (def ^:private read-handlers (merge tags.content/content-read-handlers tags.num/number-read-handlers tags.time/instant-read-handlers tags.time/local-date-read-handlers tags.text/text-read-handlers)) (def ^:private cbor-codec (cbor/cbor-codec :write-handlers write-handlers :read-handlers read-handlers)) (defn encode-msg [msg] (->> msg walk/stringify-keys (cbor/encode cbor-codec))) |
Changes to test/streamful/manual_testing.clj.
︙ | ︙ | |||
56 57 58 59 60 61 62 63 | (defn- as [c k] (if k (client/as-stream c k) (client/as-stream c nil))) (comment (log/set-min-level! :error) | > > > > | > | > > > > > > > > | 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 | (defn- as [c k] (if k (client/as-stream c k) (client/as-stream c nil))) (comment (log/set-min-level! :error) ;; this appears at server startup ;; eventually we will use a TXT record hosted using DNSSEC ;; to prove servers can be trusted (no SSL authorities needed!) (def server-pk "9G4BzxYJSwGxC6a3FC3wqqa6Chk/cAV9lmgmyvyhgUA=") (def c (client/new-client "localhost" 1100 server-pk 1000)) (def c (client/new-client "localhost" 1100 "BAD" 1000)) (def c2 (client/new-client "localhost" 1100 server-pk 1000)) (def k (load-keys)) (save-keys! (new-keys)) (keys k) (-> k :public-signing-key cr/b64-str) (as c k) ;; bad client signing key is handled ok by the server ;; expect "unauthorized" messages when using them (as c (assoc k :private-signing-key (:private-signing-key (new-keys)))) ;; bad client encryption key handled kind of OK by the server (as c (assoc k :private-key (:private-key (new-keys)))) (as c nil) (as c2 k) (as c2 nil) (client/register-stream c {:sid "root"}) (client/configure-stream c {:sid "root" :substreams-allowed? true}) (client/configure-stream c {:sid "root" :anon-allowed? true}) |
︙ | ︙ |
Changes to test/streamful/protocol_test.clj.
︙ | ︙ | |||
22 23 24 25 26 27 28 | [crypto.equality :as creq] [streamful.client :as client] [streamful.crypto :as cr] [streamful.protocol :refer :all] [streamful.server.netty :as server] [streamful.test-asserts :as ta] [streamful.test-cfg :as tcfg] | | > > | 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | [crypto.equality :as creq] [streamful.client :as client] [streamful.crypto :as cr] [streamful.protocol :refer :all] [streamful.server.netty :as server] [streamful.test-asserts :as ta] [streamful.test-cfg :as tcfg] [streamful.time :as time] [streamful.transport :as transport]) (:import (java.io ByteArrayInputStream))) ; This protocol is inspired by ; <https://github.com/nostr-protocol/nips/blob/master/01.md> ; except for a few minor differences. Since we are signing an object ; which has been CBOR encoded, we can simply sign the original message ; itself instead of forcing a specific sequence. The signature ; is wrapped using libsodium's envelope, instead of a custom detached |
︙ | ︙ | |||
49 50 51 52 53 54 55 | ; We also don't worry too much about whether the ID is 'different' between ; events that contain the same content. Our biggest worries are whether hash ; map keys are included in a different order, etc. Considering the client ; timestamp is unlikely to be identical, as in, the same exact command won't ; be generated by 2 different clients using 2 different implementations at ; exactly the same time, ... and really, who cares?? | > > > | < | < | | | | | < | | | | | | 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | ; We also don't worry too much about whether the ID is 'different' between ; events that contain the same content. Our biggest worries are whether hash ; map keys are included in a different order, etc. Considering the client ; timestamp is unlikely to be identical, as in, the same exact command won't ; be generated by 2 different clients using 2 different implementations at ; exactly the same time, ... and really, who cares?? (defn decode-signed-original [psk o] (-> o (cr/verified psk) cbor/decode)) (defn- reassemble-original [{:strs [psk o id]}] (let [expected-id (cr/hash-256-b64-str o) {:strs [k] :as value} (decode-signed-original psk o)] ;; message hash should match (is (= expected-id id)) ;; public key inside message should match the one we validated against (is (creq/eq? psk (cr/b64-str->bytes k))) value)) (defn- as [client keys] (is (= {:result "ok"} (client/as-stream client keys)))) (defn- anon [client] (is (= {:result "keys removed"} (client/as-stream client nil)))) (defn- get-messages-from [client id] (client/get-messages client {:sid id})) (defn- get-as [client keys id] (as client keys) (get-messages-from client id)) (defn- get-anon [client id] (anon client) (get-messages-from client id)) (defn- get-m [m] (get m "m")) (tcfg/def-server-test protocol-test [client] (testing "ping" (let [msg-to-echo (cr/guid94)] (is (= {:response msg-to-echo} (client/ping client msg-to-echo))))) (let [cur-now (atom 1) keys1 (cr/new-keys) keys2 (cr/new-keys)] (with-redefs [time/now (fn [] (swap! cur-now inc))] (testing "register stream" (testing "must sign with the correct key" (is (= {:result "ok"} (client/as-stream client (assoc keys1 :private-signing-key (:private-signing-key keys2))))) (is (= {:response "unauthorized"} (client/register-stream client {:sid "test1"})))) (testing "first stream must be called 'root'" (as client keys1) |
︙ | ︙ | |||
196 197 198 199 200 201 202 | (is (= expected-root-messages (map get-m root-messages)) r1) (is (= expected-test1-messages (map get-m test1-messages)) r2)) (testing "we can validate and reassemble from original binaries" (is (= expected-root-messages | | | | 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 | (is (= expected-root-messages (map get-m root-messages)) r1) (is (= expected-test1-messages (map get-m test1-messages)) r2)) (testing "we can validate and reassemble from original binaries" (is (= expected-root-messages (map reassemble-original root-messages))) (is (= expected-test1-messages (map reassemble-original test1-messages)))))) (testing "good error message when stream is missing" (is (= {:response {"status" "stream not found"}} (get-as client keys1 "missing-stream")))) (testing "streams are private by default, show as missing" (is (= {:response {"status" "stream not found"}} |
︙ | ︙ | |||
248 249 250 251 252 253 254 | expected-test1-messages {"c" "put" "m" {"sid" "test1", "params" {"t" "hello, friend!"}} "k" test1-k "ct" 21}) (map get-m test1-messages))))))))) | | | | 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 | expected-test1-messages {"c" "put" "m" {"sid" "test1", "params" {"t" "hello, friend!"}} "k" test1-k "ct" 21}) (map get-m test1-messages))))))))) (testing "authentication required" (anon client) (let [expected-response {:response "authentication required"}] (testing "registration" (is (= expected-response (client/register-stream client {:sid "test1"})))) (testing "config" (is (= expected-response (client/configure-stream client |
︙ | ︙ | |||
292 293 294 295 296 297 298 | (is (as-> (get-as client keys1 "test1") x (:response x) (x "status") (= "stream not found" x)))) (ta/pending "approval required") (ta/pending "invitation only")))) | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | | 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 | (is (as-> (get-as client keys1 "test1") x (:response x) (x "status") (= "stream not found" x)))) (ta/pending "approval required") (ta/pending "invitation only")))) (defn- reassemble-from-handler [{:keys [psk original]}] (decode-signed-original psk original)) (deftest handle-message-test (let [messages-received (atom []) router (fn [req decoded] (swap! messages-received conj [decoded req]) "ok") server-keys (cr/new-keys) client-keys (cr/new-keys) client-public-key (:public-key client-keys) client-private-key (:private-key client-keys) server-public-key (:public-key server-keys) partial-signing-state {:signed? true} complete-crypto-state (with-session-keys server-public-key (:private-key server-keys) (cr/b64-str (:public-signing-key client-keys)) (cr/b64-str client-public-key) partial-signing-state) expected-insecure "feeling insecure" expected-secure "feeling quite secure"] (testing "message handling" (testing "unsigned messages are passed straight through" (let [msg {:msg expected-insecure} msg (transport/encode-msg msg) response (handle-message router {} {:msg (ByteArrayInputStream. msg)})] (is (= "ok" (cbor/decode response))))) (testing "server responds in plain text if no keys" (let [msg {:msg "no keys!"} msg (transport/encode-msg msg) response (handle-message router partial-signing-state {:msg (ByteArrayInputStream. msg)})] (is (= "key missing" (cbor/decode response))))) (testing "authd messages must be both signed and encrypted" (let [msg {:msg expected-secure} msg (transport/encode-msg msg) msg (cr/sign msg (:private-signing-key client-keys)) {:keys [client-tx client-rx]} (cr/client-session-keypair client-public-key client-private-key server-public-key) msg (cr/encrypt msg client-tx) msg (transport/encode-msg msg) encrypted-response (handle-message router complete-crypto-state {:msg (ByteArrayInputStream. msg)}) {:strs [nonce msg]} (cbor/decode encrypted-response) response (cr/decrypt nonce msg client-rx)] (is (= "ok" (cbor/decode response))))) (is (= [{"msg" expected-insecure}, {"msg" expected-secure}] (map first @messages-received))) (is (= {"msg" expected-secure} (-> @messages-received second second reassemble-from-handler)))))) (deftest server-key-test (ta/pending "client properly rejects bad server keys")) (deftest aggregates-test (ta/pending "server-level") (ta/pending "stream-level") (ta/pending "message-level")) (deftest server-side-subscriptions-test ;; Is there a background process that fetches periodically? ;; Or should we require clients to request a fetch every time? ;; This would allow fetches to be signed every time! And reduce ;; load on the server for subscriptions that are rare. We would ;; simply fetch all messages into a local stream and anybody ;; who subscribes on the same server would have access to them. ;; |
︙ | ︙ | |||
332 333 334 335 336 337 338 | ;; ;; Which might be a blessing. ;; ;; They could still get a picture by following everybody I follow. (ta/pending "server defaults") (ta/pending "per stream")) (ta/pending "purge remote stream on last unsubscribe")) | < < < < < < < < < < < < < < < < < < < < < < < | 419 420 421 422 423 424 425 | ;; ;; Which might be a blessing. ;; ;; They could still get a picture by following everybody I follow. (ta/pending "server defaults") (ta/pending "per stream")) (ta/pending "purge remote stream on last unsubscribe")) |
Changes to test/streamful/test_cfg.clj.
︙ | ︙ | |||
12 13 14 15 16 17 18 | ; GNU Affero General Public License for more details. ; ; You should have received a copy of the GNU Affero General Public ; License along with streamful. ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; | < > | | | | > > | 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | ; GNU Affero General Public License for more details. ; ; You should have received a copy of the GNU Affero General Public ; License along with streamful. ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; (ns streamful.test-cfg (:require [clojure.test :refer :all] [streamful.core :as core] [streamful.crypto :as cr] [streamful.log-util :as lu] [streamful.server.netty :as netty] [streamful.temp :as temp] [streamful.test-protocol :as tp]) (:import (java.io File))) (defmacro def-logcfg-test [n & body] `(deftest ~n (lu/test-log-level! ~@body))) (defn build-temp-protocol-handler [server-keys] (let [^File temp-dir (temp/session-tempdir!) ^File db-dir (File. temp-dir "db")] (core/build-protocol-handler server-keys (.getAbsolutePath db-dir) 1))) (defmacro def-configd-server-test [n b & body] `(def-logcfg-test ~n (let [server-keys# (cr/new-keys)] (tp/with-configd-server-and-client server-keys# (~build-temp-protocol-handler server-keys#) [~@b] ~@body) (temp/cleanup-all-tempdirs!)))) (defmacro def-server-test [n b & body] `(def-configd-server-test |
︙ | ︙ |
Changes to test/streamful/test_protocol.clj.
︙ | ︙ | |||
15 16 17 18 19 20 21 22 23 24 25 26 27 28 | ; License along with streamful. ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; (ns streamful.test-protocol (:require [clojure.test :refer :all] [streamful.client :as client] [streamful.test-net :as tnet] [streamful.test-thread :as tthread])) (defn start-server-and-wait [start-fn! port handler {:keys [on-start on-exit timeout accept-timeout] :or {on-exit (constantly :default) on-start (constantly :default) | > | 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | ; License along with streamful. ; If not, see <https://www.gnu.org/licenses/#AGPL>. ; (ns streamful.test-protocol (:require [clojure.test :refer :all] [streamful.client :as client] [streamful.crypto :as cr] [streamful.test-net :as tnet] [streamful.test-thread :as tthread])) (defn start-server-and-wait [start-fn! port handler {:keys [on-start on-exit timeout accept-timeout] :or {on-exit (constantly :default) on-start (constantly :default) |
︙ | ︙ | |||
45 46 47 48 49 50 51 | :timeout timeout :accept-timeout accept-timeout :on-exit on-exit :on-start on-start))] (tthread/wait-for "server start" server-running) stop-server)) | | > | 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | :timeout timeout :accept-timeout accept-timeout :on-exit on-exit :on-start on-start))] (tthread/wait-for "server start" server-running) stop-server)) (defmacro with-configd-server-and-client [server-keys handler b & body] `(let [port# (tnet/next-port!) stop-server!# (start-server-and-wait ~(second b) port# ~handler ~(nth b 2))] (let [~(first b) (partial client/new-client "localhost" port# (-> ~server-keys :public-key cr/b64-str) 1000)] ~@body) (stop-server!#))) |