Streamful

test_aggregates.clj at tip
Login

test_aggregates.clj at tip

File test/streamful/test_aggregates.clj from the latest check-in


;
; This file is part of streamful.
;
; streamful is free software: you can redistribute it and/or modify it
; under the terms of the GNU Affero General Public License as published
; by the Free Software Foundation, either version 3 of the License, or
; (at your option) any later version.
;
; streamful is distributed in the hope that it will be useful, but
; WITHOUT ANY WARRANTY; without even the implied warranty of
; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
; GNU Affero General Public License for more details.
;
; You should have received a copy of the GNU Affero General Public
; License along with streamful.
; If not, see <https://www.gnu.org/licenses/#AGPL>.
;

(ns streamful.test-aggregates
  (:require [clojure.test :refer :all]))

;; Sample message map, wrapped in `comment` so it is never evaluated.
;; The function sources defined in this file read :id at the top level and,
;; nested under :p, the keys :t, :rid and :tid (plus :mt, which this sample
;; does not contain). :k and :sid are not read by any function source visible
;; in this file.
;; NOTE(review): the meaning of :k and :sid, and the encoding of the id-like
;; strings, cannot be determined from this file -- confirm against the
;; message producer.
(comment
  ;; example data structure
  {:id "QNDCgdq2dtRSRX30EY5vpT01X9ERG50cWTTnaPvfaLg=",
   :k "hHvjt17crEGI/wr7eKHG7sxTIGECSu+6DXBIOJv+Bno=",
   :p
   {:t "Me, too!",
    :rid "v5RqeKuLF89O7rqr81GSLaojTOoTSonRusxkb8mSsro=",
    :tid "IufbrJC3j1PBUtDVpo2nJ9+aH+ukDtb0VekiC7V35f8="},
   :sid "test1"}
  )

;; String holding the source of a one-argument function (it is not compiled
;; in this file). Given a message map, the function returns the value stored
;; under :tid inside the message's :p map, i.e. (:tid (:p msg)).
;; NOTE(review): presumably handed to the code under test as the selector that
;; decides which aggregate a message contributes to -- confirm with callers.
(def thread-aggregate-selector-fn
  "(fn [msg] (->> msg :p :tid))")

;; String holding the source of a two-argument function (agg, msg); it is not
;; compiled in this file. The function returns agg with two keys updated:
;;   :thm - the message's :id is conj'ed onto the existing value, with an
;;          empty vector used when :thm is missing (nil/false).
;;   :thc - incremented by one, counting from 0 when missing.
;; NOTE(review): :thm / :thc look like "thread messages" / "thread count";
;; the names are inferred from the local bindings -- confirm.
(def thread-aggregator-fn
  "(fn [agg msg]
    (-> agg
        (update
          :thm
          (fn [thread-messages]
            (conj (or thread-messages []) (:id msg))))
        (update
          :thc
          (fn [thc] (inc (or thc 0))))))")

;; String holding the source of a one-argument function that returns agg with
;; the :thm and :thc keys removed -- the same two keys that the function text
;; in thread-aggregator-fn writes.
;; NOTE(review): when and by whom this cleanup is invoked is not visible in
;; this file -- confirm with callers.
(def cleanup-thread-fn
  "(fn [agg] (dissoc agg :thm :thc))")

;; String holding the source of a one-argument function (it is not compiled
;; in this file). Given a message map, the function returns the value stored
;; under :rid inside the message's :p map, i.e. (:rid (:p msg)).
;; NOTE(review): presumably the reply counterpart of the :tid selector, used
;; to pick the aggregate a reply belongs to -- confirm with callers.
(def reply-aggregate-selector-fn
  "(fn [msg] (->> msg :p :rid))")

;; String holding the source of a two-argument function (agg, msg); it is not
;; compiled in this file. The function returns agg with two keys updated:
;;   :rm - the message's :id is conj'ed onto the existing value, with an
;;         empty vector used when :rm is missing (nil/false).
;;   :rc - incremented by one, counting from 0 when missing.
;; NOTE(review): :rm / :rc look like "reply messages" / "reply count"; the
;; names are inferred from the local bindings -- confirm.
(def reply-aggregator-fn
  "(fn [agg msg]
    (-> agg
        (update
          :rm
          (fn [reply-messages]
            (conj (or reply-messages []) (:id msg))))
        (update
          :rc
          (fn [rc] (inc (or rc 0))))))")

;; String holding the source of a one-argument function that returns agg with
;; the :rm and :rc keys removed -- the same two keys that the function text
;; in reply-aggregator-fn writes.
;; NOTE(review): when and by whom this cleanup is invoked is not visible in
;; this file -- confirm with callers.
(def cleanup-reply-fn
  "(fn [agg] (dissoc agg :rm :rc))")

;; String holding the source of a two-argument function that ignores its
;; second argument and returns agg with :thc incremented by one (counting
;; from 0 when :thc is missing). Unlike the text in thread-aggregator-fn it
;; does not touch :thm, so no message ids are collected.
(def count-only-thread-agg-fn
  "(fn [agg _]
    (update agg :thc (fn [thc] (inc (or thc 0)))))")

;; String holding the source of a one-argument function that returns agg with
;; only the :thc key removed -- the single key written by the function text
;; in count-only-thread-agg-fn.
(def count-only-thread-cleanup-fn
  "(fn [agg] (dissoc agg :thc))")

;; String holding the source of a two-argument function that ignores its
;; second argument and returns agg with :rc incremented by one (counting
;; from 0 when :rc is missing). Unlike the text in reply-aggregator-fn it
;; does not touch :rm, so no message ids are collected.
(def count-only-reply-agg-fn
  "(fn [agg _]
    (update agg :rc (fn [rc] (inc (or rc 0)))))")

;; String holding the source of a one-argument function that returns agg with
;; only the :rc key removed -- the single key written by the function text
;; in count-only-reply-agg-fn.
(def count-only-reply-cleanup-fn
  "(fn [agg] (dissoc agg :rc))")

;; String holding the source of a two-argument function (agg, msg). The
;; second argument is destructured so that mt and t are the :mt and :t values
;; of the message's :p map. When mt equals the string "st" the function
;; returns agg with :title set to t; otherwise it returns agg unchanged.
;; NOTE(review): "st" presumably marks a "stream title" message type -- the
;; meaning of the code is not defined in this file, confirm.
(def stream-title-agg
  "(fn [agg {{:keys [mt t]} :p}]
    (if (= \"st\" mt)
      (assoc agg :title t)
      agg))")

;; String holding the source of a two-argument function (agg, msg). It
;; destructures msg-count and max-len from agg, and t from the message's :p
;; map, then returns agg with:
;;   :msg-count - previous value plus one, counting from 0 when missing.
;;   :max-len   - the larger of the previous value (0 when missing) and
;;                (count t); a nil t counts as 0.
;; NOTE(review): t is presumably the message text, so :max-len would track the
;; longest text seen -- confirm against the message schema.
(def stream-stats-agg
  "(fn [{:keys [msg-count max-len] :as agg} {{:keys [t]} :p}]
     (assoc agg :msg-count (inc (or msg-count 0))
                :max-len (max (or max-len 0) (count t))))")