Streamful

Check-in [df4258a906]

Overview
Comment: Much spec simplification and extra testing
SHA3-256: df4258a9069b9cb078091f0dcff039693ba5df24b173082e4ed880ba6db954af
User & Date: scstarkey 2025-03-21 13:12:52
Context
2025-03-21
13:24
More documentation check-in: a17e82f75e user: scstarkey tags: trunk
13:12
Much spec simplification and extra testing check-in: df4258a906 user: scstarkey tags: trunk
2025-03-20
12:38
Encode data structures before storing them Due to datalevin not properly decoding arbitrary structures which causes CPU and memory explosions when stringifying keys on the way back out check-in: 989d55f4f8 user: scstarkey tags: trunk
Changes

Changes to SPEC.md.

# streamful

WORK IN PROGRESS -- this specification is not finalized at all

streamful is a customizable, distributed social network platform. Its purpose is
to enable the darker parts of social networks (attention capitalization) to be
able to serve higher purposes by allowing people to rapidly experiment with new
expressions of their lives safely. It does this with a novel capacity for
controlling access to your data and visualizing it in the exact ways you want it
to show up.

## Stream

A stream contains arbitrary content that is authenticated as coming from whoever
controls the private side of a cryptographic key pair.

## Stream server

A stream server hosts streams. It provides various kinds of access control as to
what kind of content is allowed on the server. It also provides an access
control list of which keys are allowed to perform which operations on which
streams.

## Domain

A stream can be identified by a domain, which means that you don’t necessarily
need to know which server the content is on — you can use the domain to fetch
the content for that stream.

A stream may only be identified by one domain, and must make that choice at
inception. The domain must be registered in a public DNS server which contains a
TXT record that contains the public key of the stream’s owner.

A server that has chosen to host the stream will then use that public key to
verify ownership.

## Message

The only person that can publish a message to a stream is the owner of the 
stream's keys. When someone else fetches messages from a stream, they should 
verify that the message was signed by the stream owner. The first message in
every stream identifies the public key of the stream.




# streamful

WORK IN PROGRESS -- this specification is not finalized at all

streamful is a customizable social network platform. Its purpose is
to enable the darker parts of social networks (attention capitalization) to be
harnessed to serve higher purposes by allowing people to rapidly experiment with 
new expressions of their lives safely. It does this with a novel capacity for
controlling access to your data and visualizing it in the exact ways you want it
to show up.

## Stream

A stream contains arbitrary content that is authenticated as coming from whoever
controls the private side of a cryptographic key pair.

## Stream server

A stream server hosts streams. It provides various kinds of access control as to
what kind of content is allowed on the server. It also keeps track of which keys
own which streams.














## Message

The only person that can publish a message to a stream is the owner of the 
stream's keys. When someone else fetches messages from a stream, they should 
verify that the message was signed by the stream owner. The first message in
every stream identifies the public key of the stream.
### Correction

A message may be corrected after the fact. This works just like a retraction
except new content is put in place of the old. The new message should show up in
the same position in the stream as the old one. The fact that there was a
correction should appear in the stream at the time of the correction.

### Replies

Anybody may submit a reply to a message on a stream that they own. If people
subscribe (see below) to the stream containing the message, and the stream 
containing the reply, then their server or client may choose to show replies in
context somehow. 

If a message that was replied to is ever retracted, then the reply will simply
show as a reply to a retracted message. Any cached content must be purged.

### Thread

Messages may be grouped into threads. The ID of the message at the beginning of
the thread must be used as the thread ID of every message in the thread.

If the message forming the root of a thread is ever retracted, then the messages
in the thread will show as belonging to a retracted thread. Any cached content
must be purged.

## Substream

A substream is a stream contained within another stream. A substream may be
identified by a domain or not.

A substream can be moved between streams, but it may only have one primary
stream. The process to change streams involves notifying the previous primary
stream of the new stream. If the new stream is on a different server, the new
server may fetch all content from the previous server before the transfer is
complete. The previous stream will refuse all future messages submitted to that
substream. There is no obligation for a server to retain messages of a substream
that has moved to another server. However, servers must retain the fact that a
particular substream has moved and where it has moved to in perpetuity.

### Substream moderation

A substream may be created within streams that have specifically been configured
by their owner to allow substreams. Those substreams may also be required to be
approved. If your substream is denied you may or may not be allowed to submit
again with or without changes to the creation request.

## Stream ID

A stream ID tells you how to access the messages on a stream. The format is 
`id@domain`. The `id` portion is for streams that are not identified by domains,
but are used to access substreams of the given domain.

If a substream is contained within another substream, add another `id@` to the
left. So, `cats@pets@animals.com` means there is a stream called animals.com 
which contains a substream `pets@animals.com` which contains a substream
`cats@pets@animals.com`
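
The nesting rule can be made concrete with a small sketch. The function name `stream-ancestry` is hypothetical and not part of streamful; the sketch only assumes that `@` is the sole separator and that the rightmost segment is the domain.

```clojure
(require '[clojure.string :as str])

(defn stream-ancestry
  "Returns the chain of stream IDs from the root domain down to `stream-id`.
  Hypothetical helper, for illustration only."
  [stream-id]
  (let [parts (str/split stream-id #"@")]
    (->> (range (count parts))
         ;; dropping n leading segments yields the n-th ancestor
         (map #(str/join "@" (drop % parts)))
         reverse
         vec)))

(stream-ancestry "cats@pets@animals.com")
;; => ["animals.com" "pets@animals.com" "cats@pets@animals.com"]
```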

## Subscription

A stream may subscribe to another stream. If the messages for the destination
stream are on another server, the subscriber’s server must maintain a mirror
copy of the given stream so long as there are active subscriptions.

The subscription is recorded as a special subscription-type message in the
subscriber stream.

Subscription requests may only be submitted if signed by an authorized key. They
may only be retrieved by those who sign their fetch requests with an authorized
key.

When a stream with subscriptions moves to another server, the old server stops
fetching messages.

### Signed subscription

A signed subscription allows the destination stream to "subscribe back," which
gives the destination the ability to read replies from the subscriber. What
happens is:

1. The subscriber's stream publishes a signed message requesting subscription
   to the destination stream
2. The subscriber's server holds onto that message amongst all its other
   subscriptions for the destination stream
3. When the subscriber's server fetches new messages from the destination
   stream, it includes a reference to all subscription messages for the
   destination server to check
4. When the destination server receives a new subscriber for the first
   time it notifies the destination stream of the subscriber stream, at
   which point it becomes possible for the destination stream to subscribe
   back if it wants to

Subscribers are identified by their public key and their stream ID, since
the stream ID may change. If the same public key is used for multiple
subscriptions from different stream IDs, the destination stream may choose
how to handle the collision. It is possible one of the streams may have moved
or the stream's keys have been compromised.

### Subscription updates

Servers may configure how often other servers may fetch subscription updates.
They may also choose to offer some servers preferential treatment in whatever
way they wish.

### Push style

Servers may also choose to offer “push” style updates, with a few caveats:

1. Messages destined for multiple subscribers on a server may only be delivered
   once per server
2. Messages may be published in batch sizes configurable by the push recipient
   within parameters that the publisher allows
3. A server may not fetch messages if push is enabled

## Private messages

Messages sent to a single subscriber have no right to privacy. If the
subscriber decides to make those types of messages available on their stream
they may do so. However, they must make that choice on a message-by-message
basis.

Likewise, servers have no obligation to store messages encrypted at rest.
This means that, while a server will only deliver private messages to the
subscriber they are destined for, anybody with access to that server's database
can see the messages. 

The only real protection is end-to-end encryption. 














## Stream moderation

A stream may set moderation rules that apply to itself and all its substreams.
These rules include which streams may be subscribed to or not. It may use a
combination of allow and deny lists, which may specify any level of granularity.
For example, a denial of `pets@animals.com` also denies `cats@pets@animals.com`
but not `wildlife@animals.com`, whereas a denial of `animals.com` blocks all
content from that stream and all its substreams, recursively.

Moderation rules may be set only by those who have keys authorized to change
them. They may only be viewed by those who have keys authorized to view them.

A server may also set moderation rules which apply to all streams it hosts.
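
The recursive deny-list example in this section can be sketched as a suffix check on stream IDs. This is only an illustration under the assumption that a deny list is a plain collection of stream ID strings; `denied?` is a hypothetical name, and allow lists are left out.

```clojure
(require '[clojure.string :as str])

(defn denied?
  "True when `stream-id` is an entry of `deny-list` or a substream of one.
  Hypothetical helper; assumes deny-list is a collection of stream ID strings."
  [deny-list stream-id]
  (boolean
    (some (fn [rule]
            (or (= stream-id rule)
                ;; a substream ID always ends with \"@\" + its parent's ID
                (str/ends-with? stream-id (str "@" rule))))
          deny-list)))

(denied? ["pets@animals.com"] "cats@pets@animals.com") ;; => true
(denied? ["pets@animals.com"] "wildlife@animals.com")  ;; => false
(denied? ["animals.com"] "wildlife@animals.com")       ;; => true
```

Matching on the `@`-prefixed suffix, rather than a bare suffix, keeps an unrelated ID such as `xpets@animals.com` from being caught by a rule for `pets@animals.com`.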





## Replies and threads across streams

If a message in one stream replies to a message in another stream, it must
subscribe to the other stream. This way a full picture of the thread may be
maintained.

If the stream of the original thread also subscribes back, it can make all the
messages your stream submits in reply to its messages available to its other
subscribers.

There is one caveat: if a stream requests messages from another stream, it may
only receive that stream's messages.

## Reading a stream

When fetching stream messages, you only get the messages of that stream. If your
server is configured to do so, it may "auto-subscribe" to messages from other
streams mentioned in said messages. Those rules can be configured
programmatically by the server to save on storage costs.

## Stream access

A stream’s discoverability may be controlled by the stream’s owner, or an
authorized representative:

1. Can only be viewed by the owner (private, the default)







### Correction

A message may be corrected after the fact. This works just like a retraction
except new content is put in place of the old. The new message should show up in
the same position in the stream as the old one. The fact that there was a
correction should appear in the stream at the time of the correction.









































## Stream ID

A stream ID tells you how to access the messages on a stream. Each stream ID
is unique to a server.





## Subscription




A subscription is simply the intention to read messages from a stream on a
periodic basis. Subscriptions are maintained on the client side, and as such
there is no way to know who subscribes to your stream unless they tell you.



### Signed subscription

A signed subscription allows the destination stream to know that you subscribe
to their stream. That's really the only way they can "subscribe back," which
gives them the ability to read replies from you as they happen.












Subscribers are identified by their public key and their stream ID, since
the stream ID may change. Whenever they fetch messages from a stream they
subscribe to, they can choose to sign the fetch request with the private key
that matches their public key, and include what stream ID is requesting the
data. The server can then notify the stream owner that they have a new
subscriber.

It's important to sign every request. That way the server can let the stream
owner know how active your subscription is. If you fetched messages once and
then never again, that's a signal that the interest isn't really all that high.


If the same public key is used for multiple subscriptions from different stream 
IDs, the destination stream may choose how to handle the collision. It is 
possible one of the streams may have moved or the stream's keys have been 
compromised.




## Private messages

Messages sent to a single subscriber have no right to privacy. If the
subscriber decides to make those types of messages available on their stream
they may do so. However, they must make that choice on a message-by-message
basis.

Likewise, servers have no obligation to store messages encrypted at rest.
This means that, while a server will only deliver private messages to the
subscriber they are destined for, anybody with access to that server's database
can see the messages. 

The only real protection is end-to-end encryption. If this ever ends up as
part of this spec, it will likely involve
[Messaging Layer Security](https://datatracker.ietf.org/doc/rfc9420/).

## Server side aggregates

In order to minimize duplication of effort in generating aggregate data, servers
can provide aggregates for specific event types. For example, a reaction event
type could produce an aggregate across a specific message ID. The server would
then feed back only the most recent aggregate and not return the individual
reaction messages that led to it.

Aggregates can be configured at the message or stream level. A message aggregate
means, "I want to know if any message on the server mentions this message."
A stream aggregate means, "I want to aggregate everything happening in this 
stream."

Message aggregate data is fetched when the message itself is fetched. This means
that, whenever a client fetches a message that mentions a message in a stream
it manages, it can notify you that there are updates and re-fetch that message.
It need only do that once per fetch, regardless of how many times the message
was mentioned.

This means that it's good practice to notify 

Stream aggregate data is fetched alongside the results of a request for a 
list of streams.

Since the server would need to read the content of these messages, any
messages with end-to-end encryption would not be readable by any aggregate.

## Stream customization

Any stream can customize their stream using arbitrary code that can be uploaded
without needing to be compiled on a computer. In this way, stream managers could
even use their phones to program their own stream experience.

### Customizable parts

Users can upload scripts to their server that customize various things about
their stream:

1. “Page” layout — a stream has a home page and sub-pages. A page experience is
   one of structure, not style. It shows how data are organized. Pages can
   choose to display data from the stream, and any other streams, however they
   want. Further, apps that render pages have a lot of freedom as to how
   a page is rendered
2. Widgets — have access to raw messages and aggregates and can display them in
   interesting ways, using charts and a basic layout. A page is
   a container for widgets

### Security

Stream scripts cannot make arbitrary network connections or perform
other kinds of IO. They can only perform transformations based
on data that is passed to them, namely the previous value of the aggregate
and the message that is being inspected.
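
A script restricted in this way behaves like a reducing function: previous aggregate in, one message in, new aggregate out. The sketch below illustrates that shape for a reaction count. It is written in Clojure only for illustration (the spec does not say which language stream scripts use), and the `"reaction"` key and the name `count-reactions` are assumptions.

```clojure
(defn count-reactions
  "Pure aggregator step: previous aggregate + one message -> new aggregate.
  The \"reaction\" key is a hypothetical message field."
  [agg msg]
  (update agg (get msg "reaction") (fnil inc 0)))

;; A server could fold incoming messages through the step one at a time
;; and hand back only the latest aggregate.
(reduce count-reactions
        {}
        [{"reaction" "like"} {"reaction" "laugh"} {"reaction" "like"}])
;; => {"like" 2, "laugh" 1}
```

Because the step touches nothing but its two arguments, it needs no network or IO access.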

## Stream moderation

A stream may set moderation rules. These rules include which streams' messages 
may be included in its aggregates.

## Replies and threads across streams

If a message in one stream replies to a message in another stream, it must
subscribe to the other stream. This way a full picture of the thread may be
maintained.

If the stream of the original thread also subscribes back, it can make all the
messages your stream submits in reply to its messages available to its other
subscribers.

There is one caveat: if a stream requests messages from another stream, it may
only receive that stream's messages.

## Reading a stream

When fetching stream messages, you only get the messages of that stream. You
can choose to fetch all messages, or only messages that match a specific rule.
For example, you may only want to see messages that mention messages in
a specific stream of yours. Those rules are sent as part of the request.
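
The rule format is not specified here. One shape it might take, borrowed from a comment in this check-in's pending test in `test/streamful/protocol_test.clj`, is a path into the message followed by a list of acceptable values, e.g. `["m" "params" "rs" ["stream1" "stream2"]]`. The sketch below assumes exactly that shape; `matches-rule?` is a hypothetical name and the feature itself is still pending.

```clojure
(defn matches-rule?
  "True when the value found at the rule's path is one of the rule's allowed
  values. Assumes a rule is a path followed by a collection of allowed values."
  [rule msg]
  (let [path    (vec (butlast rule))
        allowed (set (last rule))]
    (contains? allowed (get-in msg path))))

(def msgs
  [{"m" {"params" {"rs" "stream1"}}}
   {"m" {"params" {"rs" "other"}}}
   {"m" {"t" "no params"}}])

(filterv (partial matches-rule? ["m" "params" "rs" ["stream1" "stream2"]]) msgs)
;; => [{"m" {"params" {"rs" "stream1"}}}]
```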


## Stream access

A stream’s discoverability may be controlled by the stream’s owner, or an
authorized representative:

1. Can only be viewed by the owner (private, the default)

Every message published to a stream must have a type. If it doesn’t have a type
it is assumed to be a plain text note.

Servers must accept all message types, regardless of whether they “know” how to
process them. Messages of unknown types must be stored as-is. Clients may choose
to perform their own processing of these message types.

## Server side aggregates

In order to minimize duplication of effort in generating aggregate data, servers
can provide aggregates for specific event types. For example, a reaction event
type could produce an aggregate across a specific message ID. The server would
then feed back only the most recent aggregate and not return the individual
reaction messages that led to it, unless requested (for audit purposes).

Aggregates can be configured at any level. An aggregate at the root of the
server is invoked once per message fetched across all subscriptions. One
attached to a specific stream is only invoked for its messages and those of
its substreams, unless otherwise configured.

Since the server would need to read the content of these messages, any
messages with end-to-end encryption would not be submitted to any aggregate.



## Stream customization



Any stream can customize their stream using arbitrary code that can be uploaded
without needing to be compiled on a computer. In this way, stream managers could
even use their phones to program their own stream experience.

### Customizable parts


Users can upload scripts to their server that customize various things about
their stream:

1. “Page” layout — a stream has a home page and sub-pages. A page experience is
   one of structure, not style. It shows how data are organized. Pages can
   choose to display data from the stream, and any substreams, however they 
   want. Further, apps that render pages have a lot of freedom as to how
   a page is rendered
2. Aggregators — as messages come in, a script can be tied to a specific message
   type, updating server side aggregates
3. Widgets have access to raw messages and aggregates and can display them in
   interesting ways, using charts and a basic layout inspired by html. A page is
   a container for widgets




### Security




Stream scripts cannot make arbitrary network connections or perform
other kinds of IO. They can only perform pure functional transformations based
on data that is passed to them, as well as access data within their scope.

For example, an aggregate on the root stream of a server has access to all
other aggregates on the server. An aggregate applied only to a substream has
access only to that substream's aggregates.








Every message published to a stream must have a type. If it doesn’t have a type
it is assumed to be a plain text note.

Servers must accept all message types, regardless of whether they “know” how to
process them. Messages of unknown types must be stored as-is. Clients may choose
to perform their own processing of these message types.
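
As a rough illustration of the default type and the pass-through rule, the sketch below resolves a message's type and leaves unknown types untouched. The `"type"` key, the `"note"` value and all function names are assumptions made for the example; the spec does not name them.

```clojure
(defn message-type
  "Type of a decoded message, defaulting to a plain text note.
  The \"type\" key and \"note\" value are hypothetical names."
  [msg]
  (get msg "type" "note"))

(def known-handlers
  ;; Hypothetical per-type processing; a real client would register its own.
  {"note" (fn [msg] (assoc msg "kind" :text))})

(defn process
  "Applies the handler for the message's type; unknown types pass through as-is."
  [msg]
  ((get known-handlers (message-type msg) identity) msg))

(process {"text" "hi"})
;; => {"text" "hi", "kind" :text}
(process {"type" "poll" "options" ["a"]})
;; => {"type" "poll", "options" ["a"]}
```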

# Protocol

The protocol is currently a work-in-progress. However, at a high level, what
seems to be stable is that messages are CBOR-encoded and signed. They
have a high-level structure that includes mandatory keys:

`ct`
The time, in milliseconds since the epoch, this message was
created on the client


`c`
The command you are issuing to the server. `put` means just put this message
in the stream, for example


`k`
The base64-encoded representation of the public signing key that owns the
stream this message is published on

`m`
An arbitrary map of data that contains the contents of the message.
The only key that is mandatory in the map is `t`, a text representation of
the message

Other conventions are likely, so that clients can all share a single method
for filtering messages. For example, `rid` as the ID of a message I'm
replying to, `rs` as the name of the stream the message I'm replying to resides
in, etc.

`sid`
The name of the stream this message is being published to.
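
A minimal sketch of checking a decoded message against the keys described above. It assumes the message has already been decoded into a map with string keys, and it checks only `ct`, `c`, `k`, `m` and the nested `t`. `sid` is left out because the text does not make clear whether it is mandatory. The name `missing-keys` is hypothetical.

```clojure
(def mandatory-keys
  ;; Top-level keys the text lists as mandatory.
  ["ct" "c" "k" "m"])

(defn missing-keys
  "Returns the mandatory keys absent from `msg`, plus \"m.t\" when the `m` map
  is present but lacks its mandatory `t` entry."
  [msg]
  (let [m (get msg "m")]
    (cond-> (vec (remove #(contains? msg %) mandatory-keys))
      (and (map? m) (not (contains? m "t"))) (conj "m.t"))))

(missing-keys {"ct" 22 "c" "put" "k" "abc" "m" {"t" "hello"}})
;; => []
(missing-keys {"ct" 22 "m" {}})
;; => ["c" "k" "m.t"]
```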




## CBOR encoding specifics


Messages are CBOR encoded, and the binary contents of those encoded messages
are signed. The ID of the message is the base64-encoded SHA-128 hash of
those contents.



The specifications included are:

[RFC 8949](https://www.rfc-editor.org/rfc/rfc8949.html)

  * The basic format, including all major types
  * Number extensions: tags 2, 3, 4, and 30
  * Time extensions: tags 0 and 1
    * [RFC 3339 - Timestamps](https://www.rfc-editor.org/rfc/rfc3339.html)
    * [RFC 4287 - Atom Syndication](https://www.rfc-editor.org/rfc/rfc4287.html) 
      section 3.3
  * URLs: tag 32

[RFC 7049 - Original CBOR Proposal](https://www.rfc-editor.org/rfc/rfc7049.html#section-2.4.4.3)
  * Regular expressions: tag 35, section 2.4.4.3

[RFC 8943](https://www.rfc-editor.org/rfc/rfc8943.html)

  * Local date and time extensions (tags 100 and 1004)
     * [RFC 3339 - Timestamps](https://www.rfc-editor.org/rfc/rfc3339.html)

[RFC 9562](https://www.rfc-editor.org/rfc/rfc9562.html)


  * UUIDs: tag 37




Messages encoded with any other tags may be rejected by servers altogether.

Changes to src/streamful/stream_datalevin.clj.


(defn- stream-agg-k [sid] (keyword (format "stream-agg-%s" sid)))

(defn- put-data
  ([table kt k vt v] [:put table k v kt vt])
  ([table k v] (put-data table :data k :data v)))




(defn- put-encoded
  ([table kt k v] (put-data table kt k :bytes (cbor/encode v)))
  ([table k v] (put-encoded table :data k v)))

(defn- decode-pair [[k v]] [k (cbor/decode v)])

(defn- get-decoded








(defn- stream-agg-k [sid] (keyword (format "stream-agg-%s" sid)))

(defn- put-data
  ([table kt k vt v] [:put table k v kt vt])
  ([table k v] (put-data table :data k :data v)))

;; Note the CBOR encoding here uses a more expansive default codec than the
;; one in the transport namespace, including all clojure data types.

(defn- put-encoded
  ([table kt k v] (put-data table kt k :bytes (cbor/encode v)))
  ([table k v] (put-encoded table :data k v)))

(defn- decode-pair [[k v]] [k (cbor/decode v)])

(defn- get-decoded
                  {"id" id
                   "psk" client-psk
                   "srt" t
                   "m" msg
                   "o" original-msg-bytes})
     (mapping-put-msg id k)]))

;; todo allow substreams of substreams
;; for now all substreams are implicitly of the root stream

(defn is-owner? [tx client-psk]
  (let [existing-owner (get-db-owner tx)
        result (creq/eq? existing-owner client-psk)]
    {:is-owner?
     {:given client-psk
      :actual existing-owner
      :result result}}
    (and existing-owner result)))

(defn claim-server! [db client-psk]
  (db/with-transaction-kv [tx db]
    (let [existing-owner (get-db-owner tx)]
      (if existing-owner
        :already-claimed







                  {"id" id
                   "psk" client-psk
                   "srt" t
                   "m" msg
                   "o" original-msg-bytes})
     (mapping-put-msg id k)]))




(defn is-owner? [tx client-psk]
  (let [existing-owner (get-db-owner tx)
        result (creq/eq? existing-owner client-psk)]
    (and existing-owner result)))

(defn claim-server! [db client-psk]
  (db/with-transaction-kv [tx db]
    (let [existing-owner (get-db-owner tx)]
      (if existing-owner
        :already-claimed

Changes to src/streamful/transport.clj.

; You should have received a copy of the GNU Affero General Public
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.transport
  (:require [clj-cbor.core :as cbor]
            [clj-cbor.tags.content :as tags.content]
            [clj-cbor.tags.numbers :as tags.num]
            [clj-cbor.tags.text :as tags.text]
            [clj-cbor.tags.time :as tags.time]
            [clojure.walk :as walk]))




(def ^:private write-handlers
  (merge tags.content/content-write-handlers
         tags.num/number-write-handlers
         tags.time/epoch-time-write-handlers
         tags.time/epoch-date-write-handlers
         tags.text/text-write-handlers))

(def ^:private read-handlers
  (merge tags.content/content-read-handlers
         tags.num/number-read-handlers
         tags.time/instant-read-handlers
         tags.time/local-date-read-handlers
         tags.text/text-read-handlers))

(def ^:private cbor-codec
  (cbor/cbor-codec
    :write-handlers write-handlers
    :read-handlers read-handlers))

(defn encode-msg [msg]
  (cbor/encode (walk/stringify-keys msg)))

(defn decode-msg [msg]
  (cbor/decode cbor-codec msg))







; You should have received a copy of the GNU Affero General Public
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.transport
  (:require [clj-cbor.core :as cbor]
            [clj-cbor.tags.numbers :as tags.num]
            [clj-cbor.tags.text :as tags.text]
            [clj-cbor.tags.time :as tags.time]
            [clojure.walk :as walk]))

;; This namespace provides transport-level encoding and decoding, affording
;; better portability between platforms. See <SPEC.md>

(def ^:private write-handlers
  (merge tags.num/number-write-handlers
         tags.time/epoch-time-write-handlers
         tags.time/epoch-date-write-handlers
         tags.text/text-write-handlers))

(def ^:private read-handlers
  (merge tags.num/number-read-handlers
         tags.time/instant-read-handlers
         tags.time/local-date-read-handlers
         tags.text/text-read-handlers))

(def ^:private cbor-codec
  (cbor/cbor-codec
    :write-handlers write-handlers
    :read-handlers read-handlers))

(defn encode-msg [msg]
  (cbor/encode cbor-codec (walk/stringify-keys msg)))

(defn decode-msg [msg]
  (cbor/decode cbor-codec msg))
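
`encode-msg` runs the message through `clojure.walk/stringify-keys` before encoding, which appears to be what lets callers build messages with keyword keys while the encoded form carries the string keys described in SPEC.md. A small standalone illustration of that step (the sample message is made up):

```clojure
(require '[clojure.walk :as walk])

;; stringify-keys recursively converts keyword map keys to strings,
;; leaving the values untouched.
(walk/stringify-keys {:ct 22 :c "put" :m {:t "hello"}})
;; => {"ct" 22, "c" "put", "m" {"t" "hello"}}
```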

Changes to test/streamful/protocol_test.clj.

(ns streamful.protocol-test
  (:require [clj-cbor.core :as cbor]
            [clojure.test :refer :all]
            [crypto.equality :as creq]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.protocol :refer :all]
            [streamful.test-aggregates :as tagg]
            [streamful.test-asserts :as ta]
            [streamful.test-cfg :as tcfg]
            [streamful.test-crypto :as tcrypto]
            [streamful.test-protocol :as tp]
            [streamful.test-protocol :refer :all]
            [streamful.transport :as transport])
  (:import (java.io ByteArrayInputStream)







(ns streamful.protocol-test
  (:require [clj-cbor.core :as cbor]
            [clojure.test :refer :all]
            [crypto.equality :as creq]
            [streamful.client :as client]
            [streamful.crypto :as cr]
            [streamful.protocol :refer :all]
            [streamful.test-asserts :as ta]
            [streamful.test-cfg :as tcfg]
            [streamful.test-crypto :as tcrypto]
            [streamful.test-protocol :as tp]
            [streamful.test-protocol :refer :all]
            [streamful.transport :as transport])
  (:import (java.io ByteArrayInputStream)
                (is (= expected-test1-messages
                       (map tp/get-m test1-messages)) r2))

              (testing "we can validate and reassemble from original binaries"
                (is (= expected-root-messages
                       (map reassemble-original root-messages)))
                (is (= expected-test1-messages
                       (map reassemble-original test1-messages))))))









          (testing "good error message when stream is missing"
            (is (= {:response {"status" "stream 'missing-stream' not found"}}
                   (get-as client keys1 "missing-stream"))))

          (testing "streams are private by default, show as missing"
            (is (= {:response {"status" "stream 'root' not found"}}

                (is (= expected-test1-messages
                       (map tp/get-m test1-messages)) r2))

              (testing "we can validate and reassemble from original binaries"
                (is (= expected-root-messages
                       (map reassemble-original root-messages)))
                (is (= expected-test1-messages
                       (map reassemble-original test1-messages))))

              (ta/pending "can filter based on simple rules"
                ;; for example, only "cfg" messages, or only messages
                ;; whose params contain specific key/value pairs
                ;; i.e. ["m" "params" "rs" ["stream1" "stream2"]]
                ;;   would mean give me all messages that are in reply to
                ;;   messages in any of these streams
                )))

          (testing "good error message when stream is missing"
            (is (= {:response {"status" "stream 'missing-stream' not found"}}
                   (get-as client keys1 "missing-stream"))))

          (testing "streams are private by default, show as missing"
            (is (= {:response {"status" "stream 'root' not found"}}
         "m"
         {
          ;; when created on the client, ms since epoch
          ;; this would normally be a bigger number, but the above test has
          ;; time hard-coded for consistency of test assertions
          "ct" 22,

          ;; command -- what should subscribers do with this message?
          ;; `put` just means "put it in your subscription"
          "c" "put",

          ;; base64-encoded form of `:psk` above for verification of internal
          ;; consistency to prevent forgery
          "k" "fCJ8C+MBKGuwLPz771W4IhuAxYStrwnFHKkpRGquLV8=",

          ;; message itself

         "m"
         {
          ;; when created on the client, ms since epoch
          ;; this would normally be a bigger number, but the above test has
          ;; time hard-coded for consistency of test assertions
          "ct" 22,

          ;; command -- what should readers do with this message?
          ;; `put` just means "put it in your list"
          "c" "put",

          ;; base64-encoded form of `:psk` above for verification of internal
          ;; consistency to prevent forgery
          "k" "fCJ8C+MBKGuwLPz771W4IhuAxYStrwnFHKkpRGquLV8=",

          ;; message itself

        ;; performs all the validations and disassembles
        reassemble-original

        ;; should be identical to the outer `"m"` above
        clojure.pprint/pprint))
  )

(deftest server-side-subscriptions-test
  (testing "subscribe"
    (testing "same server"
      (ta/pending "updates destination properly"))
    (testing "different server"
      (ta/pending "notifies remote only once")
      (ta/pending "notifies remote again if remote fails")))
  (testing "fetch subscriptions"
    (testing "per stream"
      (ta/pending "all messages")
      (ta/pending "individual deny list")
      (ta/pending "by rules")
      (ta/pending "only once even with multiple subscribers"))
    (testing "server defaults"
      (ta/pending "allow list")
      (ta/pending "deny list")))
  (testing "unsubscribe"
    (testing "same server"
      (ta/pending "updates destination properly"))
    (testing "different server"
      (ta/pending "notifies remote only once")
      (ta/pending "notifies remote again if remote fails")
      (ta/pending "purges remote on last"))))

        ;; performs all the validations and disassembles
        reassemble-original

        ;; should be identical to the outer `"m"` above
        clojure.pprint/pprint))
  )

Added test/streamful/transport_test.clj.

;
; This file is part of streamful.
;
; streamful is free software: you can redistribute it and/or modify it
; under the terms of the GNU Affero General Public License as published
; by the Free Software Foundation, either version 3 of the License, or
; (at your option) any later version.
;
; streamful is distributed in the hope that it will be useful, but
; WITHOUT ANY WARRANTY; without even the implied warranty of
; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
; GNU Affero General Public License for more details.
;
; You should have received a copy of the GNU Affero General Public
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.transport-test
  (:require [clj-cbor.core :as cbor]
            [clojure.string :as string]
            [clojure.test :refer :all]

            [streamful.test-asserts :as ta]
            [streamful.transport :refer :all])
  (:import (clj_cbor.data.tagged TaggedValue)
           (clojure.lang Keyword)))

(defmacro is-encoded
  ([expected v]
   `(is (= ~expected (-> ~v encode-msg decode-msg))))
  ([v] `(is-encoded ~v ~v)))

(deftest encoding-decoding-test
  (testing "encodes and decodes basic structures"
    (is-encoded [])
    (is-encoded {})
    (is-encoded 1)
    (is-encoded "a")
    (is-encoded {"a" [1 1.5 "test"]})
    (is-encoded {"a" "b"} {:a "b"})
    (is-encoded {"a" {"b" ["d" 5 6.5], "f" "g"}}
                {:a {:b ["d" 5 6.5] "f" "g"}}))

  (testing "doesn't encode platform-specific values"
    (ta/is-thrown [e]
      (encode-msg [:a])
      (= {:msg "No known encoding for object: :a",
          :data {:value :a
                 :class Keyword
                 :dispatch Keyword
                 :cbor/error :clj-cbor.codec/unsupported-type}}
         e))

    (ta/is-thrown [e]
      (encode-msg (Thread.))
      (= {:class Thread
          :dispatch Thread
          :cbor/error :clj-cbor.codec/unsupported-type
          :m "No known encoding for object"}
         (let [{:keys [data msg]} e]
           (-> data
               (dissoc :value)
               (assoc :m (-> msg (string/split #":") first)))))))

  (testing "doesn't decode platform-specific encodings"
    (is (= ["a" {(TaggedValue. 39 ":b" nil) (TaggedValue. 39 ":c" nil)}]
           (decode-msg (cbor/encode ["a" {:b :c}]))))))