RTX

RealTime eXpressions

RTX is the internal language used for analyzing, filtering and transforming the data in a Cloudscale realtime data warehouse. Its role is similar to that of MDX in querying the multidimensional data stored in a traditional offline data warehouse, or of SQL in querying an offline relational database. The Cloudscale user interface enables users (business users or IT teams) to develop powerful RTX agents quickly and easily by dragging and dropping building blocks. Each Cloudscale building block corresponds to a single RTX expression.

Watch the introductory video from the DEMO 2010 conference in Palm Desert.

 

 
Streams

An RTX application processes one or more streams, and produces an output stream. A stream consists of a finite set of columns. For example, a market data stream might consist of three columns containing ticker symbol, price and volume information. A stream can also be considered as a finite or infinite timestamp-ordered sequence of rows, where each row contains exactly one element from each of the columns. For example, a market data stream might contain all of the NASDAQ Last Trade data from Monday 12th July 2010 at 12 noon UTC Time to Friday 3rd September 2010 at 10.30pm UTC Time. Another stream might contain every tweet sent to the Twitter site from Tuesday 9th March at 8.45am UTC Time up to the present time.

Streams might contain thousands, millions or billions of rows per hour; there is no upper limit. Continuously updated streams are essentially infinite, and continuous, never-ending analytics computations can be run on them. The timestamp of each row is accurate to the microsecond, and where two consecutive rows have identical timestamps, their ordering is precisely defined by a row identifier that is unique for each timestamp.

Merging

When a single application is run on more than one stream, the streams are merged or interleaved together in time-order to form a single stream, using row identifiers to resolve any ambiguity.
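
The merge rule can be sketched in Python. This is a minimal illustration, not the RTX implementation: it assumes each input stream is already sorted by timestamp and that row identifiers are unique across all input streams. The Row type and merge_streams function are hypothetical names.

```python
import heapq
from typing import Iterable, Iterator, NamedTuple


class Row(NamedTuple):
    timestamp_us: int  # microseconds since the epoch
    row_id: int        # unique identifier that breaks timestamp ties (assumed)
    values: dict       # column name -> value


def merge_streams(*streams: Iterable[Row]) -> Iterator[Row]:
    """Interleave time-ordered streams into one time-ordered stream.

    Rows are ordered by timestamp, and rows with equal timestamps by row_id,
    so the merged order is fully determined.
    """
    return heapq.merge(*streams, key=lambda r: (r.timestamp_us, r.row_id))


trades = [Row(1_000, 1, {"symbol": "GOOG"}), Row(3_000, 4, {"symbol": "MSFT"})]
quotes = [Row(1_000, 2, {"symbol": "ORCL"}), Row(2_000, 3, {"symbol": "GOOG"})]
print([r.row_id for r in merge_streams(trades, quotes)])  # [1, 2, 3, 4]
```

Because heapq.merge is lazy, this sketch also works on unbounded iterators, which matches the idea of continuously updated streams.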

Windows

An RTX application can perform certain parts of the computation, either sequentially or concurrently, on disjoint windows of a stream instead of performing it on the stream as a whole. Windows are either time-based or event-based.

A time-based window partitions the rows of the stream into disjoint contiguous time intervals of a certain length (number of microseconds). For example, part of an RTX app on a market data stream might partition the rows of the stream into 5-minute windows, while another part simultaneously processes the same data in 40-second windows, and yet another processes the whole stream as a single unit in order to continuously check for a particular pattern that might appear anywhere in it.
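
A time-based window can be derived from a row's timestamp alone. The sketch below assumes integer microsecond timestamps and a hypothetical alignment parameter (the instant from which window boundaries are counted); window_index and partition_by_time are illustrative names. It partitions the same rows into 5-minute and 40-second windows at once.

```python
from collections import defaultdict
from typing import Dict, Iterable, List

SECOND_US = 1_000_000
MINUTE_US = 60 * SECOND_US


def window_index(timestamp_us: int, duration_us: int, alignment_us: int = 0) -> int:
    """Index k of the window [alignment + k*duration, alignment + (k+1)*duration)."""
    return (timestamp_us - alignment_us) // duration_us


def partition_by_time(timestamps_us: Iterable[int], duration_us: int,
                      alignment_us: int = 0) -> Dict[int, List[int]]:
    """Group timestamps into disjoint, contiguous windows of equal length."""
    windows: Dict[int, List[int]] = defaultdict(list)
    for ts in timestamps_us:
        windows[window_index(ts, duration_us, alignment_us)].append(ts)
    return dict(windows)


ts = [0, 30 * SECOND_US, 50 * SECOND_US, 6 * MINUTE_US]
five_min = partition_by_time(ts, 5 * MINUTE_US)    # windows 0 and 1
forty_sec = partition_by_time(ts, 40 * SECOND_US)  # windows 0, 1 and 9
```

Each window size only needs the timestamp and a duration, so any number of window sizes can be computed over the same rows independently.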

An event-based window (called a "Window Frame" in RTX) is more general: it allows a stream to be partitioned into disjoint contiguous intervals based on any combination of criteria. For example, one part of an RTX app might need to process the stream in batches of one million rows, irrespective of how much time those batches span. Another part might need to process the stream in batches that end every time a particular combination of events is detected in the stream, for example every time a large trade in a particular stock is detected and at least 200 seconds have passed since the last such event.
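
A Window Frame can be modelled as a boolean test, evaluated row by row, that closes the current frame. This Python sketch assumes that the row satisfying the test is the last row of its frame and that a trailing, unclosed frame is emitted as-is; it also interprets "the last such event" as the previous large trade in that stock. window_frames and large_trade_rule are hypothetical helpers, not RTX functions. Both examples from the text are shown: fixed-size batches and the large-trade rule.

```python
from typing import Callable, Iterable, Iterator, List, Optional, Tuple, TypeVar

T = TypeVar("T")
Trade = Tuple[int, str, int]  # (timestamp_us, symbol, volume)
SECOND_US = 1_000_000


def window_frames(rows: Iterable[T],
                  ends_frame: Callable[[T, List[T]], bool]) -> Iterator[List[T]]:
    """Split rows into disjoint, contiguous frames.

    ends_frame(row, frame) is called after row joins the current frame;
    when it returns True, that frame is closed and a new one begins.
    """
    frame: List[T] = []
    for row in rows:
        frame.append(row)
        if ends_frame(row, frame):
            yield frame
            frame = []
    if frame:
        yield frame  # trailing frame that has not been closed yet


def large_trade_rule(symbol: str, min_volume: int,
                     min_gap_us: int) -> Callable[[Trade, List[Trade]], bool]:
    """Close a frame on a large trade in `symbol` arriving at least
    min_gap_us after the previous large trade in that symbol."""
    last_large: Optional[int] = None

    def ends_frame(row: Trade, frame: List[Trade]) -> bool:
        nonlocal last_large
        ts, sym, volume = row
        if sym != symbol or volume < min_volume:
            return False
        gap_ok = last_large is None or ts - last_large >= min_gap_us
        last_large = ts  # remember every large trade, closing or not
        return gap_ok

    return ends_frame


# Batches of a fixed number of rows, irrespective of time.
batches = list(window_frames(range(7), lambda row, frame: len(frame) == 3))

# Frames ending on a large GOOG trade at least 200 seconds after the previous one.
trades = [(0, "GOOG", 100), (10 * SECOND_US, "GOOG", 50_000),
          (100 * SECOND_US, "GOOG", 60_000), (400 * SECOND_US, "MSFT", 90_000),
          (500 * SECOND_US, "GOOG", 70_000), (510 * SECOND_US, "GOOG", 10)]
frames = list(window_frames(trades, large_trade_rule("GOOG", 10_000, 200 * SECOND_US)))
```

Here the trades produce frames of 2, 3 and 1 rows: the large trade at 100 seconds does not close a frame because it comes only 90 seconds after the previous one.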

Groups

An RTX application can also perform certain parts of the computation, either sequentially or concurrently, on disjoint substreams instead of performing it on the stream as a whole. For example, part of an RTX app on a NASDAQ market data stream might need to partition the rows of the stream into disjoint substreams, each containing only the rows with a particular ticker symbol. In such a case, there would be one substream for the rows with ticker symbol GOOG, another for MSFT, another for ORCL, and so on. In total there would be more than 3200 distinct substreams processed simultaneously, one for each company whose stock is traded on NASDAQ. There is no limit on the number of substreams in an RTX app; an app may process a stream as a disjoint collection of thousands or millions of substreams.
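
Grouping amounts to keeping separate state for each key. In this sketch (hypothetical names; rows assumed to be (symbol, volume) pairs), each row's result is the running total volume of its own substream. A new substream appears the first time a symbol is seen, so the number of substreams is bounded only by available memory.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def running_volume_by_symbol(rows: Iterable[Tuple[str, int]]) -> List[int]:
    """For each (symbol, volume) row, return the cumulative volume of that
    row's own substream; rows for other symbols do not affect it."""
    totals: Dict[str, int] = defaultdict(int)  # one accumulator per substream
    out: List[int] = []
    for symbol, volume in rows:
        totals[symbol] += volume  # a new substream starts on first sight of a symbol
        out.append(totals[symbol])
    return out


rows = [("GOOG", 100), ("MSFT", 50), ("GOOG", 25), ("ORCL", 10), ("MSFT", 5)]
print(running_volume_by_symbol(rows))  # [100, 50, 125, 10, 55]
```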

Partitions

Normally, an RTX app produces a single output stream. However, by setting simple parameters, it can instead partition its output into a set of disjoint substreams that are output as separate streams. For example, an RTX app processing a realtime stream of Facebook user data might output its results as ten separate streams based on the Facebook user id: one stream for the users whose id ends in the digit 0, another for those ending in 1, and eight more for those ending in 2, 3, 4, 5, 6, 7, 8 and 9.
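
Output partitioning by user id can be sketched as routing each row to one of ten output lists. The user_id field name and the assumption that ids are non-negative integers are illustrative only; partition_by_last_digit is a hypothetical helper. Rows keep their original relative order within each output stream.

```python
from typing import Dict, Iterable, List


def partition_by_last_digit(rows: Iterable[dict]) -> Dict[int, List[dict]]:
    """Route each row to one of ten output streams by the last decimal digit
    of its (hypothetical) non-negative integer user_id field."""
    outputs: Dict[int, List[dict]] = {digit: [] for digit in range(10)}
    for row in rows:
        outputs[row["user_id"] % 10].append(row)  # order within each output is preserved
    return outputs


rows = [{"user_id": 10}, {"user_id": 21}, {"user_id": 31}, {"user_id": 99}]
parts = partition_by_last_digit(rows)
print([r["user_id"] for r in parts[1]])  # [21, 31]
```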

Applications

An RTX application can be fully specified by giving:

  • one or more input streams
  • start time for the interval
  • end time for the interval
  • select expression (optional)
  • parallel value (optional)
  • set of input columns
  • set of expression columns

The optional select expression defines the set of rows that are included in the output stream. By default, all rows are output. In an RTX app, the rows in a stream cannot be rearranged, and new rows cannot be added; the set of rows can, however, be reduced by a select expression.
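
A select expression therefore acts as an order-preserving filter: the output is always a subsequence of the input. A minimal sketch, with select_rows as a hypothetical name and the select expression modelled as a Python predicate:

```python
from typing import Callable, Iterable, List, Optional, TypeVar

T = TypeVar("T")


def select_rows(rows: Iterable[T],
                select: Optional[Callable[[T], bool]] = None) -> List[T]:
    """Keep the rows for which select is true, in their original order.

    With no select expression every row is output. Rows are never added
    or reordered, so the result is always a subsequence of the input.
    """
    if select is None:
        return list(rows)
    return [row for row in rows if select(row)]


prices = [101.5, 99.0, 102.25, 98.75]
print(select_rows(prices, lambda p: p > 100))  # [101.5, 102.25]
```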

The optional parallel value defines the scale of MapReduce-style parallelism used to carry out the RTX computation. The default value is zero, which means that the computation is carried out as a single task. If the value is set to n, where n > 0, and the interval for the computation is m minutes long, then the computation is divided into k = m/n disjoint parallel subcomputations, each covering n minutes of data. For example, if the RTX app is run on 8 hours (480 minutes) of data with the parallel value set to 12, it will be run as 480/12 = 40 separate disjoint parallel computations, giving up to a 40x speedup.
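
The arithmetic can be checked with a short sketch that splits an interval into n-minute chunks. split_interval is a hypothetical helper. The text does not say what happens when m is not a multiple of n, so this sketch assumes a shorter final chunk (that is, k is rounded up).

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


def split_interval(start: datetime, end: datetime,
                   parallel: int) -> List[Tuple[datetime, datetime]]:
    """Split [start, end) into disjoint sub-intervals of `parallel` minutes.

    parallel == 0 means a single task covering the whole interval.
    A final chunk shorter than `parallel` minutes is kept (an assumption).
    """
    if parallel < 0:
        raise ValueError("parallel must be >= 0")
    if parallel == 0:
        return [(start, end)]
    step = timedelta(minutes=parallel)
    chunks = []
    t = start
    while t < end:
        chunks.append((t, min(t + step, end)))
        t += step
    return chunks


start = datetime(2010, 7, 12, 12, 0, tzinfo=timezone.utc)
chunks = split_interval(start, start + timedelta(hours=8), 12)
print(len(chunks))  # 40, i.e. 480 / 12
```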

The set of input columns defines the columns of the stream that will be used by the RTX app. For example, an app running on the Twitter stream may only require two or three of the columns in that stream. The other 30+ columns in that stream will be ignored by the computation.

The set of expression columns in an RTX app defines the new columns that are computed when the app is run. Each expression creates a new column. The order of expressions in an RTX app is not significant: the expressions can appear in any order. Expressions can contain the standard operators found in most programming languages: arithmetic (+, -, *, /, %), boolean logic (and, or, not) and relational (==, !=, <, <=, >, >=). Expressions can also call functions from a library of over 200 built-in RTX functions. An expression column can have an optional GroupBy and/or an optional Window associated with it (see Groups and Windows above). A Window can be either event-based (a Window Frame), specified by a boolean expression, or time-based, specified by an alignment and a duration.
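
Because expression order is not significant, an evaluator has to work out for itself which columns must be computed first. One way to do this, offered here as an assumption rather than a description of the RTX engine, is a topological sort of the column dependencies using Python's graphlib module (Python 3.9+), which also raises graphlib.CycleError for circular definitions. evaluation_order is a hypothetical name, and the notional and ratio columns are illustrative.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def evaluation_order(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Order expression columns so each comes after the columns it reads.

    `dependencies` maps every expression column to the columns it uses;
    names that are not keys (such as input columns) are treated as
    already available. Raises graphlib.CycleError on circular definitions.
    """
    order = TopologicalSorter(dependencies).static_order()
    return [column for column in order if column in dependencies]


# Written in an arbitrary order, as RTX allows.
deps = {
    "ratio": {"notional", "vwap"},
    "vwap": {"price", "volume"},
    "notional": {"price", "volume"},
}
order = evaluation_order(deps)  # "ratio" always comes after "vwap" and "notional"
```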

Example App

Input stream: NASDAQ Last Trade
Start Time: Monday 12th July 2010, 12 noon UTC Time
End Time: Friday 3rd September 2010, 10.30pm UTC Time
Select: first_row
Parallel: 240
Inputs: symbol, price, volume
vwap = sum(price*volume)/sum(volume) [ Window:(10mins, aligned:12noon) ; GroupBy:symbol ]
first_row = window_start [ Window:(10mins, aligned:12noon) ; GroupBy:symbol ]
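
The following Python sketch approximates what the example app computes for a handful of rows: a volume-weighted average price per symbol in 10-minute windows aligned to 12 noon UTC. It assumes that combining the windowed aggregate with Select: first_row yields one output row per symbol per window, carrying the aggregate over the whole window. The row layout and the vwap_by_window name are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, Iterable, Tuple

MINUTE_US = 60 * 1_000_000
WINDOW_US = 10 * MINUTE_US  # Window: 10mins
# aligned:12noon -> boundaries at 12 noon UTC plus multiples of 10 minutes
ALIGN_US = int(datetime(2010, 7, 12, 12, tzinfo=timezone.utc).timestamp()) * 1_000_000

Trade = Tuple[int, str, float, int]  # (timestamp_us, symbol, price, volume)


def vwap_by_window(rows: Iterable[Trade]) -> Dict[Tuple[int, str], float]:
    """Return {(window_start_us, symbol): vwap}, one entry per symbol per window."""
    pv: Dict[Tuple[int, str], float] = defaultdict(float)  # sum(price*volume)
    vol: Dict[Tuple[int, str], int] = defaultdict(int)     # sum(volume)
    for ts, symbol, price, volume in rows:
        window_start = ALIGN_US + (ts - ALIGN_US) // WINDOW_US * WINDOW_US
        key = (window_start, symbol)  # GroupBy:symbol within each window
        pv[key] += price * volume
        vol[key] += volume
    return {key: pv[key] / vol[key] for key in pv if vol[key] > 0}


trades = [
    (ALIGN_US, "GOOG", 10.0, 100),
    (ALIGN_US + 1 * MINUTE_US, "GOOG", 20.0, 300),
    (ALIGN_US + 2 * MINUTE_US, "MSFT", 5.0, 10),
    (ALIGN_US + 11 * MINUTE_US, "GOOG", 30.0, 50),
]
result = vwap_by_window(trades)
print(result[(ALIGN_US, "GOOG")])  # 17.5, i.e. (10*100 + 20*300) / 400
```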

Functions

  • Generic: maximum, minimum
  • Numeric: arithmetic, random, scientific, trigonometric
  • Logical: boolean, conditional, relational
  • Time: duration, timestamp, year, month, day, hour, minute, second
  • String: append, empty, find_char, find_substring, insert_string, regex_match, remove_substring, replace_substring, return_char, score_lucene_query, size
  • Array: find_subarray, find_value, insert_value, remove_value, return_value, set_value
  • Stream: match_pattern, prefix, previous, scan
  • Window: all_true, any_true, bottom_k, end, first, frequency, k_random_rows, last, length, longest_decreasing_subsequence, longest_increasing_subsequence, maximum, minimum, product, rank, reverse, shortest_path, sort, start, sum, top_k, unique_values
  • External Data Structures: deque, hash_map, hash_set, tree_map, tree_set, vector
  • Types: convert_type, is_finite, is_infinite, is_nan, is_zero
  • Defaults: default_value, maximum_value, minimum_value, zero_value 
 
 
Cloudscale: The Realtime Data Warehouse

Alert. Clean. Connect. Encode. Filter. Group. Map. Match. Merge. Partition. Rank. Reduce. Reorder. Sample. Sort. Transform. Validate. Window.