Streamful

Check-in [80fa8b626e]
Login

Check-in [80fa8b626e]

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment: Proper encryption between client and server for authenticated messages
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 80fa8b626eb6f51ce305beb46cbfdbefb5eb474659034052a62438cf1b3fe7c2
User & Date: scstarkey 2025-02-17 22:00:17
Context
2025-02-17
22:23
Creating secure passphrase file on the client side, too check-in: e6f0cb846e user: scstarkey tags: trunk
22:00
Proper encryption between client and server for authenticated messages check-in: 80fa8b626e user: scstarkey tags: trunk
16:02
Bugfix -- anon-allowed no longer clobbers substreams-allowed check-in: f8d067ad91 user: scstarkey tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/streamful/client.clj.

14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
; You should have received a copy of the GNU Affero General Public
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.client
  (:require [clj-cbor.core :as cbor]
            [clj-cbor.tags.content :as tags.content]
            [clj-cbor.tags.numbers :as tags.num]
            [clj-cbor.tags.text :as tags.text]
            [clj-cbor.tags.time :as tags.time]
            [clojure.walk :as walk]
            [streamful.crypto :as cr]
            [streamful.io :as sio]
            [streamful.time :as time]

            [taoensso.timbre :as log])
  (:import (java.io OutputStream)
           (java.net Socket)))

;; Write handlers handed to this namespace's CBOR codec: the content, number,
;; epoch-time, epoch-date and text handler collections merged into one.
;; On a key clash the collection listed later takes precedence.
(def ^:private write-handlers
  (reduce merge
          [tags.content/content-write-handlers
           tags.num/number-write-handlers
           tags.time/epoch-time-write-handlers
           tags.time/epoch-date-write-handlers
           tags.text/text-write-handlers]))

;; Read handlers handed to this namespace's CBOR codec: the content, number,
;; instant, local-date and text handler collections merged into one.
;; On a key clash the collection listed later takes precedence.
(def ^:private read-handlers
  (reduce merge
          [tags.content/content-read-handlers
           tags.num/number-read-handlers
           tags.time/instant-read-handlers
           tags.time/local-date-read-handlers
           tags.text/text-read-handlers]))

;; CBOR codec built from this namespace's read and write handler collections.
(def ^:private cbor-codec
  (cbor/cbor-codec
    :read-handlers read-handlers
    :write-handlers write-handlers))

(defn send-string-and-receive! [^OutputStream out
                                read-line!
                                ^String message]
  (log/debug "send-string-and-receive!" message)
  (sio/send-string-streaming! out message)
  (let [result (read-line!)]
    (log/debug message "received" result)







<
<
<
<
<



>




<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







14
15
16
17
18
19
20





21
22
23
24
25
26
27
28



















29
30
31
32
33
34
35
; You should have received a copy of the GNU Affero General Public
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.client
  (:require [clj-cbor.core :as cbor]





            [streamful.crypto :as cr]
            [streamful.io :as sio]
            [streamful.time :as time]
            [streamful.transport :as transport]
            [taoensso.timbre :as log])
  (:import (java.io OutputStream)
           (java.net Socket)))




















(defn send-string-and-receive! [^OutputStream out
                                read-line!
                                ^String message]
  (log/debug "send-string-and-receive!" message)
  (sio/send-string-streaming! out message)
  (let [result (read-line!)]
    (log/debug message "received" result)
68
69
70
71
72
73
74




75


76
77

78






79


80
81
82
83
84
85
86
87
88
89
90
91
92

93



94
95
96
97
98
99
100
                                     read-line!
                                     read-bytes!
                                     ^bytes bytes-to-send]
  (let [bytes-to-read (send-bytes-and-receive! out read-line! bytes-to-send)
        bytes-to-read (Integer/parseInt bytes-to-read)]
    (read-bytes! bytes-to-read)))





(defn send-cmd!
  "Sends `msg` to the server as one command and returns the decoded reply.

   The message keys are stringified and the result is CBOR-encoded with
   `cbor-codec`. When `private-key` is truthy the encoded bytes are signed
   with it and the command is announced as `signed <byte-count>`, otherwise
   as `send <byte-count>`. The server must answer that announcement with
   `GO`; any other answer raises an ex-info carrying the answer under
   :response. The payload is then sent and the bytes that come back are
   decoded with `cbor/decode`."
  [out-stream read-line! read-bytes! msg private-key]
  (let [encoded (cbor/encode cbor-codec (walk/stringify-keys msg))
        payload (cond-> encoded
                  private-key (cr/sign private-key))
        header (format (if private-key "signed %s" "send %s")
                       (count payload))
        go-ahead (send-string-and-receive! out-stream read-line! header)]
    (when-not (= "GO" go-ahead)
      (throw (ex-info "Invalid command response" {:response go-ahead})))
    (cbor/decode
      (send-bytes-and-receive-bytes! out-stream
                                     read-line!
                                     read-bytes!
                                     payload))))




(defprotocol Client
  (ping [_ msg] "Ping the server. Msg is what the server should echo back")
  (as-stream
    [_ client-keys]
    "Establishes a secure communications session between client and server.
     The client-keys must be a map containing :public-key and :private-key







>
>
>
>
|
>
>
|
|
>

>
>
>
>
>
>
|
>
>







|




|
>
|
>
>
>







45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
                                     read-line!
                                     read-bytes!
                                     ^bytes bytes-to-send]
  (let [bytes-to-read (send-bytes-and-receive! out read-line! bytes-to-send)
        bytes-to-read (Integer/parseInt bytes-to-read)]
    (read-bytes! bytes-to-read)))

(defn sign-and-encrypt
  "Signs `msg` with `private-signing-key`, encrypts the signed result with
   `client-tx`, and returns what `transport/encode-msg` produces for the
   encrypted value."
  [private-signing-key client-tx msg]
  (-> msg
      (cr/sign private-signing-key)
      (cr/encrypt client-tx)
      transport/encode-msg))

(defn decrypt
  "Decrypts a server reply when it carries a nonce.

   `response` is looked up by the string keys `msg` and `nonce`. Without a
   nonce the response is returned unchanged; with one, `msg` is decrypted
   using `client-rx` and the plaintext is CBOR-decoded."
  [client-rx {:strs [msg nonce] :as response}]
  (if-not nonce
    response
    (cbor/decode (cr/decrypt nonce msg client-rx))))

(defn send-cmd!
  "Sends `msg` to the server as one command and returns the decoded reply.

   The first argument is the session map; :private-signing-key, :client-tx
   and :client-rx are read from it. `msg` is encoded with
   `transport/encode-msg`. When a private signing key is present the encoded
   message is signed and encrypted and the command is announced as
   `signed <byte-count>`; otherwise it is announced as `send <byte-count>`.
   The server must answer the announcement with `GO`; any other answer raises
   an ex-info carrying the answer under :response. The reply bytes are
   CBOR-decoded and, for signed commands, passed through `decrypt`."
  [{:keys [private-signing-key client-tx client-rx]}
   out-stream
   read-line!
   read-bytes!
   msg]
  (let [plain (transport/encode-msg msg)
        payload (if private-signing-key
                  (sign-and-encrypt private-signing-key client-tx plain)
                  plain)
        header (format (if private-signing-key "signed %s" "send %s")
                       (count payload))
        go-ahead (send-string-and-receive! out-stream read-line! header)
        _ (when-not (= "GO" go-ahead)
            (throw (ex-info "Invalid command response"
                            {:response go-ahead})))
        reply (cbor/decode
                (send-bytes-and-receive-bytes! out-stream
                                               read-line!
                                               read-bytes!
                                               payload))]
    (cond->> reply
      private-signing-key (decrypt client-rx))))

(defprotocol Client
  (ping [_ msg] "Ping the server. Msg is what the server should echo back")
  (as-stream
    [_ client-keys]
    "Establishes a secure communications session between client and server.
     The client-keys must be a map containing :public-key and :private-key
144
145
146
147
148
149
150
151
152
153
154

155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180

181
182
183
184
185
186
187
188
189
190
191
192
193











194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238





239



240



241

242
243
244
245
246
247
248
            (dosync
              (ref-set sock new-socket)
              (ref-set in-stream new-in)
              (ref-set out-stream new-out))
            (@listener)))

        send-cmd!
        (fn [msg & [private-key public-key]]
          (try
            {:response
             (send-cmd! @out-stream

                        read-line!
                        read-bytes!
                        (merge
                          msg
                          {:ct (time/now)}
                          (when public-key {:k (cr/b64-str public-key)}))
                        private-key)}
            (catch Throwable t
              (log/error "Failed to send command" t)
              (close!)
              {:response :err
               :reason (.getMessage t)})))]
    {:is-open? is-open?
     :read-line! read-line!

     :send-cmd!
     (fn [msg & [private-key public-key]]
       (when-not (is-open?) (open!))
       (send-cmd! msg private-key public-key))

     :send-key!
     (fn [key-id]
       (try
         (when-not (is-open?) (open!))
         (let [result
               (send-string-and-receive! @out-stream

                                         read-line!
                                         (str "key " (cr/b64-str key-id)))]

           {:response result})
         (catch Throwable t
           (log/error "Failed to send key" t)
           (close!)
           {:response :err
            :reason (.getMessage t)})))

     :close! close!
     :reg-listener! (fn [new-listener] (reset! listener new-listener))}))












