# apama-rxepl **Repository Path**: mirrors_SoftwareAG/apama-rxepl ## Basic Information - **Project Name**: apama-rxepl - **Description**: ReactiveX for Apama EPL Created by Global Presales. - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2020-09-26 - **Last Updated**: 2026-03-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RxEPL - Observables in EPL [![Build Status](https://travis-ci.org/SoftwareAG/apama-rxepl.svg?branch=master)](https://travis-ci.org/SoftwareAG/apama-rxepl) [![Coverage Status](https://coveralls.io/repos/github/SoftwareAG/apama-rxepl/badge.svg?branch=master)](https://coveralls.io/github/SoftwareAG/apama-rxepl?branch=master) ReactiveX is a framework designed to handle streams of data like water through pipes. RxEPL is a library that implements the framework in EPL ([Apama](http://www.apamacommunity.com/)'s programming language), it is also available in [most](http://reactivex.io/languages.html) major programming languages. ```javascript IObservable temperatureBreaches := Observable.fromChannel("TemperatureSensor") // Get all of the events being sent to this channel .pluck("temperature") // Get the temperature value .filter(Lambda.predicate("T => T > 30")); // Filter to only the temperatures we want // Generate an alert ISubscription generateAlerts := temperatureBreaches.subscribe(Subscriber.create().onNext(generateAlert)); ``` Features: * Functional Programming * [Chainable Operators](#operators) (without "callback hell"!) * [Error handling](#errors) * ["Easy" multithreading](#multithreading) For a comprehensive introduction to ReactiveX and Observables see the [ReactiveX Website](http://reactivex.io/intro.html). If you'd like to try RxEPL we highly recommend starting with the [web-based drag'n'drop tool](https://softwareag.github.io/apama-rxbuilder/) to allow you to create observables without ever touching the code: ## Contents * [Installation](#install) * [Quickstart](#quick) * Main * [API Documentation](#api-docs) * [Package Structure](#packages) * [Examples](#examples) * [Observable](#observable) * [Creating](#observable-construction) * [Subscribing](#subscribing) * [Operators](#operators) * [Joining & Combining](#combining) * [Error Handling](#errors) * [Publishing/Sharing and Multicasting](#multicasting) * [Subject](#subject) * [Types](#subject-types) * [Creating](#subject-construction) * [Sending Data](#subject-sending) * [Interoperability](#interop) * [Streams](#streams) * [Channels](#channels) * [Multithreading](#multithreading) * [Debugging](#debugging) * [Reusability: Build your own Operator](#reusability) * [Gotchas](#gotchas) * [Multiple Subscribers](#gotcha-multiple-subscribers) * [Publish/Share with SubscribeOn](#gotcha-subscribe-on) * [Help and Other Resources](#other) ## Installation First head over to the [Release Area](https://github.com/SoftwareAG/apama-rxepl/releases) and download the latest release. The deployment script (Inside the release) provides a way to make ReactiveX for EPL (RxEPL) globally available to all SoftwareAG Designer workspaces. ### 1. Installing into Designer 1. Place the RxEPL folder somewhere safe (somewhere not likely to be moved or deleted) 2. Run the deploy.bat 3. Follow the instructions 4. Restart any running instances of SoftwareAG Designer ### 2. Adding to a Project 1. From SoftwareAG Designer right click on your project in `Project Explorer` 2. Select `Apama` from the drop down menu; 3. Select `Add Bundle` 4. Scroll down to `RX_EPL_HOME/bundles` and select `RxEpl` 5. Click `Ok` When run via Designer, it will automatically inject all of the dependencies. ### 3. Packaging a project (For use outside Designer) The Apama tool `engine_deploy` packages a project so that it can be run outside of designer. 1. Start an Apama Command Prompt (Start menu, Software AG, Tools, Apama, Apama Command Prompt) 2. `cd` to your project directory 3. Run `engine_deploy --outputDeployDir output.zip . /rxepl.properties`. You'll end up with a zip of your entire project. 4. Unzip it on whichever machine you'd like to use the project. 5. Run `correlator --config initialization.yaml --config initialization.properties` from within the unzipped directory to run the project. ## Quickstart 1. First check out the [Installation Instructions](#install), steps 1 & 2. 2. We highly recommend installing [Lambdas for Epl](https://github.com/SoftwareAG/apama-lambdas) package too ([Here's why](#lambdas)). 3. Start Software AG Designer. 4. Copy the following code into a new file: "Main.mon" ```javascript using com.industry.lambdas.Lambda; using com.industry.rx_epl.Observable; using com.industry.rx_epl.IObservable; using com.industry.rx_epl.Subscriber; using com.industry.rx_epl.ISubscription; monitor Main { action onload() { // Some example observables sources IObservable someValues := Observable.fromValues([1, 2, 3]); IObservable someValuesFromAChannel := Observable.fromChannel("inputChannel"); // Some observable operators IObservable someModifiedValues := someValues .map(Lambda.function1("x => x * 10")) .scan(Lambda.function2("sum, x => sum + x")); // Some output ISubscription s := someModifiedValues .subscribe(Subscriber.create().onNext(logValue)); } action logValue(any value) { log value.valueToString(); } } ``` 4. Run the project. ## API Documentation The [API documentation is available here](docs#api-documentation). Note: Although the API documentation covers all of the public API, the main README is a better introduction. ## Package Structure **Interfaces:**
`com.industry.rx_epl.IObservable` - The main observable interface, returned after calling an operator or constructor (See [Observable](#observable))
`com.industry.rx_epl.ISubject` - The subject interface, returned from construction of a subject (see [Subject](#subject))
`com.industry.rx_epl.ISubscription` - Returned by `.subscribe()` allows unsubscription
`com.industry.rx_epl.IDisposable` - Sometimes a listener has to be created that this library doesn't know when to tear down, in that case it returns an `IDisposable` and should be torn down by the user if/when all subscribers are done (see [Multithreading](#multithreading))
`com.industry.rx_epl.IResolver` - Similar to a subject has next, error, complete methods. Used in [Observable.create(...)](#observable-construction)
**Constructors:**
`com.industry.rx_epl.Observable` - The event from which to construct an IObservable (See [Observable](#observable))
`com.industry.rx_epl.Subject` - A simple subject, allowing next, error, complete events to be sent (see [Subject](#subject))
`com.industry.rx_epl.BehaviourSubject` - A subject that always repeats the most recent value to a new subscriber (see [Subject](#subject))
`com.industry.rx_epl.Subscriber` - A subscriber. Defines handling for onNext, onError, onComplete (see [Subscribing](#subscribing))
**Operators:**
`com.industry.rx_epl.operators.*` - All pipeable operators (See [Operators](#operators). The [APIDoc](/docs/README.md#api-documentation) contains a full list)
## Examples There are several examples that ship with the source code. These are located in the `samples` folder. ## Observable ### Constructing an Observable Creating an Observable is usually the starting point of an Observable chain (See also: [Subject](#subject)) ```javascript IObservable o := Observable.just("Hello World"); o := Observable.fromValues([ 0, 1, 2, "Hello", "World"]); o := Observable.range(0,5); o := Observable.interval(1.0); o := Observable.fromChannel("MyChannel"); o := Observable.fromStream(from e in all E() select e); o := Observable.create(generator); action generator(IResolver r) { r.next("Hello"); r.complete(); } ``` ### Subscribing - Receiving data Subscribing is the main way to extract information from an observable pipe. **Important To Note:** Every subscriber gets it's own pipe to the observable source so operators are run once per subscription. To avoid this use [publish or share](#multicasting). **onNext** - Used to get every data point from the pipe ```javascript IObservable o := Observable.interval(1.0); ISubscription s := o.subscribe(Subscriber.create().onNext(printValue)); // Output: 0, 1, 2, 3... action printValue(any value) { print value.valueToString(); } ``` **onComplete** - Called when the pipe is closing (not all observables complete) ```javascript IObservable o := Observable.just("Hello World"); ISubscription s := o.subscribe(Subscriber.create().onNext(printValue).onComplete(printDone)); // Output: Hello World, Done action printDone() { print "Done"; } ``` **onError** - Called when the pipe is closing due to an error ```javascript IObservable o := Observable.error(); ISubscription s := o.subscribe(Subscriber.create().onError(printError)); // Output: com.apama.exceptions.Exception(...) action printError(any e) { print e.valueToString(); } ``` ### Operators All of the built-in operators are accessible directly from the IObservable interface: ```javascript IObservable o := Observable.range(0,20) .skip(1) .take(3) .map(Lambda.function1("x => x * 10")); ISubscription s := o.subscribe(Subscriber.create().onNext(printValue).onComplete(printDone)); // Output: 1, 2, 3, Done ``` They are also accessible via a "pure function" pipe, and can then be combined with custom operators: ```javascript IObservable o := Observable.range(0,20) .let(Skip.create(1)) // Use a single operator .pipe([ // Chain multiple operators Take.create(3), Map.create(Lambda.function1("x => x * 10")), MyCustomOperator.create(123.4) ]); ISubscription s := o.subscribe(Subscriber.create().onNext(printValue).onComplete(printDone)); // Output: 1, 2, 3, Done ``` Which you choose to use is up to you. There are far too many operators to go through every one here, but there is a really handy decision tree in the [external links section](#other) to help you find the one you need. The [APIDoc](/docs/README.md#api-documentation) lists all of the operators. ## Joining and Combining **Merge** - Combine all values from multiple observables onto one observable ```javascript IObservable o1 := Observable.interval(1.0); IObservable o2 := Observable.interval(1.5); IObservable combined := Observable.merge([o1, o2]); // Output: 0, 0, 1, 2, 1, 3, 2, 4... ``` **Concat** - Append the values from one observable to the values from another ```javascript IObservable o1 := Observable.interval(1.0).take(3); IObservable o2 := Observable.interval(1.5); IObservable combined := Observable.concat([o1, o2]); // Output: 0, 1, 2, 0, 1, 2... ``` **CombineLatest** - Every time an event is received combine it with the latest from every other observable ```javascript IObservable o1 := Observable.interval(1.0); IObservable o2 := Observable.interval(1.5); IObservable combined := Observable.combineLatest([o1, o2], toIntSequence); // Output: [0,0], [1,0], [2,0], [2,1]... // @Time: 1.0 , 2.0 , 3.0 , 3.0 ``` **WithLatestFrom** - Every time an event is receive on the main observable combine it with the latest from every other observable ```javascript IObservable o1 := Observable.interval(1.0); IObservable o2 := Observable.interval(1.5); IObservable combined := o1.withLatestFrom([o2], toIntSequence); // Output: [1,0], [2,1], [2,1]... // @Time: 2.0 , 3.0 , 4.0 ``` **Zip** - Sequentially combine each event with an event from every other observable ```javascript IObservable o1 := Observable.interval(1.0); IObservable o2 := Observable.interval(1.5); IObservable combined := Observable.zip([o1, o2], toIntSequence); // Output: [0,0], [1,1], [2,2], [3,3]... // @Time: 1.5 , 3.0 , 4.5 , 6.0 ``` ## Error Handling Observables handle errors in almost exactly the same they handle "complete" events. They are passed along the chain of observers until they are handled. If they are not handled by the final subscriber then the subscriber will throw an exception. ### Subscriber.onError Subscribing to the error output of an observable allows a subscriber to manually handle errors ```javascript ISubscription s := o.subscribe(Subscriber.create().onError(printError)); action printError(any e) { log e.valueToString() at ERROR; } ``` ### IObservable.catchError Catching errors allows an observable chain to substitute an alternative observable source in the event of an error. ```javascript IObservable o; // = 0, 1, Error ISubscription s := o .catchError(Observable.just("Use this instead")) .subscribe(Subscriber.create().onNext(printValue)); // Output: 0, 1, Use this instead ``` ### IObservable.retry Retry instructs the observable to reconnect to the source in the event of an error. ```javascript IObservable hotObservable; // = 0, 1, Error, 2, 3 ISubscription s := hotObservable .retry(1) .subscribe(Subscriber.create() .onNext(printValue) .onError(printError) .onComplete(printComplete)); // Output: 0, 1, 2, 3 ``` ## Interoperability ### Apama Streams **Receiving from a stream** ```javascript // Receiving events IObservable o := Observable.fromStream(from e in all MyEventType() select e); // Receiving values using com.industry.rx_epl.WrappedAny; IObservable o := Observable.fromStream(from e in all WrappedAny() select e.value); ``` **Output to a stream** ```javascript IObservable o := Observable.interval(1.0); DisposableStream strm := o.toStream(); from value in strm.getStream() select value { log value.valueToString() at INFO; } // When done, the stream should be disposed strm.dispose(); ``` ### Channels **Receiving from a channel** ```javascript // Receiving events IObservable o := Observable.fromChannel("myChannel"); send MyEvent("abc", 123) to "myChannel"; // Values sent in a WrappedAny are automatically unwrapped using com.industry.rx_epl.WrappedAny; send WrappedAny("abc") to "myChannel"; ``` **Output to a channel** ```javascript IObservable o := Observable.interval(1.0); IDisposable d := o.toChannel("myChannel"); ``` ## Multithreading Multithreading allows complex or slow processing to be handled asynchronously on a different context. ### observeOn Observe on is useful when you want the subscription and some of the processing to be done on a different thread. The connection to the original observable is still done on the main context and all data is forwarded to the other context. ```javascript // Ideally should dispose of this when the spawned context is done processing (if ever) IDisposable d := Observable.interval(1.0).observeOnNew(doSomething); action doSomething(IObservable source) { // This part will run on a different context ISubscription s := source.take(4) .subscribe(Subscriber.create().onNext(printValue)); } // Output from "A specific context": 0, 1, 2, 3 ``` ### observeToChannel, observeFromChannel ObserveToChannel and ObserveFromChannel are useful for sending data between different monitor instances which may or may not be on different contexts. ```javascript // Ideally should dispose of this when all subscribers are finished (if ever) IDisposable d := Observable.interval(1.0).observeToChannel("channelName"); // This could be in a different monitor ISubscription s := Observable.observeFromChannel("channelName") .subscribe(Subscriber.create().onNext(printValue)); // Output: 0, 1, 2, 3 ``` ### pipeOn PipeOn allows part of the processing in a chain to be done on a different context. Data is received on the main context, forwarded to another context for processing, and finally forwarded back to the main context for subscription. ```javascript // Values start on the main context ISubscription s := Observable.interval(1.0) // Send to a different context to do the heavy lifting .pipeOnNew([Map.create(multiplyBy10)]); // Back on the main context for the output .subscribe(Subscriber.create().onNext(printValue)); // Output: 0, 10, 20, 30... ``` ### complexPipeOn Much the same as PipeOn, ComplexPipeOn allows part of an observable chain to be processed on a separate context. ```javascript // Values start on the main context ISubscription s := Observable.interval(1.0) // Send to a different context to do the heavy lifting .complexPipeOnNew(doSomething); // Back on the main context for the output .subscribe(Subscriber.create().onNext(printValue)); action doSomething(IObservable o) returns IObservable { return o.map(multiplyBy10) } // Output: 0, 10, 20, 30... ``` ### subscribeOn SubscribeOn will move an entire chain (from source to subscription) onto another context. There are a couple of [Gotchas](#gotchas) with this when used with publishing and sharing. Generally it is recommend to manually spawn a new context manually rather than rely on this. ```javascript ISubscription s := Observable.interval(1.0) .map(multiplyBy10) // Move all processing to a different context // (including the .map and the observable source) .subscribeOnNew(Subscriber.create() .onNext(printValue)); // Output from "A specific context": 0, 10, 20, 30... ``` ## Publishing and Sharing Publishing and Sharing are methods of converting an Observable a multicast emitter. By default when a normal observable is subscribed to an entirely new processing chain is created (one for each subscriber), this can result in some unexpected behaviour: ```javascript IObservable o := Observable.interval(1.0); ISubscription s := o.subscribe(Subscriber.create().onNext(printValue)); // Output: 0, 1, 2, 3... on wait(2.0) { ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue)); // Output: 0, 1, 2, 3... What?! } ``` The first subscriber creates it's own interval, and so does the second one. These are not linked. ### Publish Publish allows a single observable subscription to be shared among various other subscribers. The upstream subscription is only created when connect is called so downstreams will not receive values until then. ```javascript IObservable o := Observable.interval(1.0).publish(); ISubscription s := o.subscribe(Subscriber.create().onNext(printValue)); IDisposable d := o.connect(); // Output: 0, 1, 2, 3... on wait(2.0) { ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue)); // Output: 2, 3... Connected 2s after .connect() was called so missed 2 values } ``` ```javascript IObservable o := Observable.interval(1.0).publish(); ISubscription s := o.subscribe(Subscriber.create().onNext(printValue)); // Output: 0, 1, 2, 3... on wait(2.0) { ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue)); // Output: 0, 1, 2, 3... IDisposable d := o.connect(); } ``` ### Publish RefCount Manually connecting to a published observable can be a pain. RefCount will automatically connect to a published observable when the first subscriber subscribes. It keeps a count of the number of subscribers so that when the count drops to zero it can disconnect. A subscription after disconnection will result in a re-connection. publish().refCount() is so common that it has a shorthand [.share()](#share) ```javascript IObservable o := Observable.interval(1.0).publish().refCount(); // can be replaced with .share() ISubscription s := o.subscribe(Subscriber.create().onNext(printValue)); // Output: 0, 1, 2, 3... on wait(2.0) { ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue)); // Output: 2, 3... } ``` ### Share Share is a shorthand for [.publish().refCount()](#refcount). ```javascript IObservable o := Observable.interval(1.0).share(); ISubscription s := o.subscribe(Subscriber.create().onNext(printValue)); // Output: 0, 1, 2, 3... on wait(2.0) { ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue)); // Output: 2, 3... } ``` ## Subject Subjects are just like channels in EPL in that they are a multicast way to send messages to all subscribers. Usually any late subscribers miss the messages. There are a few different types of subject that have various different behaviours. ### Types **Subject** - A normal subject, sends messages to all subscribers **BehaviourSubject** - A subject that always has a current value and sends this to new subscribers. **ReplaySubject** - Replays a certain number of missed values to new subscribers. ### Constructing a Subject Creating a Subject is an alternative way to start an Observable chain (See also: [Observable](#observable)) ```javascript ISubject s := Subject.create(); s := BehaviourSubject.create("InitialValue"); s := ReplaySubject.create(3); ``` ### Sending Data **next(any value)** - Send the next value to be processed ```javascript ISubject subject := Subject.create(); ISubscription s := s.subscribe(Subscriber.create().onNext(printValue)); subject.next(1); // Output: 1 ``` **complete()** - Close the pipe ```javascript ISubject subject := Subject.create(); ISubscription s := s.subscribe(Subscriber.create().onNext(printValue).onComplete(printDone)); subject.next(1); subject.complete(); // Output: 1, Done ``` **error(any error)** - Send an error and close the pipe ```javascript ISubject subject := Subject.create(); ISubscription s := s.subscribe(Subscriber.create().onError(printError)); subject.error("An error occurred"); // Output: An error occurred ``` ## Debugging ### do Do is a handy operator that allows you to inspect the values as they pass through a chain of observables. It is like a subscriber except that it is passive, only receiving messages when someone is subscribed further down the chain. ```javascript IObservable o := Observable.interval(1.0); ISubscription s := o.take(5); .do(LoggingSubscriber.create("BeforeMultiplication")); .map(multiplyBy10) .subscribe(LoggingSubscriber.create("Result")); // Output from Do: 0, 1, 2, 3, 4, Completed, Unsubscribed // Output from Subscribe: 0, 10, 20, 30, 40, Completed, Unsubscribed ``` ## Reusability: Build your own Observable Operator Lets say you have the following: ```javascript observable .pluck("temperature") .filter(aboveThreshold); ``` but you want it to be reusable. There are several options: ### Option 1: Wrap into an action - Good ```javascript action tempTooHigh(IObservable source) returns IObservable { return observable .pluck("temperature") .filter(aboveThreshold); } ``` You might be tempted just to call that action eg. `tempTooHigh(myObservableTemps)`, but **don't**, there's a better way: ```javascript IObservable o := myObservableTemps .complexPipe(tempTooHigh); ``` Why is this better? It is much more obvious what order the chain is processing in. ### Option 2: Convert to pipe - Better ```javascript sequence returns ISubscription> > returns action returns ISubscription> tempTooHigh := [Pluck.create("temperature"), Filter.create(outsideThreshold)]; IObservable o := myObservableTemps .pipe(tempTooHigh); ``` ### Option 3: Create a custom pipe - Best ```javascript event TempTooHigh { static action create() returns action returns ISubscription> returns action returns ISubscription { return Pipe.create([ Pluck.create("temperature"), Filter.create(outsideThreshold) ]); } } IObservable o := myObservableTemps .pipe([TempTooHigh.create()]); // or IObservable o := myObservableTemps .let(TempTooHigh.create()); ``` ## Gotchas ### Multiple Subscribers ```javascript IObservable sharedObs := Observable.interval(1.0); ISubscription s1 := sharedObs.subscribe(Subscriber.create().onNext(printValue)); // Output: 0,1,2,3... on wait(2.0) { ISubscription s2 := sharedObs.subscribe(Subscriber.create().onNext(printValue)); // Output: 0,1,2,3... What? } ``` **Why?** Each subscriber creates its own chain through to the source. In this case that means that each subscriber get it's own interval. **Solution** Use [Share](#multicasting) ```javascript IObservable sharedObs := Observable.interval(1.0).share(); ISubscription s1 := sharedObs.subscribe(Subscriber.create().onNext(printValue)); // Output: 0,1,2,3... on wait(2.0) { ISubscription s2 := sharedObs.subscribe(Subscriber.create().onNext(printValue)); // Output: 2,3... } ``` ### Publish/Share and SubscribeOn ```javascript IObservable sharedObs := Observable.interval(1.0).share(); ISubscription s1 := sharedObs.subscribe(Subscriber.create().onNext(printValue)); ISubscription s2 := sharedObs.subscribeOnNew(Subscriber.create().onNext(printValue)); // Output on "Main Context": 0,1,2,3... // Output on "New Context": Nothing.... What?! ``` **Why?** Publish and Share both store information about their current subscribers and upstream connections. When the entire chain is moved onto another context this information is copied and the operators no longer function correctly. ***Solution*** - `.decouple()` ```javascript IObservable sharedObs := Observable.interval(1.0) .share(); ISubscription s1 := sharedObs.subscribe(Subscriber.create().onNext(printValue)); ISubscription s2 := sharedObs.decouple() // Note: observeOn may be a better solution .subscribeOnNew(Subscriber.create().onNext(printValue)); // Output on "Main Context": 0,1,2,3... // Output on "New Context": 0,1,2,3... ``` This helps by 'decoupling' the upstream and the downstream. When subscribeOn is called only the downstream is copied. This means that only part of the chain is running on a separate context. A better solution might be to use ObserveOn instead of subscribeOn. ## EPL Lambdas Although this library can be used as a standalone library, we highly recommend that it is used in conjunction with the [Lambdas for Apama EPL](https://github.com/SoftwareAG/apama-lambdas) library. It allows you to turn this: ```javascript IObservable o := Observable.fromValues([1,2,3,4]) .map(multiplyBy10) .reduce(sum); action multiplyBy10(any value) returns any { // Will only work on integers return value * 10; } action sum(any currentSum, any value) returns any { // Will only work on integers return currentSum + value; } ``` Into this: ```javascript IObservable o := Observable.fromValues([1,2,3,4]) .map(Lambda.function1("x => x * 10")) // Will work on any numeric type .reduce(Lambda.function2("sum, x => sum + x")); // Will work on any numeric type ``` There are many added benefits to using Lambdas, one of the main advantages being automatic type coercion. You'll never have to cast a numeric value - they're automatically converted to an appropriate type. ## Help and Other Resources **[ReactiveX Website](http://reactivex.io/)** - A great place to get info about the background to the framework.\ **[Decision Tree Of Observables](http://reactivex.io/documentation/operators.html#tree)** - Don't know which operator to use? Follow this\ **[RxMarbles](http://rxmarbles.com/)** - An interactive tool to play with observable operators ------------------------------ These tools are provided as-is and without warranty or support. They do not constitute part of the Software AG product suite. Users are free to use, fork and modify them, subject to the license agreement. While Software AG welcomes contributions, we cannot guarantee to include every contribution in the master project.