# apama-rxepl
**Repository Path**: mirrors_SoftwareAG/apama-rxepl
## Basic Information
- **Project Name**: apama-rxepl
- **Description**: ReactiveX for Apama EPL Created by Global Presales.
- **Primary Language**: Unknown
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2020-09-26
- **Last Updated**: 2026-03-08
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# RxEPL - Observables in EPL [](https://travis-ci.org/SoftwareAG/apama-rxepl) [](https://coveralls.io/github/SoftwareAG/apama-rxepl?branch=master)
ReactiveX is a framework designed to handle streams of data like water through pipes. RxEPL is a library that implements the framework in EPL ([Apama](http://www.apamacommunity.com/)'s programming language), it is also available in [most](http://reactivex.io/languages.html) major programming languages.
```javascript
IObservable temperatureBreaches :=
Observable.fromChannel("TemperatureSensor") // Get all of the events being sent to this channel
.pluck("temperature") // Get the temperature value
.filter(Lambda.predicate("T => T > 30")); // Filter to only the temperatures we want
// Generate an alert
ISubscription generateAlerts := temperatureBreaches.subscribe(Subscriber.create().onNext(generateAlert));
```
Features:
* Functional Programming
* [Chainable Operators](#operators) (without "callback hell"!)
* [Error handling](#errors)
* ["Easy" multithreading](#multithreading)
For a comprehensive introduction to ReactiveX and Observables see the [ReactiveX Website](http://reactivex.io/intro.html).
If you'd like to try RxEPL we highly recommend starting with the [web-based drag'n'drop tool](https://softwareag.github.io/apama-rxbuilder/) to allow you to create observables without ever touching the code:
## Contents
* [Installation](#install)
* [Quickstart](#quick)
* Main
* [API Documentation](#api-docs)
* [Package Structure](#packages)
* [Examples](#examples)
* [Observable](#observable)
* [Creating](#observable-construction)
* [Subscribing](#subscribing)
* [Operators](#operators)
* [Joining & Combining](#combining)
* [Error Handling](#errors)
* [Publishing/Sharing and Multicasting](#multicasting)
* [Subject](#subject)
* [Types](#subject-types)
* [Creating](#subject-construction)
* [Sending Data](#subject-sending)
* [Interoperability](#interop)
* [Streams](#streams)
* [Channels](#channels)
* [Multithreading](#multithreading)
* [Debugging](#debugging)
* [Reusability: Build your own Operator](#reusability)
* [Gotchas](#gotchas)
* [Multiple Subscribers](#gotcha-multiple-subscribers)
* [Publish/Share with SubscribeOn](#gotcha-subscribe-on)
* [Help and Other Resources](#other)
## Installation
First head over to the [Release Area](https://github.com/SoftwareAG/apama-rxepl/releases) and download the latest release.
The deployment script (Inside the release) provides a way to make ReactiveX for EPL (RxEPL) globally available to all SoftwareAG Designer workspaces.
### 1. Installing into Designer
1. Place the RxEPL folder somewhere safe (somewhere not likely to be moved or deleted)
2. Run the deploy.bat
3. Follow the instructions
4. Restart any running instances of SoftwareAG Designer
### 2. Adding to a Project
1. From SoftwareAG Designer right click on your project in `Project Explorer`
2. Select `Apama` from the drop down menu;
3. Select `Add Bundle`
4. Scroll down to `RX_EPL_HOME/bundles` and select `RxEpl`
5. Click `Ok`
When run via Designer, it will automatically inject all of the dependencies.
### 3. Packaging a project (For use outside Designer)
The Apama tool `engine_deploy` packages a project so that it can be run outside of designer.
1. Start an Apama Command Prompt (Start menu, Software AG, Tools, Apama, Apama Command Prompt)
2. `cd` to your project directory
3. Run `engine_deploy --outputDeployDir output.zip . /rxepl.properties`.
You'll end up with a zip of your entire project.
4. Unzip it on whichever machine you'd like to use the project.
5. Run `correlator --config initialization.yaml --config initialization.properties` from within the unzipped directory to run the project.
## Quickstart
1. First check out the [Installation Instructions](#install), steps 1 & 2.
2. We highly recommend installing [Lambdas for Epl](https://github.com/SoftwareAG/apama-lambdas) package too ([Here's why](#lambdas)).
3. Start Software AG Designer.
4. Copy the following code into a new file: "Main.mon"
```javascript
using com.industry.lambdas.Lambda;
using com.industry.rx_epl.Observable;
using com.industry.rx_epl.IObservable;
using com.industry.rx_epl.Subscriber;
using com.industry.rx_epl.ISubscription;
monitor Main {
action onload() {
// Some example observables sources
IObservable someValues := Observable.fromValues([1, 2, 3]);
IObservable someValuesFromAChannel := Observable.fromChannel("inputChannel");
// Some observable operators
IObservable someModifiedValues := someValues
.map(Lambda.function1("x => x * 10"))
.scan(Lambda.function2("sum, x => sum + x"));
// Some output
ISubscription s := someModifiedValues
.subscribe(Subscriber.create().onNext(logValue));
}
action logValue(any value) {
log value.valueToString();
}
}
```
4. Run the project.
## API Documentation
The [API documentation is available here](docs#api-documentation).
Note: Although the API documentation covers all of the public API, the main README is a better introduction.
## Package Structure
**Interfaces:**
`com.industry.rx_epl.IObservable` - The main observable interface, returned after calling an operator or constructor (See [Observable](#observable))
`com.industry.rx_epl.ISubject` - The subject interface, returned from construction of a subject (see [Subject](#subject))
`com.industry.rx_epl.ISubscription` - Returned by `.subscribe()` allows unsubscription
`com.industry.rx_epl.IDisposable` - Sometimes a listener has to be created that this library doesn't know when to tear down, in that case it returns an `IDisposable` and should be torn down by the user if/when all subscribers are done (see [Multithreading](#multithreading))
`com.industry.rx_epl.IResolver` - Similar to a subject has next, error, complete methods. Used in [Observable.create(...)](#observable-construction)
**Constructors:**
`com.industry.rx_epl.Observable` - The event from which to construct an IObservable (See [Observable](#observable))
`com.industry.rx_epl.Subject` - A simple subject, allowing next, error, complete events to be sent (see [Subject](#subject))
`com.industry.rx_epl.BehaviourSubject` - A subject that always repeats the most recent value to a new subscriber (see [Subject](#subject))
`com.industry.rx_epl.Subscriber` - A subscriber. Defines handling for onNext, onError, onComplete (see [Subscribing](#subscribing))
**Operators:**
`com.industry.rx_epl.operators.*` - All pipeable operators (See [Operators](#operators). The [APIDoc](/docs/README.md#api-documentation) contains a full list)
## Examples
There are several examples that ship with the source code. These are located in the `samples` folder.
## Observable
### Constructing an Observable
Creating an Observable is usually the starting point of an Observable chain (See also: [Subject](#subject))
```javascript
IObservable o := Observable.just("Hello World");
o := Observable.fromValues([ 0, 1, 2, "Hello", "World"]);
o := Observable.range(0,5);
o := Observable.interval(1.0);
o := Observable.fromChannel("MyChannel");
o := Observable.fromStream(from e in all E() select e);
o := Observable.create(generator);
action generator(IResolver r) {
r.next("Hello");
r.complete();
}
```
### Subscribing - Receiving data
Subscribing is the main way to extract information from an observable pipe.
**Important To Note:** Every subscriber gets it's own pipe to the observable source so operators are run once per subscription. To avoid this use [publish or share](#multicasting).
**onNext** - Used to get every data point from the pipe
```javascript
IObservable o := Observable.interval(1.0);
ISubscription s := o.subscribe(Subscriber.create().onNext(printValue));
// Output: 0, 1, 2, 3...
action printValue(any value) {
print value.valueToString();
}
```
**onComplete** - Called when the pipe is closing (not all observables complete)
```javascript
IObservable o := Observable.just("Hello World");
ISubscription s := o.subscribe(Subscriber.create().onNext(printValue).onComplete(printDone));
// Output: Hello World, Done
action printDone() {
print "Done";
}
```
**onError** - Called when the pipe is closing due to an error
```javascript
IObservable o := Observable.error();
ISubscription s := o.subscribe(Subscriber.create().onError(printError));
// Output: com.apama.exceptions.Exception(...)
action printError(any e) {
print e.valueToString();
}
```
### Operators
All of the built-in operators are accessible directly from the IObservable interface:
```javascript
IObservable o := Observable.range(0,20)
.skip(1)
.take(3)
.map(Lambda.function1("x => x * 10"));
ISubscription s := o.subscribe(Subscriber.create().onNext(printValue).onComplete(printDone));
// Output: 1, 2, 3, Done
```
They are also accessible via a "pure function" pipe, and can then be combined with custom operators:
```javascript
IObservable o := Observable.range(0,20)
.let(Skip.create(1)) // Use a single operator
.pipe([ // Chain multiple operators
Take.create(3),
Map.create(Lambda.function1("x => x * 10")),
MyCustomOperator.create(123.4)
]);
ISubscription s := o.subscribe(Subscriber.create().onNext(printValue).onComplete(printDone));
// Output: 1, 2, 3, Done
```
Which you choose to use is up to you.
There are far too many operators to go through every one here, but there is a really handy decision tree in the [external links section](#other) to help you find the one you need. The [APIDoc](/docs/README.md#api-documentation) lists all of the operators.
## Joining and Combining
**Merge** - Combine all values from multiple observables onto one observable
```javascript
IObservable o1 := Observable.interval(1.0);
IObservable o2 := Observable.interval(1.5);
IObservable combined := Observable.merge([o1, o2]);
// Output: 0, 0, 1, 2, 1, 3, 2, 4...
```
**Concat** - Append the values from one observable to the values from another
```javascript
IObservable o1 := Observable.interval(1.0).take(3);
IObservable o2 := Observable.interval(1.5);
IObservable combined := Observable.concat([o1, o2]);
// Output: 0, 1, 2, 0, 1, 2...
```
**CombineLatest** - Every time an event is received combine it with the latest from every other observable
```javascript
IObservable o1 := Observable.interval(1.0);
IObservable o2 := Observable.interval(1.5);
IObservable combined := Observable.combineLatest([o1, o2], toIntSequence);
// Output: [0,0], [1,0], [2,0], [2,1]...
// @Time: 1.0 , 2.0 , 3.0 , 3.0
```
**WithLatestFrom** - Every time an event is receive on the main observable combine it with the latest from every other observable
```javascript
IObservable o1 := Observable.interval(1.0);
IObservable o2 := Observable.interval(1.5);
IObservable combined := o1.withLatestFrom([o2], toIntSequence);
// Output: [1,0], [2,1], [2,1]...
// @Time: 2.0 , 3.0 , 4.0
```
**Zip** - Sequentially combine each event with an event from every other observable
```javascript
IObservable o1 := Observable.interval(1.0);
IObservable o2 := Observable.interval(1.5);
IObservable combined := Observable.zip([o1, o2], toIntSequence);
// Output: [0,0], [1,1], [2,2], [3,3]...
// @Time: 1.5 , 3.0 , 4.5 , 6.0
```
## Error Handling
Observables handle errors in almost exactly the same they handle "complete" events. They are passed along the chain of observers until they are handled. If they are not handled by the final subscriber then the subscriber will throw an exception.
### Subscriber.onError
Subscribing to the error output of an observable allows a subscriber to manually handle errors
```javascript
ISubscription s := o.subscribe(Subscriber.create().onError(printError));
action printError(any e) {
log e.valueToString() at ERROR;
}
```
### IObservable.catchError
Catching errors allows an observable chain to substitute an alternative observable source in the event of an error.
```javascript
IObservable o; // = 0, 1, Error
ISubscription s := o
.catchError(Observable.just("Use this instead"))
.subscribe(Subscriber.create().onNext(printValue));
// Output: 0, 1, Use this instead
```
### IObservable.retry
Retry instructs the observable to reconnect to the source in the event of an error.
```javascript
IObservable hotObservable; // = 0, 1, Error, 2, 3
ISubscription s := hotObservable
.retry(1)
.subscribe(Subscriber.create()
.onNext(printValue)
.onError(printError)
.onComplete(printComplete));
// Output: 0, 1, 2, 3
```
## Interoperability
### Apama Streams
**Receiving from a stream**
```javascript
// Receiving events
IObservable o := Observable.fromStream(from e in all MyEventType() select e);
// Receiving values
using com.industry.rx_epl.WrappedAny;
IObservable o := Observable.fromStream(from e in all WrappedAny() select e.value);
```
**Output to a stream**
```javascript
IObservable o := Observable.interval(1.0);
DisposableStream strm := o.toStream();
from value in strm.getStream() select value {
log value.valueToString() at INFO;
}
// When done, the stream should be disposed
strm.dispose();
```
### Channels
**Receiving from a channel**
```javascript
// Receiving events
IObservable o := Observable.fromChannel("myChannel");
send MyEvent("abc", 123) to "myChannel";
// Values sent in a WrappedAny are automatically unwrapped
using com.industry.rx_epl.WrappedAny;
send WrappedAny("abc") to "myChannel";
```
**Output to a channel**
```javascript
IObservable o := Observable.interval(1.0);
IDisposable d := o.toChannel("myChannel");
```
## Multithreading
Multithreading allows complex or slow processing to be handled asynchronously on a different context.
### observeOn
Observe on is useful when you want the subscription and some of the processing to be done on a different thread. The connection to the original observable is still done on the main context and all data is forwarded to the other context.
```javascript
// Ideally should dispose of this when the spawned context is done processing (if ever)
IDisposable d := Observable.interval(1.0).observeOnNew(doSomething);
action doSomething(IObservable source) {
// This part will run on a different context
ISubscription s := source.take(4)
.subscribe(Subscriber.create().onNext(printValue));
}
// Output from "A specific context": 0, 1, 2, 3
```
### observeToChannel, observeFromChannel
ObserveToChannel and ObserveFromChannel are useful for sending data between different monitor instances which may or may not be on different contexts.
```javascript
// Ideally should dispose of this when all subscribers are finished (if ever)
IDisposable d := Observable.interval(1.0).observeToChannel("channelName");
// This could be in a different monitor
ISubscription s := Observable.observeFromChannel("channelName")
.subscribe(Subscriber.create().onNext(printValue));
// Output: 0, 1, 2, 3
```
### pipeOn
PipeOn allows part of the processing in a chain to be done on a different context. Data is received on the main context, forwarded to another context for processing, and finally forwarded back to the main context for subscription.
```javascript
// Values start on the main context
ISubscription s := Observable.interval(1.0)
// Send to a different context to do the heavy lifting
.pipeOnNew([Map.create(multiplyBy10)]);
// Back on the main context for the output
.subscribe(Subscriber.create().onNext(printValue));
// Output: 0, 10, 20, 30...
```
### complexPipeOn
Much the same as PipeOn, ComplexPipeOn allows part of an observable chain to be processed on a separate context.
```javascript
// Values start on the main context
ISubscription s := Observable.interval(1.0)
// Send to a different context to do the heavy lifting
.complexPipeOnNew(doSomething);
// Back on the main context for the output
.subscribe(Subscriber.create().onNext(printValue));
action doSomething(IObservable o) returns IObservable {
return o.map(multiplyBy10)
}
// Output: 0, 10, 20, 30...
```
### subscribeOn
SubscribeOn will move an entire chain (from source to subscription) onto another context. There are a couple of [Gotchas](#gotchas) with this when used with publishing and sharing. Generally it is recommend to manually spawn a new context manually rather than rely on this.
```javascript
ISubscription s := Observable.interval(1.0)
.map(multiplyBy10)
// Move all processing to a different context
// (including the .map and the observable source)
.subscribeOnNew(Subscriber.create()
.onNext(printValue));
// Output from "A specific context": 0, 10, 20, 30...
```
## Publishing and Sharing
Publishing and Sharing are methods of converting an Observable a multicast emitter.
By default when a normal observable is subscribed to an entirely new processing chain is created (one for each subscriber), this can result in some unexpected behaviour:
```javascript
IObservable o := Observable.interval(1.0);
ISubscription s := o.subscribe(Subscriber.create().onNext(printValue));
// Output: 0, 1, 2, 3...
on wait(2.0) {
ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue));
// Output: 0, 1, 2, 3... What?!
}
```
The first subscriber creates it's own interval, and so does the second one. These are not linked.
### Publish
Publish allows a single observable subscription to be shared among various other subscribers. The upstream subscription is only created when connect is called so downstreams will not receive values until then.
```javascript
IObservable o := Observable.interval(1.0).publish();
ISubscription s := o.subscribe(Subscriber.create().onNext(printValue));
IDisposable d := o.connect();
// Output: 0, 1, 2, 3...
on wait(2.0) {
ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue));
// Output: 2, 3... Connected 2s after .connect() was called so missed 2 values
}
```
```javascript
IObservable o := Observable.interval(1.0).publish();
ISubscription s := o.subscribe(Subscriber.create().onNext(printValue));
// Output: 0, 1, 2, 3...
on wait(2.0) {
ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue));
// Output: 0, 1, 2, 3...
IDisposable d := o.connect();
}
```
### Publish RefCount
Manually connecting to a published observable can be a pain. RefCount will automatically connect to a published observable when the first subscriber subscribes. It keeps a count of the number of subscribers so that when the count drops to zero it can disconnect. A subscription after disconnection will result in a re-connection.
publish().refCount() is so common that it has a shorthand [.share()](#share)
```javascript
IObservable o := Observable.interval(1.0).publish().refCount(); // can be replaced with .share()
ISubscription s := o.subscribe(Subscriber.create().onNext(printValue));
// Output: 0, 1, 2, 3...
on wait(2.0) {
ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue));
// Output: 2, 3...
}
```
### Share
Share is a shorthand for [.publish().refCount()](#refcount).
```javascript
IObservable o := Observable.interval(1.0).share();
ISubscription s := o.subscribe(Subscriber.create().onNext(printValue));
// Output: 0, 1, 2, 3...
on wait(2.0) {
ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue));
// Output: 2, 3...
}
```
## Subject
Subjects are just like channels in EPL in that they are a multicast way to send messages to all subscribers. Usually any late subscribers miss the messages. There are a few different types of subject that have various different behaviours.
### Types
**Subject** - A normal subject, sends messages to all subscribers
**BehaviourSubject** - A subject that always has a current value and sends this to new subscribers.
**ReplaySubject** - Replays a certain number of missed values to new subscribers.
### Constructing a Subject
Creating a Subject is an alternative way to start an Observable chain (See also: [Observable](#observable))
```javascript
ISubject s := Subject.create();
s := BehaviourSubject.create("InitialValue");
s := ReplaySubject.create(3);
```
### Sending Data
**next(any value)** - Send the next value to be processed
```javascript
ISubject subject := Subject.create();
ISubscription s := s.subscribe(Subscriber.create().onNext(printValue));
subject.next(1);
// Output: 1
```
**complete()** - Close the pipe
```javascript
ISubject subject := Subject.create();
ISubscription s := s.subscribe(Subscriber.create().onNext(printValue).onComplete(printDone));
subject.next(1);
subject.complete();
// Output: 1, Done
```
**error(any error)** - Send an error and close the pipe
```javascript
ISubject subject := Subject.create();
ISubscription s := s.subscribe(Subscriber.create().onError(printError));
subject.error("An error occurred");
// Output: An error occurred
```
## Debugging
### do
Do is a handy operator that allows you to inspect the values as they pass through a chain of observables. It is like a subscriber except that it is passive, only receiving messages when someone is subscribed further down the chain.
```javascript
IObservable o := Observable.interval(1.0);
ISubscription s := o.take(5);
.do(LoggingSubscriber.create("BeforeMultiplication"));
.map(multiplyBy10)
.subscribe(LoggingSubscriber.create("Result"));
// Output from Do: 0, 1, 2, 3, 4, Completed, Unsubscribed
// Output from Subscribe: 0, 10, 20, 30, 40, Completed, Unsubscribed
```
## Reusability: Build your own Observable Operator
Lets say you have the following:
```javascript
observable
.pluck("temperature")
.filter(aboveThreshold);
```
but you want it to be reusable.
There are several options:
### Option 1: Wrap into an action - Good
```javascript
action tempTooHigh(IObservable source) returns IObservable {
return observable
.pluck("temperature")
.filter(aboveThreshold);
}
```
You might be tempted just to call that action eg. `tempTooHigh(myObservableTemps)`, but **don't**, there's a better way:
```javascript
IObservable o := myObservableTemps
.complexPipe(tempTooHigh);
```
Why is this better? It is much more obvious what order the chain is processing in.
### Option 2: Convert to pipe - Better
```javascript
sequence returns ISubscription> > returns
action returns ISubscription>
tempTooHigh := [Pluck.create("temperature"), Filter.create(outsideThreshold)];
IObservable o := myObservableTemps
.pipe(tempTooHigh);
```
### Option 3: Create a custom pipe - Best
```javascript
event TempTooHigh {
static action create() returns action returns ISubscription>
returns action returns ISubscription {
return Pipe.create([
Pluck.create("temperature"),
Filter.create(outsideThreshold)
]);
}
}
IObservable o := myObservableTemps
.pipe([TempTooHigh.create()]);
// or
IObservable o := myObservableTemps
.let(TempTooHigh.create());
```
## Gotchas
### Multiple Subscribers
```javascript
IObservable sharedObs := Observable.interval(1.0);
ISubscription s1 := sharedObs.subscribe(Subscriber.create().onNext(printValue));
// Output: 0,1,2,3...
on wait(2.0) {
ISubscription s2 := sharedObs.subscribe(Subscriber.create().onNext(printValue));
// Output: 0,1,2,3... What?
}
```
**Why?**
Each subscriber creates its own chain through to the source. In this case that means that each subscriber get it's own interval.
**Solution**
Use [Share](#multicasting)
```javascript
IObservable sharedObs := Observable.interval(1.0).share();
ISubscription s1 := sharedObs.subscribe(Subscriber.create().onNext(printValue));
// Output: 0,1,2,3...
on wait(2.0) {
ISubscription s2 := sharedObs.subscribe(Subscriber.create().onNext(printValue));
// Output: 2,3...
}
```
### Publish/Share and SubscribeOn
```javascript
IObservable sharedObs := Observable.interval(1.0).share();
ISubscription s1 := sharedObs.subscribe(Subscriber.create().onNext(printValue));
ISubscription s2 := sharedObs.subscribeOnNew(Subscriber.create().onNext(printValue));
// Output on "Main Context": 0,1,2,3...
// Output on "New Context": Nothing.... What?!
```
**Why?**
Publish and Share both store information about their current subscribers and upstream connections. When the entire chain is moved onto another context this information is copied and the operators no longer function correctly.
***Solution*** - `.decouple()`
```javascript
IObservable sharedObs := Observable.interval(1.0)
.share();
ISubscription s1 := sharedObs.subscribe(Subscriber.create().onNext(printValue));
ISubscription s2 := sharedObs.decouple() // Note: observeOn may be a better solution
.subscribeOnNew(Subscriber.create().onNext(printValue));
// Output on "Main Context": 0,1,2,3...
// Output on "New Context": 0,1,2,3...
```
This helps by 'decoupling' the upstream and the downstream. When subscribeOn is called only the downstream is copied. This means that only part of the chain is running on a separate context. A better solution might be to use ObserveOn instead of subscribeOn.
## EPL Lambdas
Although this library can be used as a standalone library, we highly recommend that it is used in conjunction with the [Lambdas for Apama EPL](https://github.com/SoftwareAG/apama-lambdas) library.
It allows you to turn this:
```javascript
IObservable o := Observable.fromValues([1,2,3,4])
.map(multiplyBy10)
.reduce(sum);
action multiplyBy10(any value) returns any { // Will only work on integers
return value * 10;
}
action sum(any currentSum, any value) returns any { // Will only work on integers
return currentSum + value;
}
```
Into this:
```javascript
IObservable o := Observable.fromValues([1,2,3,4])
.map(Lambda.function1("x => x * 10")) // Will work on any numeric type
.reduce(Lambda.function2("sum, x => sum + x")); // Will work on any numeric type
```
There are many added benefits to using Lambdas, one of the main advantages being automatic type coercion. You'll never have to cast a numeric value - they're automatically converted to an appropriate type.
## Help and Other Resources
**[ReactiveX Website](http://reactivex.io/)** - A great place to get info about the background to the framework.\
**[Decision Tree Of Observables](http://reactivex.io/documentation/operators.html#tree)** - Don't know which operator to use? Follow this\
**[RxMarbles](http://rxmarbles.com/)** - An interactive tool to play with observable operators
------------------------------
These tools are provided as-is and without warranty or support. They do not constitute part of the Software AG product suite. Users are free to use, fork and modify them, subject to the license agreement. While Software AG welcomes contributions, we cannot guarantee to include every contribution in the master project.