(defn new-client
  "Creates a client object that opens a server connection whenever it needs to
   and keeps it open as long as it can. If any error occurs, the connection is
   closed automatically."
  [^String host, ^Integer port, ^Integer timeout & {:keys [key-mode]}]
  (let [autosend-keys? (not= key-mode :no-autosend)

        session (atom {})

        {:keys [send-cmd!
                send-key!
                close!
                is-open?
                reg-listener!]}
        (configure-connection
          host
          port
          timeout)

        send-signed!
        (fn [msg]
          (let [{:keys [private-signing-key public-signing-key]} @session]
            (send-cmd! msg
                       private-signing-key
                       public-signing-key)))]
    (when autosend-keys?
      (reg-listener!
        (fn []
          (let [{:keys [public-signing-key]} @session]
            (when public-signing-key
              (let [{:keys [response]} (send-key! public-signing-key)]
                (when-not (= "ok" response)
                  (throw (ex-info "Failed to send public key"
                                  {:response response})))))))))

    (reify Client
      (ping [_ msg] (send-cmd! {:c "ping", :m msg}))

      (get-messages [_ {:keys [sid]}]
        (let [{:keys [private-signing-key]} @session]
          (send-cmd! {:c "get" :m {:sid sid}}
                     private-signing-key)))

      (as-stream [_ {:keys [public-signing-key] :as ks}]
        (cond ks





              (let [result (send-key! public-signing-key)]



                (swap! session merge ks)



                result)


              (empty? @session)
              {:result "already anonymous"}

              :else
              (do
                (reset! session {})







|


|
>





|
|









|

|

|
|



|
>
|
|











>
>
>
>
>
>
>
>
>
>
>




|
<
<
|


|








|
<
<
<
<
<
<
|
|
|
|
|
|
|
|


|


<
|
<

|

>
>
>
>
>
|
>
>
>
|
>
>
>
|
>







140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207


208
209
210
211
212
213
214
215
216
217
218
219
220






221
222
223
224
225
226
227
228
229
230
231
232
233

234

235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
            (dosync
              (ref-set sock new-socket)
              (ref-set in-stream new-in)
              (ref-set out-stream new-out))
            (@listener)))

        send-cmd!
        (fn [{:keys [public-signing-key] :as session} msg]
          (try
            {:response
             (send-cmd! session
                        @out-stream
                        read-line!
                        read-bytes!
                        (merge
                          msg
                          {:ct (time/now)}
                          (when public-signing-key
                            {:k (cr/b64-str public-signing-key)})))}
            (catch Throwable t
              (log/error "Failed to send command" t)
              (close!)
              {:response :err
               :reason (.getMessage t)})))]
    {:is-open? is-open?
     :read-line! read-line!

     :send-cmd!
     (fn [client-keys msg]
       (when-not (is-open?) (open!))
       (send-cmd! client-keys msg))

     :send-keys!
     (fn [pk psk]
       (try
         (when-not (is-open?) (open!))
         (let [result
               (send-string-and-receive!
                 @out-stream
                 read-line!
                 (str "keys " (cr/b64-str pk) " " (cr/b64-str psk)))]

           {:response result})
         (catch Throwable t
           (log/error "Failed to send key" t)
           (close!)
           {:response :err
            :reason (.getMessage t)})))

     :close! close!
     :reg-listener! (fn [new-listener] (reset! listener new-listener))}))

(defn- process-key-response
  "Extracts and checks the server public key from the reply to a `keys`
   command.

   The reply map's :response is expected to be a string of the form
   `ok <base64-key>`.

   Returns the base64 key string when the reply has that form and the key
   equals `expected-server-pk`. Returns nil when :response is not a string
   (for example the :err keyword reported when sending failed) or does not
   have the expected form.

   Throws an ex-info with :expected and :actual when the server presents a
   key other than `expected-server-pk`."
  [expected-server-pk {:keys [response]}]
  (let [[_ server-pk-str]
        ;; re-matches only accepts character sequences. A failed send yields
        ;; a keyword (or nothing), which must read as "no key" instead of
        ;; raising ClassCastException/NullPointerException here.
        (when (string? response)
          (re-matches #"ok ([-A-Za-z0-9+/]+={0,3})" response))]
    (when (and server-pk-str
               (not= expected-server-pk server-pk-str))
      (throw
        (ex-info
          "Server key not trusted!"
          {:expected expected-server-pk
           :actual server-pk-str})))
    server-pk-str))

(defn new-client
  "Creates a client object that opens a server connection whenever it needs to
   and keeps it open as long as it can. If any error occurs, the connection is
   closed automatically."
  [^String host, ^Integer port, ^String server-pk, ^Integer timeout]


  (let [session (atom {})

        {:keys [send-cmd!
                send-keys!
                close!
                is-open?
                reg-listener!]}
        (configure-connection
          host
          port
          timeout)

        send-signed! (fn [msg] (send-cmd! @session msg))]






    (reg-listener!
      (fn []
        (let [{:keys [public-signing-key public-key]} @session]
          (when (and public-signing-key public-key)
            (let [result (send-keys! public-key public-signing-key)]
              (when-not (process-key-response server-pk result)
                (throw (ex-info "Failed to send public key"
                                result))))))))

    (reify Client
      (ping [_ msg] (send-cmd! nil {:c "ping", :m msg}))

      (get-messages [_ {:keys [sid]}]

        (send-signed! {:c "get" :m {:sid sid}}))


      (as-stream [_ {:keys [public-key private-key public-signing-key] :as ks}]
        (cond ks
              (let [response (send-keys! public-key public-signing-key)
                    server-pk-str (process-key-response server-pk response)]
                (if server-pk-str
                  (let [session-keypair
                        (cr/client-session-keypair
                          public-key
                          private-key
                          (cr/b64-str->bytes server-pk-str))]
                    (reset!
                      session
                      (-> ks
                          (merge session-keypair)
                          (assoc :server-pk-str server-pk-str)))
                    {:result "ok"})
                  {:result "failed", :response response}))

              (empty? @session)
              {:result "already anonymous"}

              :else
              (do
                (reset! session {})

Changes to src/streamful/core.clj.

16
17
18
19
20
21
22

23
24
25
26
27
28
29
30
31
32
33



34
35
36
37
38
39
40
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.core
  (:gen-class)
  (:require [clojure.java.io :as io]
            [defenv.core :as env]

            [streamful.env :as senv]
            [streamful.protocol :as proto]
            [streamful.simple-test-handler :as sth]
            [streamful.server.netty :as netty]
            [streamful.server.socket :as socket]
            [streamful.slf4j :as slf4j]
            [streamful.stream :as stream]
            [streamful.stream-datalevin :as sdl]
            [streamful.temp :as temp]
            [taoensso.timbre :as log])
  (:import (java.time Duration)



           (java.time.temporal ChronoUnit)))

;; The `copyright.txt` resource, looked up via `io/resource`.
(def copyright-resource (io/resource "copyright.txt"))

;; Contents of that resource, slurped once when this namespace is loaded.
(def copyright-text (slurp copyright-resource))

(defn copyright []







>










|
>
>
>







16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.core
  (:gen-class)
  (:require [clojure.java.io :as io]
            [defenv.core :as env]
            [streamful.crypto :as cr]
            [streamful.env :as senv]
            [streamful.protocol :as proto]
            [streamful.simple-test-handler :as sth]
            [streamful.server.netty :as netty]
            [streamful.server.socket :as socket]
            [streamful.slf4j :as slf4j]
            [streamful.stream :as stream]
            [streamful.stream-datalevin :as sdl]
            [streamful.temp :as temp]
            [taoensso.timbre :as log])
  (:import (java.io File)
           (java.nio.file Files Paths)
           (java.nio.file.attribute FileAttribute PosixFilePermissions)
           (java.time Duration)
           (java.time.temporal ChronoUnit)))

;; The `copyright.txt` resource, looked up via `io/resource`.
(def copyright-resource (io/resource "copyright.txt"))

;; Contents of that resource, slurped once when this namespace is loaded.
(def copyright-text (slurp copyright-resource))

(defn copyright []
98
99
100
101
102
103
104
105
106
107
108

109
110
111
112












113
114
115
116
117
118
119
120
121
122
123
124
125
126
127


128
129
130
131



132
133
134
135
136
137
138
139


140
141
142
143
144
145
      (if (= "COPYRIGHT" sid)
        {:status :ok
         :messages [{"m" copyright-text}]}
        (stream/get-messages! db-stream-model pk msg original-msg-bytes)))
    (put-message! [_, pk, msg, original-msg-bytes]
      (stream/put-message! db-stream-model pk msg original-msg-bytes))))

(defn build-protocol-handler
  "Builds the protocol handler for the streamful protocol.

   A stream model is built from `db-loc` and wrapped with
   `wrap-stream-model`. `session-timeout` is given in minutes and handed to
   the handler in milliseconds."
  [db-loc session-timeout]
  (let [stream-model (-> db-loc
                         sdl/build-stream-model
                         wrap-stream-model)
        timeout-ms (.toMillis (Duration/of session-timeout
                                           ChronoUnit/MINUTES))]
    (proto/protocol-handler
      {:stream-model stream-model
       :session-timeout timeout-ms})))













(defn -main
  "Main entrypoint for the Streamful application"
  [& _]
  ;; `init` is defined elsewhere in this file; it is given the banner line
  ;; pointing at the COPYRIGHT stream.
  (init "* Load the COPYRIGHT stream for more details                     *")
  (env/display-env)
  ;; Point temp storage at the configured root and clear leftovers before
  ;; anything starts serving.
  (temp/set-temp-dir-root! @senv/temp-dir-root)
  (temp/cleanup-all-tempdirs!)
  (log/set-min-level! @senv/log-level)

  (let [;; Flipped to false by the last callback below; the main thread
        ;; polls it to know when to exit.
        server-running (atom true)
        ;; Filled in after startup with the function returned by start-fn!.
        stop-server-atom (atom nil)
        ;; Server implementation: socket when server-mode is :socket,
        ;; netty otherwise.
        start-fn!
        (if (= :socket @senv/server-mode) start-socket! start-netty!)



        stop-server
        (start-fn!
          ;; Handler: the streamful protocol handler, or the simple test
          ;; handler for any other protocol setting.
          (if (= :streamful @senv/protocol)
            (build-protocol-handler @senv/db-loc @senv/session-timeout)



            (sth/build-handler stop-server-atom @senv/timeout))
          ;; Presumably invoked once the server is listening -- confirm
          ;; against start-socket!/start-netty!.
          (fn []
            (log/infof
              "You should be able to connect to port %s"
              @senv/port))
          ;; Presumably invoked when the server stops; releases the polling
          ;; loop below.
          (fn []
            (log/info "Server is stopping")
            (reset! server-running false)))]


    ;; Run the cleanup loop on a background thread for as long as the
    ;; server-running atom allows.
    (future (cleanup-loop server-running))
    (reset! stop-server-atom stop-server)
    ;; Block the main thread until server-running becomes false.
    (while @server-running (Thread/sleep 1000))
    (temp/cleanup-all-tempdirs!)
    ;; Shut down the agent thread pools (used by `future`) so the JVM can
    ;; exit.
    (shutdown-agents)
    (log/info "Thanks for playing!")))







|



>
|



>
>
>
>
>
>
>
>
>
>
>
>















>
>



|
>
>
>








>
>






102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
      (if (= "COPYRIGHT" sid)
        {:status :ok
         :messages [{"m" copyright-text}]}
        (stream/get-messages! db-stream-model pk msg original-msg-bytes)))
    (put-message! [_, pk, msg, original-msg-bytes]
      (stream/put-message! db-stream-model pk msg original-msg-bytes))))

(defn build-protocol-handler
  "Builds the protocol handler for the streamful protocol.

   `server-keys` is passed through to the handler unchanged. A stream model
   is built from `db-loc` and wrapped with `wrap-stream-model`.
   `session-timeout` is given in minutes and handed to the handler in
   milliseconds."
  [server-keys db-loc session-timeout]
  (let [stream-model (-> db-loc
                         sdl/build-stream-model
                         wrap-stream-model)
        timeout-ms (.toMillis (Duration/of session-timeout
                                           ChronoUnit/MINUTES))]
    (proto/protocol-handler
      {:server-keys server-keys
       :stream-model stream-model
       :session-timeout timeout-ms})))

(defn prepare-server-keys!
  "Loads the server keys derived from the contents of `key-file-name`,
   creating that file first when it does not exist.

   A missing file is created with POSIX permissions `rw-------` before a
   fresh passphrase from `cr/passphrase` is written into it, so the
   passphrase is never written to a file created with default permissions.
   Returns the result of `cr/new-keys` applied to the file's contents."
  [^String key-file-name]
  (let [^File key-file (io/file key-file-name)
        key-file-missing? (not (.exists key-file))]
    (when key-file-missing?
      (let [owner-only (-> "rw-------"
                           PosixFilePermissions/fromString
                           PosixFilePermissions/asFileAttribute)]
        (log/infof "Generating server key file '%s'" key-file)
        ;; Create the file with restrictive permissions first, then fill it.
        (Files/createFile
          (.toPath key-file)
          (into-array FileAttribute [owner-only]))
        (spit key-file (cr/passphrase))))
    (cr/new-keys (slurp key-file))))

(defn -main
  "Main entrypoint for the Streamful application"
  [& _]
  ;; `init` is defined elsewhere in this file; it is given the banner line
  ;; pointing at the COPYRIGHT stream.
  (init "* Load the COPYRIGHT stream for more details                     *")
  (env/display-env)
  ;; Point temp storage at the configured root and clear leftovers before
  ;; anything starts serving.
  (temp/set-temp-dir-root! @senv/temp-dir-root)
  (temp/cleanup-all-tempdirs!)
  (log/set-min-level! @senv/log-level)

  (let [;; Flipped to false by the last callback below; the main thread
        ;; polls it to know when to exit.
        server-running (atom true)
        ;; Filled in after startup with the function returned by start-fn!.
        stop-server-atom (atom nil)
        ;; Server implementation: socket when server-mode is :socket,
        ;; netty otherwise.
        start-fn!
        (if (= :socket @senv/server-mode) start-socket! start-netty!)

        ;; Loaded (and generated on first run) regardless of the protocol
        ;; selected below.
        server-keys (prepare-server-keys! @senv/server-key-file)

        stop-server
        (start-fn!
          ;; Handler: the streamful protocol handler, or the simple test
          ;; handler for any other protocol setting.
          (if (= :streamful @senv/protocol)
            (build-protocol-handler
              server-keys
              @senv/db-loc
              @senv/session-timeout)
            (sth/build-handler stop-server-atom @senv/timeout))
          ;; Presumably invoked once the server is listening -- confirm
          ;; against start-socket!/start-netty!.
          (fn []
            (log/infof
              "You should be able to connect to port %s"
              @senv/port))
          ;; Presumably invoked when the server stops; releases the polling
          ;; loop below.
          (fn []
            (log/info "Server is stopping")
            (reset! server-running false)))]
    ;; Only the public half of the server keys is logged, base64-encoded.
    (log/infof "Server public key: '%s'"
               (cr/b64-str (:public-key server-keys)))
    ;; Run the cleanup loop on a background thread for as long as the
    ;; server-running atom allows.
    (future (cleanup-loop server-running))
    (reset! stop-server-atom stop-server)
    ;; Block the main thread until server-running becomes false.
    (while @server-running (Thread/sleep 1000))
    (temp/cleanup-all-tempdirs!)
    ;; Shut down the agent thread pools (used by `future`) so the JVM can
    ;; exit.
    (shutdown-agents)
    (log/info "Thanks for playing!")))

Changes to src/streamful/crypto.clj.

200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
        (ckx/server-session-keys server-public-key
                                 (server-private-key)
                                 client-public-key)]
    {:server-tx (constantly server-tx)
     :server-rx (constantly server-rx)}))

(defn client-session-keypair
  "Derives the client's session keys for talking to one server.

   `client-secret-key` is a zero-argument function; it is invoked here to
   obtain the key handed to `ckx/client-session-keys`. Returns a map with
   :client-tx and :client-rx, each a function that returns the corresponding
   session key."
  [client-public-key client-secret-key server-public-key]
  (let [session-keys (ckx/client-session-keys client-public-key
                                              (client-secret-key)
                                              server-public-key)
        tx (:client-tx session-keys)
        rx (:client-rx session-keys)]
    {:client-tx (constantly tx)
     :client-rx (constantly rx)}))

;; # Helpful utilities

(defn guid







|



|







200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
        (ckx/server-session-keys server-public-key
                                 (server-private-key)
                                 client-public-key)]
    {:server-tx (constantly server-tx)
     :server-rx (constantly server-rx)}))

