
Basic Processing

Commonly used processing steps to build data flows.

addjson

Parses a JSON text and adds the contents to the data tuple.

Example
flow
=> code[json = '{"a": "foo", "b": 133}']
=> addjson['json'] // { json: "{\"a\": \"foo\", \"b\": 133}", a: "foo", b: 133 }

A single string parameter selects the input field that contains the JSON text. Nested objects and arrays are parsed and returned as maps and lists, respectively.
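The following Python sketch models what addjson does to a single tuple, assuming a tuple can be represented as a dict. The function name and the rule that parsed keys overwrite existing fields with the same name are assumptions for illustration, not DimML's actual implementation.

```python
import json


def addjson(tup: dict, field: str) -> dict:
    """Parse the JSON object stored in tup[field] and merge its keys into a new tuple.

    Assumption: keys from the parsed object overwrite existing fields with the same name.
    """
    parsed = json.loads(tup[field])
    if not isinstance(parsed, dict):
        raise ValueError(f"field {field!r} does not contain a JSON object")
    # json.loads already maps JSON objects to dicts and JSON arrays to lists.
    return {**tup, **parsed}


result = addjson({"json": '{"a": "foo", "b": 133}'}, "json")
print(result)  # {'json': '{"a": "foo", "b": 133}', 'a': 'foo', 'b': 133}
```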

aggregate

Run a single aggregation step on the collection of input tuples.

Example
flow
=> code[beer = `['amstel', 'brand', 'alfa', 'alfa', 'alfa']`]
=> split['beer']
=> aggregate['beer', count = 'count beer'] // [ { beer: "amstel", count: 1 }, { beer: "brand", count: 1 }, {beer: "alfa", count: 3 } ]

Aggregate takes a set of tuples as input and aggregates them into one tuple per distinct value of the grouping fields, as the example shows. Additional fields can be defined that apply the functions count, sum or avg to any of the existing fields.
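To make the grouping behaviour concrete, here is a Python sketch. It assumes that tuples are dicts and that expressions such as 'count beer' have already been parsed into (function, field) pairs. The signature is hypothetical. The sketch reproduces the output of the example but is not DimML's implementation.

```python
def aggregate(tuples, group_fields, **aggregations):
    """Group tuples by group_fields and add one field per aggregation.

    Each aggregation is written as name=(function, field), e.g. count=("count", "beer").
    """
    groups = {}  # dicts keep insertion order, so groups appear in first-seen order
    for tup in tuples:
        key = tuple(tup[f] for f in group_fields)
        groups.setdefault(key, []).append(tup)

    result = []
    for key, members in groups.items():
        out = dict(zip(group_fields, key))
        for name, (function, field) in aggregations.items():
            values = [m[field] for m in members]
            if function == "count":
                out[name] = len(values)
            elif function == "sum":
                out[name] = sum(values)
            elif function == "avg":
                out[name] = sum(values) / len(values)
            else:
                raise ValueError(f"unsupported function: {function}")
        result.append(out)
    return result


beers = [{"beer": b} for b in ["amstel", "brand", "alfa", "alfa", "alfa"]]
print(aggregate(beers, ["beer"], count=("count", "beer")))
# [{'beer': 'amstel', 'count': 1}, {'beer': 'brand', 'count': 1}, {'beer': 'alfa', 'count': 3}]
```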

bucket

Convert a stream of tuples that arrive in a predefined time slot to a single collection of tuples.

Example
concept Example {
val url = `document.location.href`

flow
=> bucket['10s']
=> aggregate['url', views = 'count url']
}

This collects the URL of each opened page and aggregates the data by that value. After 10 seconds, a tuple is generated for each unique URL, which also contains the views field. This field holds the number of times the URL has occurred.

Collects the tuples that arrive in a predefined time slot into a single collection of tuples. Timely delivery of the output collection is guaranteed, provided that at least one tuple has arrived in the time slot.
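The slotting itself can be sketched in Python, assuming each tuple arrives with a timestamp in milliseconds and that slots are aligned to multiples of the slot length. Both assumptions and the function name are illustrative. The sketch only shows why a slot produces output only when at least one tuple arrived in it.

```python
def bucket(events, slot_ms):
    """Group (timestamp_ms, tuple) pairs into fixed, non-overlapping time slots.

    Returns a dict mapping each slot's start time to the tuples that arrived in it.
    Slots in which no tuple arrived do not appear in the result.
    """
    slots = {}
    for timestamp, tup in events:
        slot_start = timestamp - timestamp % slot_ms
        slots.setdefault(slot_start, []).append(tup)
    return slots


events = [(1_000, {"url": "/a"}), (4_000, {"url": "/b"}), (25_000, {"url": "/a"})]
print(bucket(events, 10_000))
# {0: [{'url': '/a'}, {'url': '/b'}], 20000: [{'url': '/a'}]}
```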

buffer

Convert a stream of tuples that arrive in a predefined time slot to a single collection of tuples. The only guarantee is eventual delivery of the output collection.

Example
concept Example {
val url = `document.location.href`

flow
=> buffer['10s']
=> aggregate['url', views = 'count url']
}

This collects the URL of each opened page and aggregates the data by that value. After 10 seconds, a tuple is generated for each unique URL, which also contains the views field. This field holds the number of times the URL has occurred.

The buffer element takes a single unnamed parameter that defines the minimum time to buffer tuples. The buffer tries to send the data as soon as possible, but makes no timing guarantees. The data is sent as a single list of tuples, stamped with the current time as packet time. This time can be retrieved using the time element.

Parameters

time (String) – Postfix time notation for collecting tuples. The postfix time notation uses ‘ms’ for milliseconds, ‘s’ for seconds, ‘m’ for minutes, ‘h’ for hours and ‘d’ for days.
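The postfix time notation maps directly onto a duration. The minimal Python parser below assumes whole-number values only. The document does not say whether fractional values such as '1.5h' are accepted.

```python
import re

# Milliseconds per unit of the postfix time notation.
UNIT_MS = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}


def parse_postfix_time(text: str) -> int:
    """Convert postfix time notation such as '10s' or '500ms' to milliseconds."""
    # fullmatch rejects trailing text such as '10sec' instead of silently ignoring it.
    match = re.fullmatch(r"(\d+)(ms|s|m|h|d)", text.strip())
    if match is None:
        raise ValueError(f"invalid postfix time: {text!r}")
    value, unit = match.groups()
    return int(value) * UNIT_MS[unit]


print(parse_postfix_time("10s"))  # 10000
```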

buffer:aggregate

Perform aggregation as a streaming step on input data that arrives in a specific time slot.

Example

concept Example {
val url = `document.location.href`

flow
=> buffer:aggregate['10s', 'url', views = 'count url']
}

This collects the URL of each opened page and aggregates the data by that value. After 10 seconds, a tuple is generated for each unique URL, which also contains the views field. This field holds the number of times the URL has occurred.

This element combines the buffer and aggregate flow elements into a single element.

Parameters

time (String) – Postfix time notation for the time slot
fields (list of fields) – The fields by which the data is grouped

The postfix time notation uses ‘ms’ for milliseconds, ‘s’ for seconds, ‘m’ for minutes, ‘h’ for hours and ‘d’ for days. The fields refer to fields available in the input tuple. Additional fields, such as views = 'count url' in the example, can be defined on top of them.
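Conceptually, buffer:aggregate first slots the incoming tuples by time and then aggregates each slot. The Python sketch below covers only the count function used in the example. It assumes input in the form of (timestamp_ms, tuple) pairs and slots aligned to multiples of the slot length. The function name and output layout are hypothetical.

```python
def buffer_aggregate_count(events, slot_ms, field, out_name):
    """Slot (timestamp_ms, tuple) events by time, then count tuples per value of `field`.

    Returns {slot_start_ms: [{field: value, out_name: count}, ...]}.
    """
    counts_per_slot = {}
    for timestamp, tup in events:
        slot_start = timestamp - timestamp % slot_ms
        counts = counts_per_slot.setdefault(slot_start, {})
        counts[tup[field]] = counts.get(tup[field], 0) + 1
    return {
        slot: [{field: value, out_name: n} for value, n in counts.items()]
        for slot, counts in counts_per_slot.items()
    }


events = [(1_000, {"url": "/a"}), (2_000, {"url": "/b"}),
          (3_000, {"url": "/a"}), (11_000, {"url": "/a"})]
print(buffer_aggregate_count(events, 10_000, "url", "views"))
# {0: [{'url': '/a', 'views': 2}, {'url': '/b', 'views': 1}], 10000: [{'url': '/a', 'views': 1}]}
```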

buffer:session

Collect input tuples until a user session ends.

Example
concept Global {
match '*'

def guid = `dimml.sha1(+new Date()+Math.random().toString(36).slice(2)+navigator.userAgent).slice(20)`

val url = `location`
val sessionId = `sessionStorage.dimmlsid=sessionStorage.dimmlsid||guid()`

@groovy
flow
=> buffer:session['sessionId', timeout = '30s', `
session.startPage = session.startPage?:url
session.endPage = url
session.pageCount = (session.pageCount?:0)+1
false`]
=> console
}

This collects the URL of each opened page and groups the data by session ID. The code in the body is executed for each incoming tuple. Because startPage is only set for the first tuple (and left unchanged after that), it contains the first page of the visit. Because endPage is updated for each new tuple, it contains the URL of the last page of the visit. Finally, pageCount is incremented with each page view. Because the body returns false, the input tuples themselves are not sent to the next flow element. Instead, once the session has timed out (after 30 seconds in this example), a single tuple containing all session fields is sent to the next flow element. If, for this DimML application, the URL http://documentation.dimml.io is opened first, followed by http://documentation.dimml.io/basic-syntax, the output tuple will be {startPage=http://documentation.dimml.io, endPage=http://documentation.dimml.io/basic-syntax, pageCount=2}

This element buffers input tuples by session id. Apart from the session id, a timeout can be defined after which the session ends. The body of this element can be used to perform additional calculations on the input tuples, such as storing data in the session object. The body is executed at the moment each input tuple reaches the element. If the body returns false, the input tuple is discarded. This is particularly convenient for calculating properties of a session while only emitting them at the end of the session, when the session expires. All fields of the session object are available as fields in the resulting flow.

Parameters

sessionId (String) – Field whose value identifies the session. Tuples are grouped by this value.
timeout (String) – Postfix time notation for the session timeout

The postfix time notation uses ‘ms’ for milliseconds, ‘s’ for seconds, ‘m’ for minutes, ‘h’ for hours and ‘d’ for days. The session id refers to a field of the input tuple (sessionId in the example).
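The session logic can be sketched in Python. The sketch makes three assumptions: the timeout is an inactivity timeout (a session ends once no tuple for that session id has arrived for timeout_ms), events are processed in time order, and all remaining sessions are flushed when the input ends. These assumptions and all names are illustrative. The track function mirrors the Groovy body in the example, including returning False to drop the input tuples.

```python
def buffer_session(events, timeout_ms, session_field, body):
    """Buffer (timestamp_ms, tuple) events per session and emit one tuple per ended session.

    Input tuples are passed through unless body returns False.
    Assumptions: inactivity timeout, events in time order, sessions flushed at end of input.
    """
    sessions = {}  # session id -> (last_seen_ms, session dict)
    output = []

    def expire(now):
        ended = [sid for sid, (last, _) in sessions.items() if now - last >= timeout_ms]
        for sid in ended:
            output.append(dict(sessions.pop(sid)[1]))

    for timestamp, tup in events:
        expire(timestamp)
        sid = tup[session_field]
        _, session = sessions.get(sid, (timestamp, {}))
        sessions[sid] = (timestamp, session)
        # As in the DimML body: returning False drops the input tuple.
        if body(session, tup) is not False:
            output.append(tup)

    expire(float("inf"))  # end of input: close all remaining sessions
    return output


def track(session, tup):
    # Mirrors the Groovy body of the example.
    session["startPage"] = session.get("startPage") or tup["url"]
    session["endPage"] = tup["url"]
    session["pageCount"] = session.get("pageCount", 0) + 1
    return False


events = [
    (0, {"sessionId": "s1", "url": "http://documentation.dimml.io"}),
    (5_000, {"sessionId": "s1", "url": "http://documentation.dimml.io/basic-syntax"}),
]
print(buffer_session(events, 30_000, "sessionId", track))
```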

call

Execute a specified flow on the input tuple.

Example
concept Example {
match '*'
val a = `3.0`@groovy
val b = `4.0`@groovy

flow
=> call[`Functions:pythagoras`@dimml]
=> debug

plugin debug
}

concept Functions {
flow (pythagoras)
=> code[c = `Math.sqrt(a.power(2) + b.power(2))`@groovy]
}

This will result in the following tuple: {b=4.0, a=3.0, c=5.0}

Send the input tuple to a target flow and forward the resulting output. The target flow can be defined in a separate concept, possibly in a separate file. This effectively sends the data to a flow in another concept, executes that flow and returns the result.

Parameters

target (code) – Reference to the concept and flow to be executed, written as Concept:flow (for example `Functions:pythagoras`@dimml)

The called target flow must be available, so it has to be included somewhere if it is defined in another file. Calls to the target flow share the same execution concept. Specifically, this means that calls to the same target flow from different places can be combined. The call flow element can be used to simulate function calls. Because such function flows can be included in arbitrary flows, the contents of the input tuple cannot be assumed, and validating the input is recommended. For example, in the pythagoras flow you might want to verify that both a and b are present and numeric.
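The function-call analogy can be made concrete with a Python sketch. In it, a hypothetical registry maps names like 'Functions:pythagoras' to flows, and the called flow validates its input as recommended above. This only models the idea; DimML resolves call targets itself.

```python
import math

FLOWS = {}  # hypothetical registry: "Concept:flow" name -> function


def flow(name):
    """Register a function as a named flow."""
    def register(fn):
        FLOWS[name] = fn
        return fn
    return register


def call(tup, target):
    """Send the tuple to the target flow and return its output."""
    return FLOWS[target](tup)


@flow("Functions:pythagoras")
def pythagoras(tup):
    # A called flow can receive any tuple, so check the input first.
    a, b = tup.get("a"), tup.get("b")
    for value in (a, b):
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError("pythagoras expects numeric fields 'a' and 'b'")
    return {**tup, "c": math.sqrt(a ** 2 + b ** 2)}


print(call({"a": 3.0, "b": 4.0}, "Functions:pythagoras"))  # {'a': 3.0, 'b': 4.0, 'c': 5.0}
```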

catch

Catch exceptions and redirect their handling to another part of the flow graph.

Example
@groovy
concept Test {
match `once [dev]`@service

flow
=> catch (error)
=> code[`throw new IllegalStateException('My Mistake')`]
=> code[`[message: 'This is not executed']`]
=> console

flow (error)
=> code[`[message: 'An error has been caught']`]
=> console
}
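The control flow of catch resembles a try/except around the rest of the flow. The Python sketch below assumes that the error flow receives the tuple as it was before the failing step. That detail and the function names are assumptions, not documented behaviour.

```python
def run_with_catch(tup, steps, error_flow):
    """Run steps in order; if any step raises, pass the tuple to error_flow instead."""
    current = tup
    try:
        for step in steps:
            current = step(current)
    except Exception as exc:
        # Remaining steps are skipped, like the 'This is not executed' step in the example.
        return error_flow(current, exc)
    return current


def fail(tup):
    raise RuntimeError("My Mistake")


result = run_with_catch(
    {},
    [fail, lambda t: {**t, "message": "This is not executed"}],
    lambda t, exc: {**t, "message": "An error has been caught"},
)
print(result)  # {'message': 'An error has been caught'}
```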

code

Add values to the data tuple based on specified script code.

Example
concept Example {
match '*'

val browser = 'Mozilla/5.0 (Windows NT) Firefox/36.0'

flow
=> code[
ie = `browser.indexOf('MSIE')>=0`,
firefox = `browser.indexOf('Firefox')>=0`@groovy
]
}

This will result in the following tuple: { browser: Mozilla/5.0 (Windows NT) Firefox/36.0, ie: false, firefox: true }

Define new fields based on client side or server side code.

Parameters

List of assignments – Scripts to define a value for a field

Each assignment contains an expression whose value is assigned to a field. The script can be written either in JavaScript (default, executed client side) or in Groovy (executed server side). For the latter, add @groovy after the script text.
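In plain Python, the effect of code on a tuple can be sketched as evaluating each assignment against the input tuple and adding the results as new fields. Two simplifications are made for illustration: scripts are represented as Python functions, and each expression sees only the input tuple. DimML instead evaluates JavaScript on the client or Groovy on the server.

```python
def code(tup, **assignments):
    """Return a new tuple with one extra field per assignment.

    Each assignment is a function of the input tuple (an illustrative stand-in
    for a JavaScript or Groovy expression).
    """
    out = dict(tup)
    for name, expression in assignments.items():
        out[name] = expression(tup)
    return out


result = code(
    {"browser": "Mozilla/5.0 (Windows NT) Firefox/36.0"},
    ie=lambda t: "MSIE" in t["browser"],
    firefox=lambda t: "Firefox" in t["browser"],
)
print(result)
# {'browser': 'Mozilla/5.0 (Windows NT) Firefox/36.0', 'ie': False, 'firefox': True}
```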

compact

Add a new field to the data tuple that contains a map of the current fields as value.

Example
concept all {
match '*'

val firstName = 'John'
val lastName = 'Doe'

flow
=> compact['map']
=> console
}

This will print the following text to the console: {lastName=Doe, firstName=John, map={lastName=Doe, firstName=John}}

Add a field called ‘compacted’ to the tuple that contains an (unmodifiable) map view of the tuple. The default field name can be overridden by passing a field name as parameter, such as 'map' in the example.

Parameters

field (String) – Optional: name of the field that contains the map (default: compacted)
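Python's types.MappingProxyType provides a read-only view of a dict, which makes it a convenient way to sketch compact. The sketch assumes the map holds the fields present at the moment compact runs, so it does not contain itself, matching the console output above. The function name and default field name follow the description; the rest is illustrative.

```python
from types import MappingProxyType


def compact(tup: dict, field: str = "compacted") -> dict:
    """Return a new tuple with `field` holding a read-only map of the current fields."""
    snapshot = MappingProxyType(dict(tup))  # copy first, then wrap as read-only
    return {**tup, field: snapshot}


result = compact({"firstName": "John", "lastName": "Doe"}, "map")
print(dict(result["map"]))  # {'firstName': 'John', 'lastName': 'Doe'}
```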

console

console:server

csv

debug

delay

drop

expand

filter

for

if

ip

join

json

next

pattern

property

purpose

route

sample

select

session

split

stream:close

stream:in

template

time

window:aggregate

ws