Basic Processing
Commonly used processing steps to build data flows.
addjson
Parses a JSON text and adds the contents to the data tuple.
flow
=> code[json = '{"a": "foo", "b": 133}']
=> addjson['json'] // { json: "{\"a\": \"foo\", \"b\": 133}", a: "foo", b: 133 }
A single string parameter is expected to select the input field. Nested objects and arrays are parsed and returned as maps and lists respectively.
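The merge behaviour described above can be sketched in Python. This is only an illustration of the semantics, not DimML's implementation; the function name `add_json` and the rejection of non-object JSON are assumptions made for the example.

```python
import json

def add_json(tuple_, field):
    """Parse the JSON text stored in `field` and merge its keys into the tuple."""
    parsed = json.loads(tuple_[field])
    # Assumption: only a top-level JSON object can be merged into a tuple.
    if not isinstance(parsed, dict):
        raise ValueError("expected a JSON object at the top level")
    # The original field is kept; the parsed keys are added next to it.
    return {**tuple_, **parsed}

result = add_json({"json": '{"a": "foo", "b": 133}'}, "json")
nested = add_json({"json": '{"x": {"y": [1, 2]}}'}, "json")
```

Nested objects and arrays come back as Python dicts and lists, which mirrors the maps and lists mentioned above.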
aggregate
Run a single aggregation step on the collection of input tuples.
flow
=> code[beer = `['amstel', 'brand', 'alfa', 'alfa', 'alfa']`]
=> split['beer']
=> aggregate['beer', count = 'count beer'] // [ { beer: "amstel", count: 1 }, { beer: "brand", count: 1 }, {beer: "alfa", count: 3 } ]
Aggregate takes a collection of tuples as input and aggregates them into one tuple per unique value of the grouping fields. Additional fields can be added that apply the functions count, sum or avg to any of the existing fields.
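As a rough model of the grouping step, the following Python sketch reproduces the beer example. It is not DimML code: the `aggregate` function and the `(function, field)` pairs are hypothetical stand-ins for the `'count beer'` style expressions.

```python
from collections import defaultdict

def aggregate(tuples, group_field, **aggregations):
    """Group tuples by `group_field`; each aggregation is a (function, field) pair."""
    groups = defaultdict(list)
    for t in tuples:
        groups[t[group_field]].append(t)

    functions = {
        "count": len,
        "sum": sum,
        "avg": lambda values: sum(values) / len(values),
    }
    result = []
    for key, members in groups.items():
        row = {group_field: key}
        for name, (func, field) in aggregations.items():
            row[name] = functions[func]([m[field] for m in members])
        result.append(row)
    return result

beers = [{"beer": b} for b in ["amstel", "brand", "alfa", "alfa", "alfa"]]
counts = aggregate(beers, "beer", count=("count", "beer"))
```

Here `counts` holds one entry per unique beer, in order of first appearance.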
bucket
Convert a stream of tuples that arrive in a predefined time slot to a single collection of tuples.
concept Example {
val url = `document.location.href`
flow
=> bucket['10s']
=> aggregate['url', views = 'count url']
}
This collects the url of each page opened and aggregates the data on that value. After 10 seconds, a tuple is generated for each unique url, with an additional views field containing the number of times that url occurred.
The bucket element guarantees timely delivery of the output collection if at least one tuple has arrived in the time slot.
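The idea of turning a stream into one collection per time slot can be sketched in Python as follows. How DimML aligns its slots is not stated here, so the sketch assumes fixed slots counted from timestamp zero; the `bucket` function and the millisecond timestamps are illustrative only.

```python
from collections import defaultdict

def bucket(timed_tuples, slot_ms):
    """Group (timestamp_ms, tuple) pairs into one collection per time slot."""
    slots = defaultdict(list)
    for timestamp_ms, t in timed_tuples:
        # Assumption: slots are fixed windows aligned at timestamp 0.
        slots[timestamp_ms // slot_ms].append(t)
    # Only slots that received at least one tuple produce a collection.
    return [slots[index] for index in sorted(slots)]

events = [
    (1_000, {"url": "/a"}),
    (9_500, {"url": "/b"}),
    (12_000, {"url": "/a"}),
    (35_000, {"url": "/c"}),
]
collections = bucket(events, 10_000)
```

The empty slot between 20 and 30 seconds yields no output, matching the condition that at least one tuple must have arrived.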
buffer
Convert a stream of tuples that arrive in a predefined time slot to a single collection of tuples. The only guarantee is eventual delivery of the output collection.
concept Example {
val url = `document.location.href`
flow
=> buffer['10s']
=> aggregate['url', views = 'count url']
}
This collects the url of each page opened and aggregates the data on that value. After 10 seconds, a tuple is generated for each unique url, with an additional views field containing the number of times that url occurred.
The buffer element takes a single unnamed parameter that defines the minimum time to buffer tuples. The buffer tries to send the data as soon as possible, but makes no guarantees. The data is sent as a single list of tuples and stamped with the current time as packet time. This time can be retrieved using the time element.
Parameters
Name | Type | Description |
---|---|---|
time | String | Postfix time notation for collecting tuples. The postfix time notation uses ‘ms’ for milliseconds, ‘s’ for seconds, ‘m’ for minutes, ‘h’ for hours and ‘d’ for days. |
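To make the postfix time notation concrete, here is a small Python sketch that converts such a string to milliseconds. It assumes a single whole number followed by one unit; whether DimML accepts fractions or combined values such as '1h30m' is not covered here, and `parse_postfix_time` is a hypothetical helper.

```python
import re

# Milliseconds per unit, following the table above.
UNIT_MS = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}

def parse_postfix_time(text):
    """Convert a string such as '10s' or '500ms' to a number of milliseconds."""
    match = re.fullmatch(r"(\d+)(ms|s|m|h|d)", text.strip())
    if match is None:
        raise ValueError(f"not a postfix time notation: {text!r}")
    amount, unit = match.groups()
    return int(amount) * UNIT_MS[unit]

ten_seconds = parse_postfix_time("10s")
```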
buffer:aggregate
Perform aggregation as a streaming step on input data that arrives in a specific time slot.
Example
concept Example {
val url = `document.location.href`
flow
=> buffer:aggregate['10s', 'url', views = 'count url']
}
This collects the url of each page opened and aggregates the data on that value. After 10 seconds, a tuple is generated for each unique url, with an additional views field containing the number of times that url occurred.
This element combines the buffer and aggregate flow elements into one element.
Parameters
Name | Type | Description |
---|---|---|
time | String | String with postfix time notation for the time slot |
fields | list of fields | The fields by which the data is grouped |
The postfix time notation uses ‘ms’ for milliseconds, ‘s’ for seconds, ‘m’ for minutes, ‘h’ for hours and ‘d’ for days. The fields parameter references fields available in the input tuple or extensions on top of them.
buffer:session
Collect input tuples until a user session ends.
concept Global {
match '*'
def guid = `dimml.sha1(+new Date()+Math.random().toString(36).slice(2)+navigator.userAgent).slice(20)`
val url = `location`
val sessionId = `sessionStorage.dimmlsid=sessionStorage.dimmlsid||guid()`
@groovy
flow
=> buffer:session['sessionId', timeout = '30s', `
session.startPage = session.startPage?:url
session.endPage = url
session.pageCount = (session.pageCount?:0)+1
false`]
=> console
}
This collects the url of each page opened and groups the data by session ID. The code in the body is executed for every input tuple. Since startPage is stored only for the first tuple (and left unchanged after that), it contains the first page of the visit. Similarly, since endPage is updated for each new tuple, it contains the URL of the last page of the visit. Finally, pageCount is incremented with each page view. Since the body ends with false, no input tuples are sent to the next element. Once the session has ended after the 30-second timeout, only one tuple is sent to the next element, containing all session parameters. If for this DimML application the URL http://documentation.dimml.io is opened first, followed by http://documentation.dimml.io/basic-syntax, the output tuple will be
{startPage=http://documentation.dimml.io, endPage=http://documentation.dimml.io/basic-syntax, pageCount=2}
This element buffers input tuples by grouping them using a session id. Apart from the session id, a timeout can be defined after which the session has ended. The body of this element can be used to perform additional calculations on the input tuples, such as storing data in the session object. The body is executed at the moment an input tuple reaches the element. If the body returns false, the input tuple is discarded. This is particularly convenient for calculating properties of a session but only storing them at the end of the session, when the session has expired. All fields of the session object will be available as fields in the resulting flow.
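The interplay between the session object and the body can be modelled in Python. This sketch leaves out the timeout entirely and assumes every session expires once the input is exhausted; `buffer_session` and `track_pages` are hypothetical names, and the body mirrors the Groovy body from the example.

```python
def buffer_session(tuples, session_field, body):
    """Run `body` per input tuple and collect one session object per session id."""
    sessions = {}
    passed_through = []
    for t in tuples:
        session = sessions.setdefault(t[session_field], {})
        # A body that returns False discards the input tuple.
        if body(session, t) is not False:
            passed_through.append(t)
    # Assumption: all sessions end when the input ends (no timeout handling).
    return passed_through, list(sessions.values())

def track_pages(session, t):
    session.setdefault("startPage", t["url"])  # stored for the first tuple only
    session["endPage"] = t["url"]              # overwritten on every tuple
    session["pageCount"] = session.get("pageCount", 0) + 1
    return False

views = [
    {"sessionId": "s1", "url": "http://documentation.dimml.io"},
    {"sessionId": "s1", "url": "http://documentation.dimml.io/basic-syntax"},
]
discarded_input, session_tuples = buffer_session(views, "sessionId", track_pages)
```

With the two page views above, `session_tuples` contains a single session object whose startPage, endPage and pageCount match the output tuple shown in the example.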
Parameters
Name | Type | Description |
---|---|---|
time | String | String with postfix time notation for the time slot |
sessionId | String | Value which is used to aggregate the data |
The postfix time notation uses ‘ms’ for milliseconds, ‘s’ for seconds, ‘m’ for minutes, ‘h’ for hours and ‘d’ for days. The session id refers.
call
Execute a specified flow on the input tuple.
concept Example {
match '*'
val a = `3.0`@groovy
val b = `4.0`@groovy
flow
=> call[`Functions:pythagoras`@dimml]
=> debug
plugin debug
}
concept Functions {
flow (pythagoras)
=> code[c = `Math.sqrt(a.power(2) + b.power(2))`@groovy]
}
This will result in the following tuples:
{b=4.0, a=3.0, c=5.0}
Send the input tuple to a target flow and forward the resulting output. The target flow can be defined in a separate concept, possibly in a separate file. This effectively sends the data to a flow in another concept, executes that concept, and returns the result.
Parameters
target (code) – Description of the concept and flow to be executed
The called target flow must be available, and thus be included somewhere if it is defined in another file. Calls to the target flow share the same execution concept. Specifically, this means that calls to the same target flow from different places can be combined. The call flow elements can be used to simulate function calls. This means the function flows can be included in arbitrary flows, so the content of the input tuple is uncertain and performing desired checks is recommended. For example, in the pythagoras flow you might want to verify that both a and b are present and numeric.
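The recommended input check can be illustrated with a Python sketch in which a plain function stands in for the called flow. This only models the idea of validating the tuple before using it; `call` and `pythagoras` here are ordinary Python functions, not DimML elements, and raising an error on bad input is one possible choice.

```python
import math
from numbers import Real

def pythagoras(t):
    """Stand-in for the pythagoras flow: validate the input, then add field c."""
    for name in ("a", "b"):
        value = t.get(name)
        # Booleans are numbers in Python, so they are rejected explicitly.
        if isinstance(value, bool) or not isinstance(value, Real):
            raise ValueError(f"field {name!r} must be present and numeric")
    return {**t, "c": math.sqrt(t["a"] ** 2 + t["b"] ** 2)}

def call(t, target):
    """Send the tuple to the target and forward whatever it returns."""
    return target(t)

output = call({"a": 3.0, "b": 4.0}, pythagoras)
```

Because the function cannot know which flow calls it, the check runs on every call instead of relying on the caller.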
catch
Catch exceptions and redirect their handling to another part of the flow graph.
@groovy
concept Test {
match `once [dev]`@service
flow
=> catch (error)
=> code[`throw new IllegalStateException('My Mistake')`]
=> code[`[message: 'This is not executed']`]
=> console
flow (error)
=> code[`[message: 'An error has been caught']`]
=> console
}
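The control flow of the example can be approximated in Python. This is a sketch under assumptions: which tuple DimML hands to the error flow is not stated here, so the sketch passes the tuple as it was before the failing step, and all function names are hypothetical.

```python
def run_with_catch(t, steps, error_flow):
    """Run the steps in order; on any exception, hand the tuple to the error flow."""
    try:
        for step in steps:
            t = step(t)
        return t
    except Exception:
        # Assumption: the error flow receives the tuple from before the failure.
        return error_flow(t)

def failing_step(t):
    raise RuntimeError("My Mistake")

def skipped_step(t):
    return {**t, "message": "This is not executed"}

def error_flow(t):
    return {**t, "message": "An error has been caught"}

outcome = run_with_catch({}, [failing_step, skipped_step], error_flow)
```

The step after the failing one never runs, so only the message from the error flow ends up in the result.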
code
Add values to the data tuple based on specified script code.
concept Example {
match '*'
val browser = 'Mozilla/5.0 (Windows NT) Firefox/36.0'
flow
=> code[
ie = `browser.contains('MSIE')`,
firefox = `browser.indexOf('Firefox')>=0`@groovy
]
}
This will result in the following tuple:
{ browser: Mozilla/5.0 (Windows NT) Firefox/36.0, ie: false, firefox: true }
Define new fields based on client side or server side code.
Parameters
List of assignments – Scripts to define a value for a field
Each assignment contains an expression which is assigned to a field. This script can either be in JavaScript (default, executed client side) or Groovy (executed server side). For the latter, add @groovy after the script text.
compact
Add a new field to the data tuple that contains a map of the current fields as value.
concept all {
match '*'
val firstName = 'John'
val lastName = 'Doe'
flow
=> compact['map']
=> console
}
This will show the following text in the console: {lastName=Doe, firstName=John, map={lastName=Doe, firstName=John}}
Add a field called ‘compacted’ to the tuple that contains an unmodifiable map view of the tuple. The default field name can be overridden.
Parameters
Field – Optional: name of the field that contains the map
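A read-only map of the current fields can be sketched in Python with `types.MappingProxyType`. This is an illustration only: the sketch takes a snapshot of the fields at the time of the call, which may differ from how DimML's map view behaves, and `compact` is a hypothetical function.

```python
from types import MappingProxyType

def compact(t, field="compacted"):
    """Add `field` holding a read-only map of the fields present in the tuple."""
    # Assumption: the map is a snapshot taken before the new field is added.
    snapshot = MappingProxyType(dict(t))
    return {**t, field: snapshot}

person = compact({"firstName": "John", "lastName": "Doe"}, "map")
```

Writing to `person["map"]` raises a `TypeError`, which is the Python counterpart of the map being unmodifiable.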