(defn client-session-keypair
  "Derives the client's session keys for talking to one server.

   `client-private-key` is a zero-argument function; it is invoked here to
   obtain the key handed to `ckx/client-session-keys`. Returns a map with
   :client-tx and :client-rx, each a function that returns the corresponding
   session key."
  [client-public-key client-private-key server-public-key]
  (let [session-keys (ckx/client-session-keys client-public-key
                                              (client-private-key)
                                              server-public-key)
        tx (:client-tx session-keys)
        rx (:client-rx session-keys)]
    {:client-tx (constantly tx)
     :client-rx (constantly rx)}))

;; # Helpful utilities

(defn guid

Changes to src/streamful/env.clj.

13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104







;
; You should have received a copy of the GNU Affero General Public
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.env
        (:require [clojure.java.io :as io]
                  [clojure.string :as str]
                  [defenv.core :refer [defenv]])
        (:import (java.time Duration)
                 (java.time.temporal ChronoUnit)))

;; Default for `max-threads`: ten times the processor count reported by the
;; JVM runtime.
(def default-server-threads (* (.availableProcessors (Runtime/getRuntime)) 10))

;; Settings declared with `defenv`. Every entry carries a :doc string and a
;; string :default; where present, :tfn names the function that presumably
;; converts the raw string into the value callers see (confirm against the
;; defenv library). Other namespaces read a setting by dereferencing it,
;; e.g. `@senv/port`.

(defenv port
        :doc "Port to listen on"
        :default "1100"
        :masked? false
        :tfn parse-long)

(defenv log-level
        :doc "Level to log messages at"
        :default "info"
        :masked? false
        :tfn (comp keyword str/lower-case))

(defenv timeout
        :doc "How long until we give up waiting for bytes (ms)"
        :default "60000"
        :masked? false
        :tfn parse-long)

(defenv accept-timeout
        :doc "How long until we give up waiting for a socket connection (ms)"
        :default "5000"
        :masked? false
        :tfn parse-long)

(defenv session-timeout
        :doc "How long until a connection loses its session data? (minutes)"
        :default "60"
        :masked? false
        :tfn parse-long)

(defenv max-threads
        :doc "Maximum number of socket server threads"
        :default (str default-server-threads)
        :masked? false
        :tfn parse-long)

(defenv temp-dir-root
        :doc "Root of temporary file storage"
        :default "/tmp/streamful"
        :masked? false)

(defenv server-mode
        :doc "Can either be 'socket' or 'netty'"
        :default "netty"
        :masked? false
        :tfn keyword)

;; Default is five minutes, expressed in seconds.
(defenv temp-cleanup-period
        :doc "How old are temp files allowed to get? (seconds)"
        :default (-> (Duration/of 5 ChronoUnit/MINUTES) .toSeconds str)
        :masked? false
        :tfn parse-long)

(defenv file-buffer-bytes
        :doc "At what size do we cache bytes to a file? (netty-only, bytes)"
        :default (str 1048576)
        :masked? false
        :tfn parse-long)

(defenv file-channel-chunk-size
        :doc "Bytes to read into memory at a time with file buffer (netty-only)"
        :default (str 1048576)
        :masked? false
        :tfn parse-long)

(defenv protocol
        :doc "Server protocol. Allowed modes: streamful, echo"
        :default "streamful"
        :masked? false
        :tfn keyword)

;; Defaults to `.streamful` under the directory named by the HOME
;; environment variable.
(defenv db-loc
        :doc "Location of database"
        :default (->> ".streamful"
                      (io/file (System/getenv "HOME"))
                      .getAbsolutePath)
        :masked? false)














|
|
|
|
|




|
|
|
|


|
|
|
|


|
|
|
|


|
|
|
|


|
|
|
|


|
|
|
|


|
|
|


|
|
|
|


|
|
|
|


|
|
|
|


|
|
|
|


|
|
|
|


|
|
|
|
|
>
>
>
>
>
>
>
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
;
; You should have received a copy of the GNU Affero General Public
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.env
  (:require [clojure.java.io :as io]
            [clojure.string :as str]
            [defenv.core :refer [defenv]])
  (:import (java.time Duration)
           (java.time.temporal ChronoUnit)))

;; Default for `max-threads`: ten times the processor count reported by the
;; JVM runtime.
(def default-server-threads (* (.availableProcessors (Runtime/getRuntime)) 10))

;; Settings declared with `defenv`. Every entry carries a :doc string and a
;; string :default; where present, :tfn names the function that presumably
;; converts the raw string into the value callers see (confirm against the
;; defenv library). Other namespaces read a setting by dereferencing it,
;; e.g. `@senv/port`.

(defenv port
  :doc "Port to listen on"
  :default "1100"
  :masked? false
  :tfn parse-long)

(defenv log-level
  :doc "Level to log messages at"
  :default "info"
  :masked? false
  :tfn (comp keyword str/lower-case))

(defenv timeout
  :doc "How long until we give up waiting for bytes (ms)"
  :default "60000"
  :masked? false
  :tfn parse-long)

(defenv accept-timeout
  :doc "How long until we give up waiting for a socket connection (ms)"
  :default "5000"
  :masked? false
  :tfn parse-long)

(defenv session-timeout
  :doc "How long until a connection loses its session data? (minutes)"
  :default "60"
  :masked? false
  :tfn parse-long)

(defenv max-threads
  :doc "Maximum number of socket server threads"
  :default (str default-server-threads)
  :masked? false
  :tfn parse-long)

(defenv temp-dir-root
  :doc "Root of temporary file storage"
  :default "/tmp/streamful"
  :masked? false)

(defenv server-mode
  :doc "Can either be 'socket' or 'netty'"
  :default "netty"
  :masked? false
  :tfn keyword)

;; Default is five minutes, expressed in seconds.
(defenv temp-cleanup-period
  :doc "How old are temp files allowed to get? (seconds)"
  :default (-> (Duration/of 5 ChronoUnit/MINUTES) .toSeconds str)
  :masked? false
  :tfn parse-long)

