Aggregates Overview
One of Sumatra's core capabilities is computing trailing-window aggregates, grouped by arbitrary entity keys.
Say you want to compute the following:
Given an IP address, find the number of unique email domains that have successfully logged into that IP in the last 10 minutes.
You would express that feature in Scowl as:
CountUnique<login>(
  email_domain
  by ip
  where successful
  last 10 minutes
)
Conceptually, an aggregate feature computes a complex SQL-like query over arbitrary past event data. In reality, Sumatra transforms this simple syntax into a combination of write-time and read-time computations that achieve low-latency serving at scale.
Syntax
The general syntax for an aggregate feature is:
Function[<scope, ...>]([value, ...]
  [by key, ...]
  [where condition]
  [after time]
  [last duration]
  [limit number]
  [exclusive])
Not every aggregate function supports every query element, but when supported, elements always appear in this order.
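To make the ordering concrete, here is a sketch that combines several query elements in the required order (the event and feature names here are hypothetical, chosen only for illustration):

-- hypothetical: unique devices per account on successful logins, trailing 7 days
event purchase
devices_7d :=
    CountUnique<login>(device_id by account_id where successful last 7 days)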
Function
Scowl includes an extensive collection of aggregate functions, including:
- Exact, e.g. `Count`, `Sum`
- Approximate, e.g. `DecayedCount`, `DecayedSum`
- Temporal, e.g. `First`, `Last`
- Collection, e.g. `Set`, `Sequence`
The chosen aggregate function dictates the reduce operation, the applicable query elements, and the feature return type.
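As an illustration of how the function choice shapes the result, the following sketch uses one function from each category above (the feature names are hypothetical, and the annotated return types are assumptions based on each function's category):

event purchase
order_count := Count(by account_id)                    -- exact: integer
decayed_spend := DecayedSum(total_price by account_id) -- approximate: time-decayed sum
first_ip := First(ip by account_id)                    -- temporal: earliest value
skus_1d := Set(sku by account_id last day)             -- collection: set of values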
Scope
Like all features, aggregate features are declared in the context of a particular event, meaning their values are read every time that event is seen.
Aggregates also have a scope, which specifies the event (or events) that write the values that comprise the aggregate.
Common
In common aggregates, the write scope of the aggregate is the same as its read context.
Example:
-- common aggregate: Function(query)
event purchase
spend_30d := Sum(total_price by account last 30 days)
The `spend_30d` aggregate is computed during the `purchase` event and, because no other event scope is specified in the feature definition, the raw data comprising the aggregate comes from the `purchase` event as well.
Cross-Event
Often, we want to aggregate values over one event type and then fetch the result during a different event. To do so, we can add a write scope to the feature definition.
Example:
-- cross-event aggregate: Function<scope>(query)
event purchase
address_changes_30d :=
CountUnique<billing_update>(address by account last 30 days)
By specifying `<billing_update>` as the scope, the aggregate comprises values from `billing_update` events, while the final value is fetched during the `purchase` event.
A very common use case for cross-event aggregates is to use `Latest` aggregates to perform a realtime join with other event types on a common key, e.g.
-- realtime join between click and page view events
event click
experiment_id := Latest<page>(experiment_id by session_id)
It is possible to specify multiple scopes in an aggregate, which results in a union of the data across all scopes, e.g.
-- with multiple scopes, compute the aggregate over the *union* of events
activities_24h := Count<post,comment,like>(by user_id last day)
If the specified scope is the same as the feature context, it is equivalent to a common aggregate, e.g.
-- the two features are equivalent
event page
page_views1 := Count(by session_id)
page_views2 := Count<page>(by session_id)
Value
The aggregate's value appears at the start of the aggregate query, specifying the feature (or features) to be aggregated. In rare cases, like `Count` and `LatestTime`, the value may be omitted.
Average(session_time by user_id) -- single value
Latest(path, search by session_id) -- multiple values (comma-separated)
Count(by account_id) -- no value (value=1 is used)
When multiple values are supported (in aggregates like `Latest` and `First`), the return type is a struct.
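For instance, this sketch (with a hypothetical `referrer` feature) aggregates two values at once, so the result is a struct containing both `path` and `referrer`:

-- multi-value aggregate: the result is a struct of both fields
event click
last_view := Latest<page>(path, referrer by session_id)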
Note that the feature(s) declared as the value must exist in the event specified as the aggregate's scope.
By key
The aggregate's key appears after the `by` keyword and determines how events are grouped prior to aggregation. The key also determines the index used to "look up" the value at read time. A compound key may be assembled by listing multiple comma-separated keys.
Typically, the read and write key are the same, but sometimes it is desirable to use different keys at write-time and read-time. This flexibility is particularly helpful in cross-event aggregates where the foreign key may be named differently.
To support this, we use an `as` clause.
Syntax:
by read_key1 as write_key1, read_key2 as write_key2, ...
Examples
event transfer
Count(by sender) -- simple key
Count(by sender, payment_type) -- compound key
-- join deposit.depositor (write) on transfer.sender (read)
Count<deposit>(by sender as depositor)
-- use compound foreign keys to count prior reverse transfers from receiver to sender
Count(by sender as receiver, receiver as sender)
Warning
It is possible to omit the `by` key, which aggregates over all events, e.g. `Average(price last week)`. However, this should be done with caution, as it often results in a "hot key" that can increase feature serving latency.
Where
The `where` clause filters the events that will be included in the aggregation. It must be a boolean expression. The condition is always applied at write time, so its constituent features must belong to the write scope.
Examples:
-- a simple boolean feature
Count(by user where successful)
-- complex boolean expression
Count(by user where product='toaster' and Length(sku) > 12)
-- the `domain` feature is from the `login` scope
Count<login>(by ip where domain is not null)
After
The `after` clause filters the events included in the aggregation to only those with a timestamp later than the specified time. The argument must be the name of a feature with type time.
Examples:
-- valid after clause is a feature name
Count(by user after last_cart_add_time)
-- ERROR: cannot be an arbitrary expression
Count(by user after EventTime() - 4 days)
The filter is always applied at read time, so it does not impact the memory requirements of the aggregate. To efficiently maintain shorter time windows, you should use `last`, which may be combined with `after`.
A common use case for `after` is to zero out an aggregate when some event occurs, e.g.
-- how many clicks since the latest search?
latest_search := LatestTime<search>(by session_id last 20 minutes)
clicks_since_search :=
Count<click>(by session_id after latest_search last 20 minutes)
Note
The use of `after` is relatively rare. For most use cases, `last` achieves the intended result. The two may be used together.
Last
The `last` clause restricts the aggregation to a trailing time window. The window must be knowable at compile time, so the argument must be a constant duration.
-- valid duration constants
Count(by user last week)
Count(by user last 4 hours)
Count(by user last PT5H30M) -- ISO 8601 duration
-- ERROR: cannot be feature or expression
Count(by user last my_duration)
Count(by user last 5 hours + 30 minutes)
When the `last` duration is omitted, the system default (typically 30 days) is used. It is possible to extend this duration with an explicit window, e.g. `Count(by user last 180 days)`, but this will increase memory requirements.
Limit
The `limit` clause is similar to `last` but restricts the window to a number of events instead of a time duration. The argument must be an integer constant.
-- valid limits
Count(by user limit 5)
Count(by user last week limit 10)
-- ERROR: cannot be feature or expression
Count(by user limit my_window_size)
Count(by user limit 5 * 5)
When `last` and `limit` are used together, both are enforced, so the resulting window is always the smaller of the two.
Exclusive
Common aggregates are inclusive by default, meaning that they include the value from the current (read-time) event in the result. In cross-event aggregates, the read-time and write-time events are different, so the concept of inclusivity does not apply.
When you do not want this default behavior in common aggregates, you can add the `exclusive` keyword to exclude the current event from the calculation.
-- inclusive starts at 1
Count(by user)
-- exclusive starts at 0
Count(by user exclusive)
-- ERROR: not applicable to cross-event
Count<login>(by ip exclusive)