(defenv file-buffer-bytes
  :doc "At what size do we cache bytes to a file? (netty-only, bytes)"
  :default (str 1048576)
  :masked? false
  :tfn parse-long)

(defenv file-channel-chunk-size
  :doc "Bytes to read into memory at a time with file buffer (netty-only)"
  :default (str 1048576)
  :masked? false
  :tfn parse-long)

(defenv protocol
  :doc "Server protocol. Allowed modes: streamful, echo"
  :default "streamful"
  :masked? false
  :tfn keyword)

;; Defaults to `.streamful` under the directory named by the HOME
;; environment variable.
(defenv db-loc
  :doc "Location of database"
  :default (->> ".streamful"
                (io/file (System/getenv "HOME"))
                .getAbsolutePath)
  :masked? false)

;; Defaults to `.streamful-server-key` under the directory named by the HOME
;; environment variable.
(defenv server-key-file
  :doc "The file that contains the key that the server uses for comms"
  :default (->> ".streamful-server-key"
                (io/file (System/getenv "HOME"))
                .getAbsolutePath)
  :masked? false)

Changes to src/streamful/protocol.clj.

17
18
19
20
21
22
23

24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79








80

81
82


83


84




















85
86
87

88
89
90
91
92
93

94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122













123


124
125


126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146




147
148
149
150


151
152

153
154

155

156


157
158



159

160
161

162
163



164
165
166
167
168

169
170
171








172
173
174
175
176
177
178
;

(ns streamful.protocol
  (:require [clj-cbor.core :as cbor]
            [clojure.core.cache.wrapped :as cache]
            [streamful.crypto :as cr]
            [streamful.stream :as stream]

            [taoensso.timbre :as log])
  (:import (clojure.lang ExceptionInfo)
           (java.io ByteArrayInputStream)))

;; Response strings returned by the command handlers in this namespace.
(def ok-response "ok")
(def unauthorized-response "unauthorized")
;; Returned by functions defined with `defn-authd` when called without a key.
(def signature-required-response "signature required")
(def key-missing-response "key missing")
(def already-exists-response "already exists")
(def root-missing-response "root not yet configured -- register that first")
(def stream-missing-response "stream not found")
(def substreams-not-allowed-response "substreams aren't allowed here")
;; Fallback for any outcome a handler does not recognise.
(def failed-response "failed")

(defmacro defn-authd
  "Defines a function named `n` with parameter vector `a` and body `b` that
   requires an authenticated caller.

   The first parameter in `a` is treated as the caller's key: when it is
   truthy the body runs normally; otherwise a debug line naming the function
   is logged and the signature-required response is returned instead.

   Note that the form takes no docstring: the argument after the name must
   be the parameter vector."
  [n a & b]
  `(defn ~n ~a
     (if ~(first a)
       (do ~@b)
       ;; Splice in the function's *name* as a string. Unquoting the bare
       ;; symbol would make the log line print the function object
       ;; (e.g. ns$name@1a2b3c) rather than something readable.
       (do (log/debug (str "Attempted to " ~(name n) " without a key"))
           ~signature-required-response))))

;; Registers a stream through the stream model on behalf of `pk` and turns
;; the outcome keyword into a response string. Outcomes not listed in the
;; table are logged as a warning and reported as the generic failure
;; response.
(defn-authd register-stream! [pk stream-model msg original]
  (let [outcome (stream/register-stream! stream-model pk msg original)
        reply ({:ok ok-response
                :root-stream-missing root-missing-response
                :already-exists already-exists-response
                :substreams-not-allowed substreams-not-allowed-response}
               outcome)]
    (or reply
        (do (log/warn "Failed to register stream" outcome)
            failed-response))))

(defn-authd configure-stream! [pk stream-model msg original]
  ;; Delegate to the stream model and map its keyword outcome onto a
  ;; response string; anything unexpected is logged and reported as failed.
  (let [outcome (stream/configure-stream! stream-model pk msg original)]
    (cond
      (= :ok outcome) ok-response
      (= :unauthorized outcome) unauthorized-response
      :else (do (log/warn "Failed to configure stream" outcome)
                failed-response))))

(defn get-messages
  "Fetches messages via `stream/get-messages!` and converts the outcome into
   a client response: a map with status and messages on success, a map with
   the stream-missing status when the stream is not found, and the plain
   `failed-response` string (after a warning) for anything else."
  [pk stream-model cmd original]
  (let [{:keys [status messages] :as outcome}
        (stream/get-messages! stream-model pk cmd original)]
    (cond
      (= :ok status)
      {"status" ok-response, "messages" messages}

      (= :stream-missing status)
      {"status" stream-missing-response}

      :else
      (do (log/warn "Failed to get messages" outcome)
          failed-response))))

(defn-authd put-message! [pk stream-model cmd original]
  ;; Store a message through the stream model and translate its keyword
  ;; outcome into a response string. Unknown outcomes are logged (naming
  ;; this operation, not stream configuration) and reported as failed.
  (let [result (stream/put-message! stream-model pk cmd original)]
    (case result
      :ok ok-response
      :unauthorized unauthorized-response
      (do (log/warn "Failed to put message" result)
          failed-response))))









(defn verified
  "Returns the message bytes to decode. Unsigned messages are returned
   untouched. Signed messages are passed through `cr/verified` with `pk`;
   when no key is available for a signed message the result is nil."
  [signed? pk original]
  (cond
    (not signed?) original
    pk (cr/verified original pk)
    :else nil))























(defn- handle-message
  "Four-argument arity: reads all bytes from `msg`, verifies them when
   `signed?` is set, CBOR-decodes the result and dispatches it through the
   two-argument arity; the outcome is CBOR-encoded. Yields
   `key-missing-response` when verification returns nothing and
   `unauthorized-response` when verification fails.

   Two-argument arity: routes a decoded command map by its `c` entry and
   throws `ex-info` for an unknown command."
  ([signed? pk req msg]
   (cbor/encode

     (try
       (let [original (.readAllBytes msg)
             msg (verified signed? pk original)]
         (if msg
           ;; some->> skips dispatch when the decoded payload is nil.
           (some->> (cbor/decode msg)
                    (handle-message

                      (assoc req
                        ;; Only expose the key downstream for signed messages.
                        :pk (when signed? pk)
                        :original original)))
           key-missing-response))
       (catch ExceptionInfo e
         ;; Only a verification failure is turned into a response; any other
         ;; ExceptionInfo is rethrown.
         (if (= {:reason :verification-failed} (ex-data e))
           (do
             (log/debug e {:pk (cr/b64-str pk)})
             unauthorized-response)
           (throw e))))))

  ([{:keys [stream-model pk original] {:keys [remote-ip session-id]} :req}
    {:strs [c m] :as cmd}]
   (case c
     ;; ping echoes the `m` entry back unchanged.
     "ping" m
     "get" (get-messages pk stream-model cmd original)
     "put" (put-message! pk stream-model cmd original)
     "reg" (register-stream! pk stream-model cmd original)
     "cfg" (configure-stream! pk stream-model cmd original)
     (throw (ex-info "Unknown command"
                     {:cmd cmd
                      :session-id session-id
                      :remote-ip remote-ip})))))

(defn- update-session-state!
  "Replaces the cached state for `session-id` with `(f req-state)`: the
   existing entry is evicted and the new value is stored through
   `cache/lookup-or-miss`, whose result is returned."
  [session-state session-id req-state f]
  (let [replacement (f req-state)
        supplier (constantly replacement)]
    (cache/evict session-state session-id)
    (cache/lookup-or-miss session-state session-id supplier)))














(defn- handle-cmd
  "Interprets a textual command line `msg`.

   Recognised forms are `send <n>`, `signed <n>` and `key <base64>`.
   `update-state!` is called with a function of the session state.

   Returns `[:bytes [n \"GO\"]]` for `send`/`signed`, `[:cmd ok-response]`
   after storing the decoded key under `:pk`, and nil for anything else."
  [update-state! msg]


  (let [[_ bytes-to-receive] (re-matches #"send (\d+)" msg)
        [_ signed-bytes-to-receive] (re-matches #"signed (\d+)" msg)


        [_ key-str] (re-matches #"key ([-A-Za-z0-9+/]+={0,3})" msg)

        bytes-to-receive
        (when bytes-to-receive
          (Integer/parseInt bytes-to-receive))

        ;; A `signed` command takes precedence over `send` for the count.
        bytes-to-receive
        (if signed-bytes-to-receive
          (Integer/parseInt signed-bytes-to-receive)
          bytes-to-receive)]
    (log/trace "CMD" bytes-to-receive key-str)
    ;; The signed flag is rewritten on every command, so only a `signed`
    ;; command leaves it set.
    (if signed-bytes-to-receive
      (update-state! (fn [m] (assoc m :signed? true)))
      (update-state! (fn [m] (dissoc m :signed?))))

    (cond bytes-to-receive
          [:bytes [bytes-to-receive "GO"]]

          key-str
          (do
            (update-state!




              (fn [m] (assoc m :pk (cr/b64-str->bytes key-str))))
            [:cmd ok-response]))))

(defn- handle-bytes
  "Handles a binary payload: runs it through `handle-message` using the
   session's key and signed flag, then answers in command mode with the
   encoded response's length and a stream over its bytes."
  [stream-model {:keys [pk signed?]} {:keys [msg] :as req}]
  (let [context {:req req, :stream-model stream-model}
        payload (handle-message signed? pk context msg)
        length (count payload)]
    [:cmd [length (ByteArrayInputStream. payload)]]))




(defn- handler
  "Entry point for one request. Looks up (or initialises to an empty map)
   the state cached for the request's session, then dispatches on `mode`:
   `:cmd` goes to `handle-cmd`, `:bytes` to `handle-bytes`; any other mode
   is logged as an error."
  [stream-model session-state {:keys [mode msg session-id] :as req}]
  (let [req-state
        (cache/lookup-or-miss session-state session-id (constantly {}))

        update-state!
        (partial update-session-state! session-state session-id req-state)]
    (log/trace mode msg session-id req-state)
    (case mode
      :cmd (handle-cmd update-state! msg)
      :bytes (handle-bytes stream-model req-state req)
      (log/error "Invalid request" req))))









(defn protocol-handler
  "The protocol is fairly simple. Connect to the server and keep the
   connection open. For every command you want to send, first CBOR encode
   a single object with at least one key. Then, send the string 'send {n}'
   where `n` is the length of the encoded object. The server will respond
   with 'GO'. If you got that, then send the encoded object directly.







>






|














|
|








|
|






|
<
<
<
<
<
<
<
<
<
|






>
>
>
>
>
>
>
>
|
>

|
>
>
|
>
>

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
<
>
|
|
|
|
|
<
>
|
|
|
|
|
|
|
|
|
|
|
<
<
<
<
<
<
<
<
<
|
<
<






>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>


>
>
|

|
<
<


<
<
|
|







|


>
>
>
>
|
|

|
>
>

|
>
|
<
>
|
>
|
>
>
|

>
>
>
|
>


>
|
|
>
>
>
|
|
|
|
|
>
|
|
|
>
>
>
>
>
>
>
>







17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64









65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111

112
113
114
115
116
117

118
119
120
121
122
123
124
125
126
127
128
129









130


131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159


160
161


162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
;

(ns streamful.protocol
  (:require [clj-cbor.core :as cbor]
            [clojure.core.cache.wrapped :as cache]
            [streamful.crypto :as cr]
            [streamful.stream :as stream]
            [streamful.transport :as transport]
            [taoensso.timbre :as log])
  (:import (clojure.lang ExceptionInfo)
           (java.io ByteArrayInputStream)))

;; Response strings returned by the command handlers in this namespace.
(def ok-response "ok")
(def unauthorized-response "unauthorized")
;; Returned by `defn-authd` functions when their first argument is falsey.
;; NOTE: the var keeps its historical name although the text now reads
;; "authentication required".
(def signature-required-response "authentication required")
;; Returned by `handle-message` when a signed message arrives without a key.
(def key-missing-response "key missing")
(def already-exists-response "already exists")
(def root-missing-response "root not yet configured -- register that first")
(def stream-missing-response "stream not found")
(def substreams-not-allowed-response "substreams aren't allowed here")
;; Fallback for any outcome the handlers do not recognise.
(def failed-response "failed")

(defmacro defn-authd
  "Defines a function named `n` with argument vector `a` and body `b` that
   is guarded by its first argument: the body only runs when that argument
   is truthy. Otherwise the attempt is logged at debug level and
   `signature-required-response` is returned.

   The first element of `a` must be a plain symbol, because it is spliced
   in as the guard expression."
  [n a & b]
  `(defn ~n ~a
     (if ~(first a)
       (do ~@b)
       ;; Embed the function's name as a string at expansion time; unquoting
       ;; the bare symbol would stringify the function object at runtime.
       (do (log/debug (str "Attempted to " ~(str n) " without a key"))
           ~signature-required-response))))

(defn-authd register-stream! [psk stream-model msg original]
  ;; Delegate to the stream model, then translate its keyword outcome into
  ;; the response string for the client. Unknown outcomes are logged and
  ;; reported as a generic failure.
  (let [outcome (stream/register-stream! stream-model psk msg original)
        reply ({:ok ok-response
                :root-stream-missing root-missing-response
                :already-exists already-exists-response
                :substreams-not-allowed substreams-not-allowed-response}
               outcome)]
    (or reply
        (do (log/warn "Failed to register stream" outcome)
            failed-response))))

(defn-authd configure-stream! [psk stream-model msg original]
  ;; Delegate to the stream model and map its keyword outcome onto a
  ;; response string; anything unexpected is logged and reported as failed.
  (let [outcome (stream/configure-stream! stream-model psk msg original)]
    (cond
      (= :ok outcome) ok-response
      (= :unauthorized outcome) unauthorized-response
      :else (do (log/warn "Failed to configure stream" outcome)
                failed-response))))

(defn-authd put-message! [psk stream-model cmd original]
  ;; Store a message through the stream model and translate its keyword
  ;; outcome into a response string. Unknown outcomes are logged (naming
  ;; this operation, not stream configuration) and reported as failed.
  (let [result (stream/put-message! stream-model psk cmd original)]
    (case result
      :ok ok-response
      :unauthorized unauthorized-response
      (do (log/warn "Failed to put message" result)
          failed-response))))

(defn get-messages
  "Fetches messages via `stream/get-messages!` and converts the outcome into
   a client response: a map with status and messages on success, a map with
   the stream-missing status when the stream is not found, and the plain
   `failed-response` string (after a warning) for anything else."
  [psk stream-model cmd original]
  (let [{:keys [status messages] :as outcome}
        (stream/get-messages! stream-model psk cmd original)]
    (cond
      (= :ok status)
      {"status" ok-response, "messages" messages}

      (= :stream-missing status)
      {"status" stream-missing-response}

      :else
      (do (log/warn "Failed to get messages" outcome)
          failed-response))))

(defn- verified
  "Unwraps an incoming payload according to the session state.

   Unsigned sessions get the payload back as both `:original` and `:msg`.
   Signed sessions with a client signing key have the CBOR envelope's
   `msg` entry decrypted with its `nonce` and `server-rx`, and the result
   verified against `psk`; `:original` is then the decrypted (still
   signed) message. Signed sessions without a key yield nil."
  [{:keys [signed? psk server-rx]} original]
  (cond
    (not signed?)
    {:original original, :msg original}

    psk
    (let [{sealed "msg", nonce "nonce"} (cbor/decode original)
          signed-msg (cr/decrypt nonce sealed server-rx)]
      {:original signed-msg
       :msg (cr/verified signed-msg psk)})

    :else nil))

(defn- encrypt-signed
  "Encrypts `response` with `server-tx` and encodes it for transport, but
   only for signed sessions that have a client signing key. Every other
   response is returned unchanged."
  [{:keys [signed? psk server-tx]} response]
  (if-not (and signed? psk)
    response
    (-> response
        (cr/encrypt server-tx)
        transport/encode-msg)))

(defn- router
  "Builds the command router for `stream-model`.

   The returned function takes the request context (the request map with
   `:psk` and `:original` added by `handle-message`) and the decoded
   command map, and dispatches on the command's `c` entry. An unknown
   command throws `ex-info` carrying the command, session id and remote
   ip."
  [stream-model]
  (fn [{:keys [psk original] :as ctx}
       {:strs [c m] :as cmd}]
    (case c
      ;; ping echoes the `m` entry back unchanged.
      "ping" m
      "get" (get-messages psk stream-model cmd original)
      "put" (put-message! psk stream-model cmd original)
      "reg" (register-stream! psk stream-model cmd original)
      "cfg" (configure-stream! psk stream-model cmd original)
      ;; handle-message passes the request map itself, so the session id
      ;; and remote ip live at the top level; a nested :req is still
      ;; honoured for callers that wrap the request.
      (let [{:keys [remote-ip session-id]} (or (:req ctx) ctx)]
        (throw (ex-info "Unknown command"
                        {:cmd cmd
                         :session-id session-id
                         :remote-ip remote-ip}))))))

;; Processes one binary request: reads all bytes from the request's :msg,
;; unwraps them with `verified`, CBOR-decodes the message and hands it to
;; `router` together with the request (augmented with :psk and :original).
;; The router's result is CBOR-encoded and, for signed sessions with a key,
;; encrypted by `encrypt-signed`. Produces `key-missing-response` when
;; `verified` yields no message and `unauthorized-response` when signature
;; verification fails.
(defn handle-message [router
                      {:keys [signed? psk] :as req-state}

                      {:keys [msg] :as req}]
  (->> (try
         (let [original (.readAllBytes msg)
               {:keys [original msg]} (verified req-state original)]
           (if msg
             ;; some->> skips routing when the decoded payload is nil.
             (some->> (cbor/decode msg)

                      (router
                        (assoc req
                          ;; Only expose the key downstream for signed messages.
                          :psk (when signed? psk)
                          :original original)))
             key-missing-response))
         (catch ExceptionInfo e
           ;; Only a verification failure is turned into a response; any
           ;; other ExceptionInfo is rethrown.
           (if (= {:reason :verification-failed} (ex-data e))
             (do
               (log/debug e {:psk (cr/b64-str psk)})
               unauthorized-response)
             (throw e))))
       cbor/encode









       (encrypt-signed req-state)))



(defn- update-session-state!
  "Replaces the cached state for `session-id` with `(f req-state)`: the
   existing entry is evicted and the new value is stored through
   `cache/lookup-or-miss`, whose result is returned."
  [session-state session-id req-state f]
  (let [replacement (f req-state)
        supplier (constantly replacement)]
    (cache/evict session-state session-id)
    (cache/lookup-or-miss session-state session-id supplier)))

(defn with-session-keys
  "Returns session state `m` extended with the client's decoded public
   signing key under `:psk` and with whatever `cr/server-session-keypair`
   produces for the server key pair and the client's decoded public key.
   Both client keys are given as base64 strings."
  [server-public-key
   server-private-key
   client-public-signing-key-str
   client-public-key-str
   m]
  (let [client-signing-key (cr/b64-str->bytes client-public-signing-key-str)
        client-public-key (cr/b64-str->bytes client-public-key-str)
        session-keys (cr/server-session-keypair server-public-key
                                                server-private-key
                                                client-public-key)]
    (merge (assoc m :psk client-signing-key)
           session-keys)))

;; Interprets a textual command line `msg`.
;;
;; Recognised forms are `send <n>`, `signed <n>` and
;; `keys <base64> <base64>`; the first key is passed to `with-session-keys`
;; as the client public key, the second as the client public signing key.
;; `update-state!` is called with a function of the session state.
;;
;; Returns [:bytes [n "GO"]] for send/signed, [:cmd "ok <encoded-pk>"]
;; after a `keys` command, and nil for anything else.
(defn- handle-cmd [update-state!
                   {:keys [encoded-pk private-key public-key]}
                   msg]
  (let [[_ bytes-to-receive] (re-matches #"send (\d+)" msg)
        [_ signed-bytes-to-receive] (re-matches #"signed (\d+)" msg)

        [_ pk-str psk-str]
        (re-matches #"keys ([-A-Za-z0-9+/]+={0,3}) ([-A-Za-z0-9+/]+={0,3})" msg)

        ;; Either command form supplies the byte count.
        bytes-to-receive (or bytes-to-receive signed-bytes-to-receive)



        bytes-to-receive


        (when bytes-to-receive (Integer/parseInt bytes-to-receive))]
    (log/trace "CMD" bytes-to-receive pk-str psk-str)
    ;; The signed flag is rewritten on every command, so only a `signed`
    ;; command leaves it set.
    (if signed-bytes-to-receive
      (update-state! (fn [m] (assoc m :signed? true)))
      (update-state! (fn [m] (dissoc m :signed?))))

    (cond bytes-to-receive
          [:bytes [bytes-to-receive "GO"]]

          (and pk-str psk-str)
          (do
            (update-state!
              (partial with-session-keys
                       public-key
                       private-key
                       psk-str
                       pk-str))
            ;; Reply with the server's encoded public key.
            [:cmd (str ok-response " " encoded-pk)]))))

(defn- handle-bytes
  "Handles a binary payload: runs the request through `handle-message`,
   then answers in command mode with the response's length and a stream
   over its bytes."
  [router req-state req]
  (let [payload (handle-message router req-state req)
        length (count payload)]
    [:cmd [length (ByteArrayInputStream. payload)]]))


(defn- send-error
  "Builds the error reply for a failed request.

   In `:bytes` mode the reason is encoded with `transport/encode-msg` and
   returned as a length plus a stream over the encoded bytes. In `:cmd`
   mode, and for any unrecognised mode, the reason is returned as a plain
   command reply. The fallback matters because this function is called
   from `handler`'s catch clauses, where a `case` without a default would
   throw a second exception for an unexpected mode."
  [mode reason]
  (case mode
    :bytes
    [:cmd
     (let [encoded
           (transport/encode-msg reason)]
       [(count encoded) (ByteArrayInputStream. encoded)])]

    ;; :cmd and anything unexpected
    [:cmd reason]))

(defn- handler
  "Entry point for one request. Looks up (or initialises to an empty map)
   the state cached for the request's session, then dispatches on `mode`:
   `:cmd` goes to `handle-cmd` (with the server keys plus the base64 form
   of the public key), `:bytes` to `handle-bytes`; any other mode is logged
   as an error.

   Failures are converted into error replies via `send-error`. An
   `ExceptionInfo` reports its printed ex-data and is logged unless its
   reason is `:decryption-failed`; any other throwable is logged and
   reported as an unexpected failure."
  [server-keys
   router
   session-state
   {:keys [mode msg session-id] :as req}]
  (try
    (let [req-state
          (cache/lookup-or-miss session-state session-id (constantly {}))

          key-info
          (assoc server-keys
            :encoded-pk (cr/b64-str (:public-key server-keys)))

          update-state!
          (partial update-session-state! session-state session-id req-state)]
      (log/trace mode msg session-id req-state)
      (case mode
        :cmd (handle-cmd update-state! key-info msg)
        :bytes (handle-bytes router req-state req)
        (log/error "Invalid request" req)))
    (catch ExceptionInfo e
      (let [details (ex-data e)]
        (when (not= :decryption-failed (:reason details))
          (log/error e))
        (send-error mode (pr-str details))))
    (catch Throwable t
      (log/error t)
      (send-error mode "Unexpected failure"))))

(defn protocol-handler
  "The protocol is fairly simple. Connect to the server and keep the
   connection open. For every command you want to send, first CBOR encode
   a single object with at least one key. Then, send the string 'send {n}'
   where `n` is the length of the encoded object. The server will respond
   with 'GO'. If you got that, then send the encoded object directly.
207
208
209
210
211
212
213
214
215

216

217
218
   Once you have executed `key {public-key}`, you can send a signed message
   by announcing it with `signed {n}` instead of `send {n}`.

   The assumption, then, is that the message you are about to send is
   cryptographically signed using the libsodium `crypto_sign` primitive.
   You can learn more at <https://libsodium.gitbook.io/doc>
   "
  [{:keys [^stream/StreamModel stream-model, session-timeout]
    :or {session-timeout 1000}}]

  (partial handler

           stream-model
           (cache/ttl-cache-factory {} :ttl session-timeout)))







|

>
|
>
|
|
261
262
263
264
265
266
267
268
269
270
271
272
273
274
   Once you have executed `keys {public-key} {public-signing-key}`, you can
   send an authenticated message by announcing it with `signed {n}` instead
   of `send {n}`.

   The assumption, then, is that the message you are about to send is
   cryptographically signed using the libsodium `crypto_sign` primitive.
   You can learn more at <https://libsodium.gitbook.io/doc>
   "
  [{:keys [^stream/StreamModel stream-model, session-timeout, server-keys]
    :or {session-timeout 1000}}]
  (let [router (router stream-model)]
    (partial handler
             server-keys
             router
             (cache/ttl-cache-factory {} :ttl session-timeout))))

Added src/streamful/transport.clj.































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
;
; This file is part of streamful.
;
; streamful is free software: you can redistribute it and/or modify it
; under the terms of the GNU Affero General Public License as published
; by the Free Software Foundation, either version 3 of the License, or
; (at your option) any later version.
;
; streamful is distributed in the hope that it will be useful, but
; WITHOUT ANY WARRANTY; without even the implied warranty of
; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
; GNU Affero General Public License for more details.
;
; You should have received a copy of the GNU Affero General Public
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.transport
  (:require [clj-cbor.core :as cbor]
            [clj-cbor.tags.content :as tags.content]
            [clj-cbor.tags.numbers :as tags.num]
            [clj-cbor.tags.text :as tags.text]
            [clj-cbor.tags.time :as tags.time]
            [clojure.walk :as walk]))

;; Write handlers for the codec: the content, number, epoch time/date and
;; text tag handlers from clj-cbor, merged into one map.
(def ^:private write-handlers
  (merge tags.content/content-write-handlers
         tags.num/number-write-handlers
         tags.time/epoch-time-write-handlers
         tags.time/epoch-date-write-handlers
         tags.text/text-write-handlers))

;; Read handlers for the codec, merged from the matching clj-cbor tag
;; namespaces.
(def ^:private read-handlers
  (merge tags.content/content-read-handlers
         tags.num/number-read-handlers
         tags.time/instant-read-handlers
         tags.time/local-date-read-handlers
         tags.text/text-read-handlers))

;; The codec used by `encode-msg`, built from the handler maps above.
(def ^:private cbor-codec
  (cbor/cbor-codec
    :write-handlers write-handlers
    :read-handlers read-handlers))

(defn encode-msg
  "CBOR-encodes `msg` with this namespace's codec after converting its map
   keys to strings with `walk/stringify-keys`."
  [msg]
  (let [string-keyed (walk/stringify-keys msg)]
    (cbor/encode cbor-codec string-keyed)))

Changes to test/streamful/manual_testing.clj.

56
57
58
59
60
61
62




63
64

65
66
67
68
69
70
71
72
73
74
75








76
77
78
79
80
81
82
(defn- as
  "Switches client `c` to the identity in `k`; any falsey `k` is passed on
   as nil."
  [c k]
  (client/as-stream c (or k nil)))

(comment
  (log/set-min-level! :error)





  (def c (client/new-client "localhost" 1100 1000))

  (def c2 (client/new-client "localhost" 1100 1000 :key-mode :no-autosend))

  (def k (load-keys))

  (save-keys! (new-keys))

  (keys k)

  (-> k :public-signing-key cr/b64-str)

  (as c k)








  (as c nil)
  (as c2 k)
  (as c2 nil)

  (client/register-stream c {:sid "root"})
  (client/configure-stream c {:sid "root" :substreams-allowed? true})
  (client/configure-stream c {:sid "root" :anon-allowed? true})







>
>
>
>

|
>
|










>
>
>
>
>
>
>
>







56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
(defn- as
  "Switches client `c` to the identity in `k`; any falsey `k` is passed on
   as nil."
  [c k]
  (client/as-stream c (or k nil)))

(comment
  (log/set-min-level! :error)
  ;; this appears at server startup
  ;; eventually we will use a TXT record hosted using DNSSEC
  ;; to prove servers can be trusted (no SSL authorities needed!)
  (def server-pk "9G4BzxYJSwGxC6a3FC3wqqa6Chk/cAV9lmgmyvyhgUA=")

  (def c (client/new-client "localhost" 1100 server-pk 1000))
  (def c (client/new-client "localhost" 1100 "BAD" 1000))
  (def c2 (client/new-client "localhost" 1100 server-pk 1000))

  (def k (load-keys))

  (save-keys! (new-keys))

  (keys k)

  (-> k :public-signing-key cr/b64-str)

  (as c k)

  ;; bad client signing key is handled ok by the server
  ;; expect "unauthorized" messages when using them
  (as c (assoc k :private-signing-key (:private-signing-key (new-keys))))

  ;; bad client encryption key handled kind of OK by the server
  (as c (assoc k :private-key (:private-key (new-keys))))

  (as c nil)
  (as c2 k)
  (as c2 nil)

  (client/register-stream c {:sid "root"})
  (client/configure-stream c {:sid "root" :substreams-allowed? true})
  (client/configure-stream c {:sid "root" :anon-allowed? true})

Changes to test/streamful/protocol_test.clj.

22
23
24
25
26
27
28
29


30
31
32
33
34
35
36
            [crypto.equality :as creq]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.protocol :refer :all]
            [streamful.server.netty :as server]
            [streamful.test-asserts :as ta]
            [streamful.test-cfg :as tcfg]
            [streamful.time :as time]))



; This protocol is inspired by
; <https://github.com/nostr-protocol/nips/blob/master/01.md>
; except for a few minor differences. Since we are signing an object
; which has been CBOR encoded, we can simply sign the original message
; itself instead of forcing a specific sequence. The signature
; is wrapped using libsodium's envelope, instead of a custom detached







|
>
>







22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
            [crypto.equality :as creq]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.protocol :refer :all]
            [streamful.server.netty :as server]
            [streamful.test-asserts :as ta]
            [streamful.test-cfg :as tcfg]
            [streamful.time :as time]
            [streamful.transport :as transport])
  (:import (java.io ByteArrayInputStream)))

; This protocol is inspired by
; <https://github.com/nostr-protocol/nips/blob/master/01.md>
; except for a few minor differences. Since we are signing an object
; which has been CBOR encoded, we can simply sign the original message
; itself instead of forcing a specific sequence. The signature
; is wrapped using libsodium's envelope, instead of a custom detached
49
50
51
52
53
54
55



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
; We also don't worry too much about whether the ID is 'different' between
; events that contain the same content. Our biggest worries are whether hash
; map keys are included in a different order, etc. Considering the client
; timestamp is unlikely to be identical, as in, the same exact command won't
; be generated by 2 different clients using 2 different implementations at
; exactly the same time, ... and really, who cares??




(defn- validate-and-reassemble-msg
  "Verifies the stored original `o` against `psk`, decodes it and returns
   the decoded value. Asserts that the stored `id` equals the hash of `o`
   and that the key inside the message matches `psk`."
  [{:strs [psk o id]}]
  (let [id-from-content (cr/hash-256-b64-str o)
        decoded (cbor/decode (cr/verified o psk))
        key-in-msg (get decoded "k")]
    ;; message hash should match
    (is (= id-from-content id))

    ;; public key inside message should match the one we validated against
    (is (creq/eq? psk (cr/b64-str->bytes key-in-msg)))
    decoded))

;; Switches the client to `keys` and asserts the switch was acknowledged.
(defn- as [client keys]
  (is (= {:response "ok"} (client/as-stream client keys))))

;; Drops the client's keys and asserts the removal was acknowledged.
(defn- anon [client]
  (is (= {:result "keys removed"} (client/as-stream client nil))))

;; Fetches the messages of stream `id`.
(defn- get-messages [client id] (client/get-messages client {:sid id}))

;; Fetches the messages of stream `id` after switching to `keys`.
(defn- get-as [client keys id] (as client keys) (get-messages client id))

;; Fetches the messages of stream `id` after dropping the client's keys.
(defn- get-anon [client id] (anon client) (get-messages client id))

;; Extracts the "m" entry of a message map.
(defn- get-m [m] (get m "m"))

(tcfg/def-server-test protocol-test
  [client]
  (testing "ping"
    (let [msg-to-echo (cr/guid94)]
      (is (= {:response msg-to-echo}
             (client/ping client msg-to-echo)))))
  (let [cur-now (atom 1)
        keys1 (cr/new-keys)
        keys2 (cr/new-keys)]
    (with-redefs [time/now (fn [] (swap! cur-now inc))]
      (testing "register stream"
        (testing "must sign with the correct keys"
          (is (= {:response "ok"}
                 (client/as-stream
                   client
                   (merge keys1
                          (select-keys keys2 [:public-key
                                              :public-signing-key])))))

          (is (= {:response "unauthorized"}
                 (client/register-stream client {:sid "test1"}))))

        (testing "first stream must be called 'root'"
          (as client keys1)








>
>
>
|

<
|
<








|




|

|

|



|
<









|
|


|
|
|







51
52
53
54
55
56
57
58
59
60
61
62

63

64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
; We also don't worry too much about whether the ID is 'different' between
; events that contain the same content. Our biggest worries are whether hash
; map keys are included in a different order, etc. Considering the client
; timestamp is unlikely to be identical, as in, the same exact command won't
; be generated by 2 different clients using 2 different implementations at
; exactly the same time, ... and really, who cares??

(defn decode-signed-original
  "Verifies the signed bytes `o` against the public signing key `psk` and
   CBOR-decodes the verified content."
  [psk o]
  (let [verified-bytes (cr/verified o psk)]
    (cbor/decode verified-bytes)))

(defn- reassemble-original
  "Verifies and decodes the stored original `o` and returns the decoded
   value. Asserts that the stored `id` equals the hash of `o` and that the
   key inside the message matches `psk`."
  [{:strs [psk o id]}]
  (let [id-from-content (cr/hash-256-b64-str o)
        decoded (decode-signed-original psk o)
        key-in-msg (get decoded "k")]
    ;; message hash should match
    (is (= id-from-content id))

    ;; public key inside message should match the one we validated against
    (is (creq/eq? psk (cr/b64-str->bytes key-in-msg)))
    decoded))

;; Switches the client to `keys` and asserts the switch was acknowledged.
(defn- as [client keys]
  (is (= {:result "ok"} (client/as-stream client keys))))

;; Drops the client's keys and asserts the removal was acknowledged.
(defn- anon [client]
  (is (= {:result "keys removed"} (client/as-stream client nil))))

;; Fetches the messages of stream `id`.
(defn- get-messages-from [client id] (client/get-messages client {:sid id}))

;; Fetches the messages of stream `id` after switching to `keys`.
(defn- get-as [client keys id] (as client keys) (get-messages-from client id))

;; Fetches the messages of stream `id` after dropping the client's keys.
(defn- get-anon [client id] (anon client) (get-messages-from client id))

;; Extracts the "m" entry of a message map.
(defn- get-m [m] (get m "m"))

(tcfg/def-server-test protocol-test [client]

  (testing "ping"
    (let [msg-to-echo (cr/guid94)]
      (is (= {:response msg-to-echo}
             (client/ping client msg-to-echo)))))
  (let [cur-now (atom 1)
        keys1 (cr/new-keys)
        keys2 (cr/new-keys)]
    (with-redefs [time/now (fn [] (swap! cur-now inc))]
      (testing "register stream"
        (testing "must sign with the correct key"
          (is (= {:result "ok"}
                 (client/as-stream
                   client
                   (assoc keys1
                     :private-signing-key
                     (:private-signing-key keys2)))))

          (is (= {:response "unauthorized"}
                 (client/register-stream client {:sid "test1"}))))

        (testing "first stream must be called 'root'"
          (as client keys1)

196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
                (is (= expected-root-messages
                       (map get-m root-messages)) r1)
                (is (= expected-test1-messages
                       (map get-m test1-messages)) r2))

              (testing "we can validate and reassemble from original binaries"
                (is (= expected-root-messages
                       (map validate-and-reassemble-msg root-messages)))
                (is (= expected-test1-messages
                       (map validate-and-reassemble-msg test1-messages))))))

          (testing "good error message when stream is missing"
            (is (= {:response {"status" "stream not found"}}
                   (get-as client keys1 "missing-stream"))))

          (testing "streams are private by default, show as missing"
            (is (= {:response {"status" "stream not found"}}







|

|







198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
                (is (= expected-root-messages
                       (map get-m root-messages)) r1)
                (is (= expected-test1-messages
                       (map get-m test1-messages)) r2))

              (testing "we can validate and reassemble from original binaries"
                (is (= expected-root-messages
                       (map reassemble-original root-messages)))
                (is (= expected-test1-messages
                       (map reassemble-original test1-messages))))))

          (testing "good error message when stream is missing"
            (is (= {:response {"status" "stream not found"}}
                   (get-as client keys1 "missing-stream"))))

          (testing "streams are private by default, show as missing"
            (is (= {:response {"status" "stream not found"}}
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
                         expected-test1-messages
                         {"c" "put"
                          "m" {"sid" "test1", "params" {"t" "hello, friend!"}}
                          "k" test1-k
                          "ct" 21})
                       (map get-m test1-messages)))))))))

    (testing "signature required"
      (anon client)
      (let [expected-response {:response "signature required"}]
        (testing "registration"
          (is (= expected-response
                 (client/register-stream client {:sid "test1"}))))
        (testing "config"
          (is (= expected-response
                 (client/configure-stream
                   client







|

|







250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
                         expected-test1-messages
                         {"c" "put"
                          "m" {"sid" "test1", "params" {"t" "hello, friend!"}}
                          "k" test1-k
                          "ct" 21})
                       (map get-m test1-messages)))))))))

    (testing "authentication required"
      (anon client)
      (let [expected-response {:response "authentication required"}]
        (testing "registration"
          (is (= expected-response
                 (client/register-stream client {:sid "test1"}))))
        (testing "config"
          (is (= expected-response
                 (client/configure-stream
                   client
292
293
294
295
296
297
298





















































































299
300
301
302
303
304
305
306
307
308
309
310
311
        (is (as-> (get-as client keys1 "test1") x
                  (:response x)
                  (x "status")
                  (= "stream not found" x))))
      (ta/pending "approval required")
      (ta/pending "invitation only"))))






















































































;; Placeholder: aggregate behaviour is not covered yet; each level is only
;; marked as pending.
(deftest aggregates
  (ta/pending "server-level")
  (ta/pending "stream-level")
  (ta/pending "message-level"))

(deftest server-side-subscriptions
  ;; Is there a background process that fetches periodically?
  ;; Or should we require clients to request a fetch every time?
  ;; This would allow fetches to be signed every time! And reduce
  ;; load on the server for subscriptions that are rare. We would
  ;; simply fetch all messages into a local stream and anybody
  ;; who subscribes on the same server would have access to them.
  ;;







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|




|







294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
        (is (as-> (get-as client keys1 "test1") x
                  (:response x)
                  (x "status")
                  (= "stream not found" x))))
      (ta/pending "approval required")
      (ta/pending "invitation only"))))

(defn- reassemble-from-handler
  "Verifies and decodes the `:original` bytes of a request captured by the
   test router, using the request's `:psk`."
  [req]
  (let [signing-key (:psk req)
        signed-bytes (:original req)]
    (decode-signed-original signing-key signed-bytes)))

;; Exercises `handle-message` directly with a recording router, covering
;; unsigned pass-through, the plain-text reply when a signed session has no
;; keys, and the signed + encrypted round trip.
(deftest handle-message-test
  (let [messages-received (atom [])

        ;; Stub router: records every [decoded request] pair and replies "ok".
        router
        (fn [req decoded]
          (swap! messages-received
                 conj
                 [decoded req])
          "ok")

        server-keys (cr/new-keys)
        client-keys (cr/new-keys)

        client-public-key (:public-key client-keys)
        client-private-key (:private-key client-keys)
        server-public-key (:public-key server-keys)

        ;; Session flagged as signed but without any keys yet.
        partial-signing-state {:signed? true}

        ;; Same session after the key exchange.
        complete-crypto-state
        (with-session-keys
          server-public-key
          (:private-key server-keys)
          (cr/b64-str (:public-signing-key client-keys))
          (cr/b64-str client-public-key)
          partial-signing-state)

        expected-insecure "feeling insecure"
        expected-secure "feeling quite secure"]
    (testing "message handling"
      (testing "unsigned messages are passed straight through"
        (let [msg {:msg expected-insecure}
              msg (transport/encode-msg msg)

              response
              (handle-message router
                              {}
                              {:msg (ByteArrayInputStream. msg)})]
          (is (= "ok" (cbor/decode response)))))

      (testing "server responds in plain text if no keys"
        (let [msg {:msg "no keys!"}
              msg (transport/encode-msg msg)

              response
              (handle-message router
                              partial-signing-state
                              {:msg (ByteArrayInputStream. msg)})]
          (is (= "key missing" (cbor/decode response)))))

      (testing "authd messages must be both signed and encrypted"
        ;; Client side: encode, sign, encrypt, then wrap for transport.
        (let [msg {:msg expected-secure}
              msg (transport/encode-msg msg)
              msg (cr/sign msg (:private-signing-key client-keys))

              {:keys [client-tx client-rx]}
              (cr/client-session-keypair client-public-key
                                         client-private-key
                                         server-public-key)

              msg (cr/encrypt msg client-tx)
              msg (transport/encode-msg msg)

              encrypted-response
              (handle-message router
                              complete-crypto-state
                              {:msg (ByteArrayInputStream. msg)})

              ;; The reply is an encrypted envelope; open it with client-rx.
              {:strs [nonce msg]} (cbor/decode encrypted-response)
              response (cr/decrypt nonce msg client-rx)]
          (is (= "ok" (cbor/decode response)))))
      ;; Only the unsigned and the fully authenticated message reach the
      ;; router; the keyless signed one is rejected before routing.
      (is (= [{"msg" expected-insecure}, {"msg" expected-secure}]
             (map first @messages-received)))
      ;; The request recorded for the second message carries enough to
      ;; re-verify and decode the original signed bytes.
      (is (= {"msg" expected-secure}
             (-> @messages-received
                 second
                 second
                 reassemble-from-handler))))))

;; Placeholder: contains no assertions yet, only a `ta/pending` marker
;; (presumably reported as an unimplemented test -- confirm against `ta`).
(deftest server-key-test
  (ta/pending "client properly rejects bad server keys"))

;; Placeholder for aggregate tests at three levels (server, stream, message);
;; each is only a `ta/pending` marker and asserts nothing yet.
(deftest aggregates-test
  (ta/pending "server-level")
  (ta/pending "stream-level")
  (ta/pending "message-level"))

(deftest server-side-subscriptions-test
  ;; Is there a background process that fetches periodically?
  ;; Or should we require clients to request a fetch every time?
  ;; This would allow fetches to be signed every time! And reduce
  ;; load on the server for subscriptions that are rare. We would
  ;; simply fetch all messages into a local stream and anybody
  ;; who subscribes on the same server would have access to them.
  ;;
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
    ;;
    ;; Which might be a blessing.
    ;;
    ;; They could still get a picture by following everybody I follow.
    (ta/pending "server defaults")
    (ta/pending "per stream"))
  (ta/pending "purge remote stream on last unsubscribe"))

(defmacro simple-reg-get-with-close
  "Test scaffold. Inside a `testing` block labelled `m` it:
    1. generates new keys and evaluates `client` to obtain a client,
    2. calls `as` with that client and the keys,
    3. asserts that registering stream \"root\" answers {:response \"ok\"},
    4. closes the client,
    5. binds the first symbol of the vector `b` to the result of
       `get-messages` on \"root\" and runs `body` with it."
  [m client b & body]
  (let [result-binding (first b)]
    `(let [fresh-keys# (cr/new-keys)
           conn# ~client]
       (testing ~m
         (as conn# fresh-keys#)
         (is (= {:response "ok"}
                (client/register-stream conn# {:sid "root"})))
         (client/close conn#)
         (let [~result-binding (get-messages conn# "root")]
           ~@body)))))

;; With a client created using `:key-mode :no-autosend`, fetching messages
;; after register + close is expected to yield {:response "key missing"}.
(tcfg/def-configd-server-test client-autosend-off
  [new-client server/start-netty-server! {}]
  (simple-reg-get-with-close
    "Without autosend the server gives a sane response"
    (new-client :key-mode :no-autosend)
    [response]
    (is (= {:response "key missing"} response))))

;; With a default client (no options), fetching messages after register +
;; close is expected to return exactly one entry under "messages".
(tcfg/def-configd-server-test client-autosend-on-by-default
  [new-client server/start-netty-server! {}]
  (simple-reg-get-with-close "Autosend works properly" (new-client) [response]
    (is (= 1 (-> response :response (get "messages") count)))))







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
419
420
421
422
423
424
425























    ;;
    ;; Which might be a blessing.
    ;;
    ;; They could still get a picture by following everybody I follow.
    (ta/pending "server defaults")
    (ta/pending "per stream"))
  (ta/pending "purge remote stream on last unsubscribe"))























Changes to test/streamful/test_cfg.clj.

12
13
14
15
16
17
18
19
20
21
22

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41


42
43
44
45
46
47
48
; GNU Affero General Public License for more details.
;
; You should have received a copy of the GNU Affero General Public
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;


(ns streamful.test-cfg
  (:require [clojure.test :refer :all]
            [streamful.core :as core]

            [streamful.log-util :as lu]
            [streamful.server.netty :as netty]
            [streamful.temp :as temp]
            [streamful.test-protocol :as tp])
  (:import (java.io File)))

(defmacro def-logcfg-test
  "Defines a `deftest` named `n` whose entire body is wrapped in
  `lu/test-log-level!`."
  [n & body]
  (let [wrapped-body `(lu/test-log-level! ~@body)]
    `(deftest ~n
       ~wrapped-body)))

(defn build-temp-protocol-handler
  "Calls `core/build-protocol-handler` with the absolute path of a `db`
  entry located under the directory returned by `temp/session-tempdir!`,
  followed by the literal argument 1."
  []
  (let [^File session-root (temp/session-tempdir!)
        db-path (.getAbsolutePath (File. session-root "db"))]
    (core/build-protocol-handler db-path 1)))

(defmacro def-configd-server-test
  "Defines a log-configured test named `n` that runs `body` inside
  `tp/with-configd-server-and-client`, using a protocol handler built by
  `build-temp-protocol-handler`. `b` is the binding vector forwarded to
  `with-configd-server-and-client`.

  `temp/cleanup-all-tempdirs!` runs in a `finally`, so temp directories are
  removed even when the body (or server start-up) throws."
  [n b & body]
  `(def-logcfg-test ~n
     (try
       ;; Referenced by (auto-qualified) symbol rather than by unquoting the
       ;; function object into the expansion.
       (tp/with-configd-server-and-client (build-temp-protocol-handler)
         [~@b]
         ~@body)
       (finally
         (temp/cleanup-all-tempdirs!)))))

(defmacro def-server-test
  [n b & body]
  `(def-configd-server-test







<



>










|


|



|
|
>
>







12
13
14
15
16
17
18

19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
; GNU Affero General Public License for more details.
;
; You should have received a copy of the GNU Affero General Public
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;


(ns streamful.test-cfg
  (:require [clojure.test :refer :all]
            [streamful.core :as core]
            [streamful.crypto :as cr]
            [streamful.log-util :as lu]
            [streamful.server.netty :as netty]
            [streamful.temp :as temp]
            [streamful.test-protocol :as tp])
  (:import (java.io File)))

(defmacro def-logcfg-test
  "Defines a `deftest` named `n` whose entire body is wrapped in
  `lu/test-log-level!`."
  [n & body]
  (let [wrapped-body `(lu/test-log-level! ~@body)]
    `(deftest ~n
       ~wrapped-body)))

(defn build-temp-protocol-handler
  "Calls `core/build-protocol-handler` with `server-keys`, the absolute path
  of a `db` entry located under the directory returned by
  `temp/session-tempdir!`, and the literal argument 1."
  [server-keys]
  (let [^File session-root (temp/session-tempdir!)
        db-path (.getAbsolutePath (File. session-root "db"))]
    (core/build-protocol-handler server-keys db-path 1)))

(defmacro def-configd-server-test
  "Defines a log-configured test named `n`. A new key set is generated with
  `cr/new-keys` and handed both to `build-temp-protocol-handler` and to
  `tp/with-configd-server-and-client`, which then runs `body`. `b` is the
  binding vector forwarded to `with-configd-server-and-client`.

  `temp/cleanup-all-tempdirs!` runs in a `finally`, so temp directories are
  removed even when the body (or server start-up) throws."
  [n b & body]
  `(def-logcfg-test ~n
     (let [server-keys# (cr/new-keys)]
       (try
         (tp/with-configd-server-and-client
           server-keys#
           ;; Referenced by (auto-qualified) symbol rather than by unquoting
           ;; the function object into the expansion.
           (build-temp-protocol-handler server-keys#)
           [~@b]
           ~@body)
         (finally
           (temp/cleanup-all-tempdirs!))))))

(defmacro def-server-test
  [n b & body]
  `(def-configd-server-test

Changes to test/streamful/test_protocol.clj.

15
16
17
18
19
20
21

22
23
24
25
26
27
28
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.test-protocol
  (:require [clojure.test :refer :all]
            [streamful.client :as client]

            [streamful.test-net :as tnet]
            [streamful.test-thread :as tthread]))

(defn start-server-and-wait [start-fn! port handler
                             {:keys [on-start on-exit timeout accept-timeout]
                              :or {on-exit (constantly :default)
                                   on-start (constantly :default)







>







15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.test-protocol
  (:require [clojure.test :refer :all]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.test-net :as tnet]
            [streamful.test-thread :as tthread]))

(defn start-server-and-wait [start-fn! port handler
                             {:keys [on-start on-exit timeout accept-timeout]
                              :or {on-exit (constantly :default)
                                   on-start (constantly :default)
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

64
65
66
            :timeout timeout
            :accept-timeout accept-timeout
            :on-exit on-exit
            :on-start on-start))]
    (tthread/wait-for "server start" server-running)
    stop-server))

(defmacro with-configd-server-and-client
  "Starts a server on a port from `tnet/next-port!`, runs `body`, then stops
  the server.

  `b` is a three-element vector `[client-sym start-fn opts]`:
    - `client-sym` is bound, within `body`, to a partial application of
      `client/new-client` to \"localhost\", the port and 1000;
    - `start-fn` and `opts` are passed to `start-server-and-wait` together
      with the port and `handler`.

  The stop function returned by `start-server-and-wait` is invoked in a
  `finally`, so the server is shut down even when `body` throws. The form
  evaluates to the value of `body`."
  [handler b & body]
  `(let [port# (tnet/next-port!)

         stop-server!#
         (start-server-and-wait
           ~(second b)
           port#
           ~handler
           ~(nth b 2))]
     (try
       (let [~(first b)
             (partial client/new-client "localhost"
                      port#
                      1000)]
         ~@body)
       (finally
         (stop-server!#)))))







|











>



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
            :timeout timeout
            :accept-timeout accept-timeout
            :on-exit on-exit
            :on-start on-start))]
    (tthread/wait-for "server start" server-running)
    stop-server))

(defmacro with-configd-server-and-client
  "Starts a server on a port from `tnet/next-port!`, runs `body`, then stops
  the server.

  `server-keys` is an expression yielding a map with a `:public-key` entry;
  that key is passed through `cr/b64-str` and supplied to the client factory.

  `b` is a three-element vector `[client-sym start-fn opts]`:
    - `client-sym` is bound, within `body`, to a partial application of
      `client/new-client` to \"localhost\", the port, the encoded server
      public key and 1000;
    - `start-fn` and `opts` are passed to `start-server-and-wait` together
      with the port and `handler`.

  The stop function returned by `start-server-and-wait` is invoked in a
  `finally`, so the server is shut down even when `body` throws. The form
  evaluates to the value of `body`."
  [server-keys handler b & body]
  `(let [port# (tnet/next-port!)

         stop-server!#
         (start-server-and-wait
           ~(second b)
           port#
           ~handler
           ~(nth b 2))]
     (try
       (let [~(first b)
             (partial client/new-client "localhost"
                      port#
                      (-> ~server-keys :public-key cr/b64-str)
                      1000)]
         ~@body)
       (finally
         (stop-server!#)))))