zk-and-rx"
Robertwenzel (talk | contribs) m (→Architecture) |
m (correct highlight (via JWB)) |
||
(100 intermediate revisions by 2 users not shown) | |||
Line 4: | Line 4: | ||
|version=ZK 8.5 | |version=ZK 8.5 | ||
}} | }} | ||
+ | |||
+ | =Potential Titles= | ||
+ | |||
+ | '''Deal with hot RX observables in a UI''' | ||
+ | |||
+ | '''Tame talkative Event Sources''' | ||
+ | |||
+ | '''Backend! Shut up! - dealing with real-time event streams''' | ||
= Introduction = | = Introduction = | ||
+ | [[File:zkrx-banner.png|910px]] | ||
+ | |||
+ | '''TL;DR''': This article is about integrating an RxJava Observable with a ZK application. The focus lies on limiting the UI update frequency while observing a hot event stream. Composite operators help keeping the code clean an reusable. | ||
+ | |||
+ | '''RxJava''' = [https://github.com/ReactiveX/RxJava RxJava] is a Java VM implementation of [http://reactivex.io/ Reactive Extensions]: a library for composing asynchronous and event-based programs by using observable sequences. It is popular as it addresses the concerns about low-level threading, synchronization, thread-safety and concurrent data structures. | ||
− | + | '''ZK Framework''' = [https://zkoss.org ZK Framework] is an open source Java Web framework. It is a server-centric user interface solution that enables developers to create highly interactive enterprise applications. I am using this UI framework here because it's MVVM pattern and Websocket support fit well into RxJava's reactive streams. | |
− | + | = The Mission = | |
− | + | [[File:zkrx-moody-robots.png|right|frame|Moody Robots]] | |
+ | Assume you are working for a project on a Robot Farm (I think there could be worse projects). Unfortunately those Robots are a bit moody and unreliable sometimes - so of course there needs to be a supervisor sitting in his/her office chair and watching a screen to monitor all the Robot movements. Since human supervisors are prone to errors too, there should be multiple supervisors observing the same Robots simultaneously. | ||
− | + | Based on their current assignment and in order to preserve bandwidths supervisors must be enabled to track certain Robots near real-time (filtered by mood or position) without completely losing track of the overall situation, | |
+ | i.e. The Robots we are interested in have a faster update rate than others (e.g. 100 ms vs. 1 sec). | ||
However the Robots are constantly sending data at high frequency to your back-end process. | However the Robots are constantly sending data at high frequency to your back-end process. | ||
Your challenge is to connect the UI to the stream of information reducing it based on the filter criteria and throttle the sheer amount of data to something the human eye and your network connection can handle. | Your challenge is to connect the UI to the stream of information reducing it based on the filter criteria and throttle the sheer amount of data to something the human eye and your network connection can handle. | ||
− | = | + | = The Outset= |
− | For the Robot event stream | + | For the Robot event stream we choose RXJava because it implements the [http://reactivex.io/documentation/observable.html flexible Observable API] - ideal when dealing with asynchronous event streams in combination with a [http://reactivex.io/documentation/operators.html#alphabetical powerful set of operators] to transform the results and for the Web UI we go with the open source ZK Framework. |
+ | RxJava's reactive streams fit well into [http://books.zkoss.org/zk-mvvm-book/8.0/introduction_of_mvvm.html ZK's MVVM design pattern], making an interesting combination worth talking about. | ||
Here the technologies used: | Here the technologies used: | ||
Line 26: | Line 42: | ||
== The Backend (RX Observable) == | == The Backend (RX Observable) == | ||
− | The stream of | + | The stream of [https://github.com/zkoss-demo/zk-rxdemo/blob/master/src/main/java/zk/rx/demo/service/TrackEvent.java TrackEvent<Robot>]-objects is produced by a single hot Observable initialized at startup. |
− | *constantly | + | The Observable ... |
− | + | *constantly emits TrackEvents at 10ms intervals | |
− | *allows multiple consumers (using observable.publish() / observable.connect() | + | : (unreliability simulated by randomly updating ~30% of the robots with an additional 2% chance to change the mood) |
+ | *allows multiple consumers (using [https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators observable.publish() / observable.connect()]) | ||
− | + | As this Observable is unaware of the front end it will just keep emitting the events once started - no matter what. | |
− | For each new subscriber it will initially | + | For each new subscriber it will initially send TrackEvents for all Robots followed by the random stream of events every subscriber shares. |
+ | |||
+ | [https://github.com/zkoss-demo/zk-rxdemo/blob/master/src/main/java/zk/rx/demo/service/RobotBackend.java zk.rx.demo.service.RobotBackend] | ||
+ | <source lang="java" highlight="14,15,26"> | ||
+ | public class RobotBackend { | ||
+ | private static int NUM_ROBOTS = 20; | ||
+ | ... | ||
+ | |||
+ | /** | ||
+ | * called once to start the stream of events | ||
+ | */ | ||
+ | public void start() { | ||
+ | allRobots = LongStream.range(0L, NUM_ROBOTS) | ||
+ | .mapToObj(index -> new Robot(100 + index, new Position(0, 0), Robot.Mood.NEUTRAL)) | ||
+ | .collect(Collectors.toConcurrentMap(Robot::getId, robot -> robot)); | ||
+ | |||
+ | Observable<TrackEvent<Robot>> obs = Observable.create(this::backgroundThread); | ||
+ | hotRobotObservable = obs.publish(); | ||
+ | disposable = hotRobotObservable.connect(); | ||
+ | } | ||
+ | |||
+ | /** | ||
+ | * called by each subscriber to connect to the same event stream | ||
+ | * @return Observable of {@link TrackEvent} | ||
+ | */ | ||
+ | public Observable<TrackEvent<Robot>> trackRobots() { | ||
+ | Stream<TrackEvent<Robot>> currentRobots = allRobots.values().stream() | ||
+ | .map(robot -> new TrackEvent<>(TrackEvent.Name.ON_ENTER, robot, robot)); | ||
+ | //prepend initial state for all robots to the hot stream of updates | ||
+ | return hotRobotObservable | ||
+ | .startWith(currentRobots::iterator); | ||
+ | } | ||
+ | |||
+ | ... some logic to create the thread and random positions below ... | ||
+ | </source> | ||
+ | |||
+ | Just imagine the GPS beacons in our Robots constantly keep sending positional information, which we don't intent to control from the front end side. We simply need to deal with any given update frequency. | ||
== The UI (ZK MVVM application) == | == The UI (ZK MVVM application) == | ||
+ | A simple UI is implemented in ZK using a zul template and a java ViewModel class. | ||
+ | UI specific calculated properties such as styleClasses (derived from mood and realTime status) are added by wrapping the domain class Robot into a UiRobot. | ||
+ | |||
+ | [https://github.com/zkoss-demo/zk-rxdemo/blob/master/src/main/webapp/index.zul index.zul] | ||
+ | <source lang="xml" highlight="12,13,14,15"> | ||
+ | <?style src="style.css"?> | ||
+ | <zk xmlns:w="client"> | ||
+ | <div id="robotFarm" viewModel="@id('vm') @init('zk.rx.demo.vm.RobotFarmViewModel')"> | ||
+ | ... | ||
+ | |||
+ | <div sclass="trackingArea"> | ||
+ | <if test="@load(vm.centerRegionTracking)"> | ||
+ | <div sclass="centerRegionArea"/> | ||
+ | </if> | ||
+ | <forEach items="@init(vm.trackedRobots)" var="mapEntry"> | ||
+ | <apply uiRobot="@init(mapEntry.value)"> | ||
+ | <div sclass="@load(uiRobot.styleClasses)" | ||
+ | left="@load((uiRobot.robot.position.x += '%'))" | ||
+ | top="@load((uiRobot.robot.position.y += '%'))"> | ||
+ | </div> | ||
+ | </apply> | ||
+ | </forEach> | ||
+ | </div> | ||
+ | |||
+ | <div sclass="controlArea" align="center"> | ||
+ | Real-time: | ||
+ | <combobox readonly="true" model="@init(vm.filterNamesModel)" onSelect="@command('selectFilter')" width="120px"/> | ||
+ | <button iconSclass="@load(vm.running ? 'z-icon-stop' : 'z-icon-play')" label="@load(vm.running ? 'Stop' : 'Start')" | ||
+ | onClick="@command('toggleRunning')"/> | ||
+ | <button iconSclass="z-icon-retweet" label="Ping Server" onClick="@command('testServerResponse')"/> | ||
+ | </div> | ||
+ | </div> | ||
+ | </zk> | ||
+ | </source> | ||
+ | |||
+ | '''Lines 12-15:''' render Robots as divs with dynamic styles and position reacting to model changes | ||
+ | |||
+ | I'll not go too deep into ZK specifics now. Updating the UI (i.e. responding to data changes in the View Model) can be triggered several ways: | ||
+ | A simple one is annotating a command handler method with @NotifyChange ... | ||
+ | |||
+ | <source lang="java"> | ||
+ | @Command | ||
+ | @NotifyChange("centerRegionTracking") | ||
+ | public void selectFilter() { | ||
+ | currentFilter = availableFilters.get(filterNamesModel.getSelection().iterator().next()); | ||
+ | if(isRunning()) { | ||
+ | start(); | ||
+ | } | ||
+ | } | ||
+ | </source> | ||
+ | |||
+ | ... which will then update the corresponding data binding (<code>@load(vm.centerRegionTracking)</code>) in the zul file: | ||
+ | |||
+ | <source lang="xml"> | ||
+ | <if test="@load(vm.centerRegionTracking)"> | ||
+ | <div sclass="centerRegionArea"/> | ||
+ | </if> | ||
+ | </source> | ||
+ | |||
+ | |||
+ | An alternative is to call <javadoc class="true" method="postNotifyChange(java.lang.String, java.lang.String, java.lang.Object, java.lang.String)">org.zkoss.bind.BindUtils</javadoc> directly in order to trigger a <code>@load</code> binding (e.g. [https://github.com/zkoss-demo/zk-rxdemo/blob/master/src/main/java/zk/rx/demo/vm/RobotFarmViewModel.java#L145 called from the ViewModel] when a robot is updated). | ||
+ | |||
+ | For the interested here the complete [http://books.zkoss.org/zk-mvvm-book/8.0/viewmodel/notification.html MVVM documentation on notifications]. | ||
+ | |||
+ | = The Magic in the Middle = | ||
+ | |||
+ | Knowing '''how''' to update the UI is mostly a technicality. | ||
+ | The trickier decisions are: '''when''' and '''what''' to render in the UI, because these will have direct impact on the performance and responsiveness of your application. | ||
− | + | Also since the server and client side are connected via network there's a latency which needs to be considered. In this example we use a WebSocket to minimize that overhead. By enabling [https://github.com/zkoss-demo/zk-rxdemo/blob/master/src/main/webapp/WEB-INF/zk.xml#L5 this feature] the framework will handle this transparently and we don't need to worry about it anymore. | |
− | == The | + | == The How - Updating the UI == |
− | + | As the observable emits <code>TrackEvent<Robot></code> objects the basic way to process those might look like this: | |
− | * | + | |
− | ** buffer | + | <source lang="java"> |
− | + | backend.trackRobots() | |
− | + | .subscribe(event -> updateUi(event), this::handleError); | |
+ | </source> | ||
+ | |||
+ | However this would not work just that: | ||
+ | A technical requirement is to obtain a "lock" before UI elements in page (called "Desktop" in ZK) can be updated e.g. via change notification - mentioned above. For user triggered events such as mouse or keyboard events this happens automatically - background threads have to obtain a lock on demand. Especially if only parts of the background thread need to update the UI, the remaining code can run in parallel without blocking user interactions. | ||
+ | Getting the "lock" (also called activating the desktop) looks as simple as that (like a transaction): | ||
+ | |||
+ | <source lang="java"> | ||
+ | try { | ||
+ | Executions.activate(desktop); //obtains the lock | ||
+ | //do any UI updates between activate/deactivate <--------------------------- | ||
+ | } finally { | ||
+ | Executions.deactivate(desktop); //will release the lock and flush the changes to the UI and out to the browser | ||
+ | } | ||
+ | </source> | ||
+ | |||
+ | However this looks like tedious boilerplate code and forgetting to "deactivate" may lead to infinite dead locks for that particular "desktop". | ||
+ | Better we wrap that in ''some way'' so it can be reused safely and easily integrated into the observable chain. | ||
+ | |||
+ | Obviously activate/deactivate don't affect the data of the stream so that the RX side effect operators ([http://reactivex.io/documentation/operators/do.html doOn...]) sound like a good match: | ||
+ | |||
+ | <source lang="java"> | ||
+ | backend.trackRobots() | ||
+ | .observeOn(Schedulers.io()) | ||
+ | .doOnNext(event -> Executions.activate(desktop)) //potentially blocking that's why Schedulers.io() | ||
+ | .doAfterNext(event -> Executions.deactivate(desktop)) | ||
+ | .doOnTerminate(() -> Executions.deactivate(desktop)); | ||
+ | .subscribe(event -> trackRobot(event), this::handleError); | ||
+ | </source> | ||
+ | (also covered in [https://dzone.com/articles/rxjavas-side-effect-methods "RxJava's Side Effect Methods"]) | ||
+ | |||
+ | Again: adding those 4 lines before the update might be tedious and still error prone (e.g. using the wrong Scheduler might lead to dead lock as well as forgetting any of the lines). To make this more manageable we can reuse those 4 lines by implementing a single [https://github.com/zkoss-demo/zk-rxdemo/blob/master/src/main/java/zk/rx/operators/ZkObservable.java#L22 activated()] operator extending from [http://reactivex.io/RxJava/javadoc/io/reactivex/ObservableTransformer.html ObservableTransformer<T, T>]. | ||
+ | |||
+ | <source lang="java"> | ||
+ | public class ZkObservable { | ||
+ | public static <T> ObservableTransformer<T, T> activated() { | ||
+ | return activated(Executions.getCurrent().getDesktop()); | ||
+ | } | ||
+ | |||
+ | public static <T> ObservableTransformer<T, T> activated(Desktop desktop) { | ||
+ | ZkDesktopOps desktopOps = new ZkDesktopOps(desktop); | ||
+ | //use IO Scheduler - potentially blocking operation to wait for desktop activation | ||
+ | return upstream -> upstream | ||
+ | .observeOn(Schedulers.io()) | ||
+ | .doOnNext(toConsumer(desktopOps.activate())) //potentially blocking | ||
+ | .doAfterNext(toConsumer(desktopOps.deactivate())) | ||
+ | .doOnTerminate(desktopOps.deactivate()); | ||
+ | } | ||
+ | </source> | ||
+ | |||
+ | Adding this into our chain via [http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#compose-io.reactivex.ObservableTransformer- Observable.compose()] ... | ||
+ | |||
+ | <source lang="java"> | ||
+ | backend.trackRobots() | ||
+ | .compose(ZkObservable.activated()) //means perform the downstream with an activated desktop (and clean up after it) | ||
+ | .subscribe(event -> trackRobot(event), this::handleError); | ||
+ | </source> | ||
+ | |||
+ | ... the code looks much better ... and as a bonus, the same technique can be used for other side effects such as logging, setting thread locals, open close other resources, transaction bracketing etc. - basically anything that doesn't affect the data in the stream. | ||
+ | |||
+ | == The When - Throttling/Batching == | ||
+ | |||
+ | Being ''just'' a side effect doesn't mean it's a ''cheap'' side effect. Opening closing a transaction or acquiring a lock is often expensive and may require waiting (blocking). In our case it even triggers network communication before the next thread can activate the Desktop again. | ||
+ | This penalty is multiplied by the high update rate from the back-end: ~600 events / sec (20 Robots * 100 updates/second * 30%). | ||
+ | |||
+ | As network round trips tend to take longer than 1.6ms (1000ms/600) we need to reduce the update frequency by processing multiple updates within the same activation/deactivation window. Here the RX [http://reactivex.io/documentation/operators/buffer.html Buffer operator] does the trick. | ||
+ | |||
+ | <source lang="java"> | ||
+ | backend.trackRobots() | ||
+ | .buffer(100, TimeUnit.MILLISECONDS)) | ||
+ | .compose(ZkObservable.activated()) //only activate once all events collected within 100ms | ||
+ | .subscribe(/*now we'd have a List<TrackEvent<Robot>> here*/) | ||
+ | </source> | ||
+ | |||
+ | Now things are already a lot better: Collecting events for 100ms and updating all affected robots at once will save a majority of desktop activations. | ||
+ | 10/sec vs ~600/sec before. | ||
+ | |||
+ | Given the chance of an empty buffer, it makes sense to avoid the desktop activation sometimes. | ||
+ | |||
+ | Finally it would feel more natural to deal with single events (instead of lists) at the end of our stream which can be done with the [http://reactivex.io/documentation/operators/flatmap.html flatMap/concatMap] operator family. | ||
+ | |||
+ | So a better version would look like this ... | ||
+ | |||
+ | <source lang="java"> | ||
+ | backend.trackRobots() | ||
+ | .buffer(100, TimeUnit.MILLISECONDS)) | ||
+ | .filter(items -> !items.isEmpty()) //avoid activation when buffer is empty | ||
+ | .compose(ZkObservable.activated()) | ||
+ | .concatMapIterable(items -> items); //concatMap... preserves the original emission order (flatMap... does not) | ||
+ | .subscribe(event -> trackRobot(event), this::handleError); | ||
+ | </source> | ||
+ | |||
+ | ... which can again be composed into the [https://github.com/zkoss-demo/zk-rxdemo/blob/master/src/main/java/zk/rx/operators/ZkObservable.java#L32 activatedThrottle operator] (combining the buffering, activation and buffer-separation) resulting in this ... | ||
+ | |||
+ | <source lang="java"> | ||
+ | backend.trackRobots() | ||
+ | .compose(ZkObservable.activatedThrottle(100)) | ||
+ | .subscribe(event -> trackRobot(event), this::handleError); | ||
+ | </source> | ||
+ | |||
+ | Now we start talking ... but wait ... there's still more eliminate. | ||
+ | |||
+ | === Optimizing the Buffer Size === | ||
+ | |||
+ | Batching an average of 60 updates for 20 Robots must inevitably contain duplicates. It would be a waste to update the same Robot several times within a batch causing redundant render instructions for the client engine increasing the network traffic and the client side render time. | ||
+ | We are only interested in the latest updates, so we better eliminate duplicates in each batch before rendering. Ideally this should happen while buffering the events. | ||
+ | |||
+ | So far I haven't found a default operator that does what I needed (e.g. [http://reactivex.io/documentation/operators/distinct.html distinct] does quite the opposite emitting only the first unique event - whereas I'd like to keep the last of a kind - which of course doesn't make sense in an infinite stream) so I came up with the following solution using another flavor of the [http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#buffer-java.util.concurrent.Callable-java.util.concurrent.Callable- buffer operator accepting a custom buffer supplier] (better ideas are welcome !!). | ||
+ | |||
+ | The buffer supplier provides a [https://github.com/zkoss-demo/zk-rxdemo/blob/master/src/main/java/zk/rx/operators/ZkObservable.java#L58-L60 KeyedSet] that identifies events for the same Robot by keySelector and preserves the order based on last access. | ||
+ | Adding this to a (let's call it) [https://github.com/zkoss-demo/zk-rxdemo/blob/master/src/main/java/zk/rx/operators/ZkObservable.java#L40 activatedThrottleUnique operator] we get this: | ||
+ | |||
+ | <source lang="java"> | ||
+ | backend.trackRobots() | ||
+ | //use the robot.id as unique key | ||
+ | .compose(ZkObservable.activatedThrottleUnique(100, event -> event.getCurrent().getId())) | ||
+ | .subscribe(event -> trackRobot(event), this::handleError); | ||
+ | </source> | ||
+ | |||
+ | Finally the maximum batch size is limited to 20 (1 update per Robot no duplicates). Considering where we came from this dramatically reduces the overall workload, network traffic and locking overhead. Even if the back-end would suddenly produce more events this number of 20 items per batch would not increase - keeping the UI update effort stable. | ||
+ | |||
+ | Now let's do something for the user. | ||
+ | |||
+ | == The What - Filtering == | ||
+ | |||
+ | Still there's only so much a connection and the human eye can handle, which makes it reasonable to think about filtering the event stream and only display what the supervisor is interested in: i.e. highlight only happy/angry robots or follow robots which cross the center region. | ||
+ | |||
+ | [[File:zkrx_filtering.png]] | ||
+ | |||
+ | We decide to render the highlighted robots near real-time (updated every 100ms) and render the remaining robots slightly transparent, a bit smaller and update them only once per second. The highlighting is done by css classes which are added/removed dynamically. | ||
+ | |||
+ | The different update speeds can be achieved by subscribing to the stream twice with different throttle intervals and filters. A CompositeDisposable simplifies subscription cancellation. | ||
+ | |||
+ | Here the resulting code as in the example [https://github.com/zkoss-demo/zk-rxdemo/blob/master/src/main/java/zk/rx/demo/vm/RobotFarmViewModel.java#L90 RobotFarmViewModel]: | ||
+ | |||
+ | <source lang="java" highlight="3,5,7,9"> | ||
+ | subscriptions = new CompositeDisposable(); | ||
+ | |||
+ | Disposable realtimeSubscription = robotTracker.trackRobots(this.currentFilter) | ||
+ | .compose(ZkObservable.activatedThrottleUnique(100, event -> event.getCurrent().getId())) | ||
+ | .subscribe(this::trackRealtimeRobot, this::handleError); | ||
+ | |||
+ | Disposable delayedSubscription = robotTracker.trackRobots(negate(this.currentFilter)) | ||
+ | .compose(ZkObservable.activatedThrottleUnique(1000, event -> event.getCurrent().getId())) | ||
+ | .subscribe(this::trackDelayedRobot, this::handleError); | ||
+ | |||
+ | subscriptions.addAll(realtimeSubscription, delayedSubscription); | ||
+ | </source> | ||
+ | |||
+ | '''Lines 3, 7''': the RobotTracker class translates the TrackEvents based on a filter | ||
+ | |||
+ | '''Lines 5, 9''': the logic to update the UI is slightly different between the real time and the delayed Robots | ||
+ | |||
+ | The filtering is handled by the [https://github.com/zkoss-demo/zk-rxdemo/blob/master/src/main/java/zk/rx/demo/service/RobotTracker.java#L22 RobotTracker] class which translates the stream of ON_UPDATE events into ON_UPDATE, ON_ENTER and ON_LEAVE to indicate whether the filter condition (e.g. the center region) was entered or left or remained unchanged. | ||
+ | |||
+ | [[File:zkrx-result.png|center|frame|maybe make this a youtube video??]] | ||
+ | |||
+ | Now only the currently filtered robots are moving near ''real time'' (every 100ms) while the remaining robots only update once every second. | ||
= Summary = | = Summary = | ||
+ | |||
+ | I hope the Robot Farm supervisors will have as much fun using the stream lined UI as I had implementing it. | ||
+ | And of course I hope to contribute some useful ideas and techniques when using reactive streams glued together with a UI framework be it ZK or a different one. | ||
+ | |||
+ | As always some bugs/problems might have slipped into the code - so I am eager to hear your comments and suggestions to improve this experiment. | ||
+ | |||
+ | You might also ask: ''Why Robots'??' | ||
+ | |||
+ | The answer is simple: Most examples on RX I've seen focus on stock price tickers which are quite 1-Dimensional. So I thought it might not hurt add multiple criteria which both change and can be used to filter on. And I just love the colorful result compared to plain numbers updating. | ||
== Example Sources == | == Example Sources == |
Latest revision as of 04:12, 20 January 2022
Robert Wenzel, Engineer, Potix Corporation
September, 2017
ZK 8.5
Potential Titles
Deal with hot RX observables in a UI
Tame talkative Event Sources
Backend! Shut up! - dealing with real-time event streams
Introduction
TL;DR: This article is about integrating an RxJava Observable with a ZK application. The focus lies on limiting the UI update frequency while observing a hot event stream. Composite operators help keeping the code clean an reusable.
RxJava = RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. It is popular as it addresses the concerns about low-level threading, synchronization, thread-safety and concurrent data structures.
ZK Framework = ZK Framework is an open source Java Web framework. It is a server-centric user interface solution that enables developers to create highly interactive enterprise applications. I am using this UI framework here because it's MVVM pattern and Websocket support fit well into RxJava's reactive streams.
The Mission
Assume you are working for a project on a Robot Farm (I think there could be worse projects). Unfortunately those Robots are a bit moody and unreliable sometimes - so of course there needs to be a supervisor sitting in his/her office chair and watching a screen to monitor all the Robot movements. Since human supervisors are prone to errors too, there should be multiple supervisors observing the same Robots simultaneously.
Based on their current assignment and in order to preserve bandwidths supervisors must be enabled to track certain Robots near real-time (filtered by mood or position) without completely losing track of the overall situation, i.e. The Robots we are interested in have a faster update rate than others (e.g. 100 ms vs. 1 sec).
However the Robots are constantly sending data at high frequency to your back-end process. Your challenge is to connect the UI to the stream of information reducing it based on the filter criteria and throttle the sheer amount of data to something the human eye and your network connection can handle.
The Outset
For the Robot event stream we choose RXJava because it implements the flexible Observable API - ideal when dealing with asynchronous event streams in combination with a powerful set of operators to transform the results and for the Web UI we go with the open source ZK Framework. RxJava's reactive streams fit well into ZK's MVVM design pattern, making an interesting combination worth talking about.
Here the technologies used:
The Backend (RX Observable)
The stream of TrackEvent<Robot>-objects is produced by a single hot Observable initialized at startup.
The Observable ...
- constantly emits TrackEvents at 10ms intervals
- (unreliability simulated by randomly updating ~30% of the robots with an additional 2% chance to change the mood)
- allows multiple consumers (using observable.publish() / observable.connect())
As this Observable is unaware of the front end it will just keep emitting the events once started - no matter what. For each new subscriber it will initially send TrackEvents for all Robots followed by the random stream of events every subscriber shares.
zk.rx.demo.service.RobotBackend
public class RobotBackend {
private static int NUM_ROBOTS = 20;
...
/**
* called once to start the stream of events
*/
public void start() {
allRobots = LongStream.range(0L, NUM_ROBOTS)
.mapToObj(index -> new Robot(100 + index, new Position(0, 0), Robot.Mood.NEUTRAL))
.collect(Collectors.toConcurrentMap(Robot::getId, robot -> robot));
Observable<TrackEvent<Robot>> obs = Observable.create(this::backgroundThread);
hotRobotObservable = obs.publish();
disposable = hotRobotObservable.connect();
}
/**
* called by each subscriber to connect to the same event stream
* @return Observable of {@link TrackEvent}
*/
public Observable<TrackEvent<Robot>> trackRobots() {
Stream<TrackEvent<Robot>> currentRobots = allRobots.values().stream()
.map(robot -> new TrackEvent<>(TrackEvent.Name.ON_ENTER, robot, robot));
//prepend initial state for all robots to the hot stream of updates
return hotRobotObservable
.startWith(currentRobots::iterator);
}
... some logic to create the thread and random positions below ...
Just imagine the GPS beacons in our Robots constantly keep sending positional information, which we don't intent to control from the front end side. We simply need to deal with any given update frequency.
The UI (ZK MVVM application)
A simple UI is implemented in ZK using a zul template and a java ViewModel class. UI specific calculated properties such as styleClasses (derived from mood and realTime status) are added by wrapping the domain class Robot into a UiRobot.
<?style src="style.css"?>
<zk xmlns:w="client">
<div id="robotFarm" viewModel="@id('vm') @init('zk.rx.demo.vm.RobotFarmViewModel')">
...
<div sclass="trackingArea">
<if test="@load(vm.centerRegionTracking)">
<div sclass="centerRegionArea"/>
</if>
<forEach items="@init(vm.trackedRobots)" var="mapEntry">
<apply uiRobot="@init(mapEntry.value)">
<div sclass="@load(uiRobot.styleClasses)"
left="@load((uiRobot.robot.position.x += '%'))"
top="@load((uiRobot.robot.position.y += '%'))">
</div>
</apply>
</forEach>
</div>
<div sclass="controlArea" align="center">
Real-time:
<combobox readonly="true" model="@init(vm.filterNamesModel)" onSelect="@command('selectFilter')" width="120px"/>
<button iconSclass="@load(vm.running ? 'z-icon-stop' : 'z-icon-play')" label="@load(vm.running ? 'Stop' : 'Start')"
onClick="@command('toggleRunning')"/>
<button iconSclass="z-icon-retweet" label="Ping Server" onClick="@command('testServerResponse')"/>
</div>
</div>
</zk>
Lines 12-15: render Robots as divs with dynamic styles and position reacting to model changes
I'll not go too deep into ZK specifics now. Updating the UI (i.e. responding to data changes in the View Model) can be triggered several ways: A simple one is annotating a command handler method with @NotifyChange ...
@Command
@NotifyChange("centerRegionTracking")
public void selectFilter() {
currentFilter = availableFilters.get(filterNamesModel.getSelection().iterator().next());
if(isRunning()) {
start();
}
}
... which will then update the corresponding data binding (@load(vm.centerRegionTracking)
) in the zul file:
<if test="@load(vm.centerRegionTracking)">
<div sclass="centerRegionArea"/>
</if>
An alternative is to call BindUtils.postNotifyChange(String, String, Object, String) directly in order to trigger a @load
binding (e.g. called from the ViewModel when a robot is updated).
For the interested here the complete MVVM documentation on notifications.
The Magic in the Middle
Knowing how to update the UI is mostly a technicality. The trickier decisions are: when and what to render in the UI, because these will have direct impact on the performance and responsiveness of your application.
Also since the server and client side are connected via network there's a latency which needs to be considered. In this example we use a WebSocket to minimize that overhead. By enabling this feature the framework will handle this transparently and we don't need to worry about it anymore.
The How - Updating the UI
As the observable emits TrackEvent<Robot>
objects the basic way to process those might look like this:
backend.trackRobots()
.subscribe(event -> updateUi(event), this::handleError);
However this would not work just that: A technical requirement is to obtain a "lock" before UI elements in page (called "Desktop" in ZK) can be updated e.g. via change notification - mentioned above. For user triggered events such as mouse or keyboard events this happens automatically - background threads have to obtain a lock on demand. Especially if only parts of the background thread need to update the UI, the remaining code can run in parallel without blocking user interactions. Getting the "lock" (also called activating the desktop) looks as simple as that (like a transaction):
try {
Executions.activate(desktop); //obtains the lock
//do any UI updates between activate/deactivate <---------------------------
} finally {
Executions.deactivate(desktop); //will release the lock and flush the changes to the UI and out to the browser
}
However this looks like tedious boilerplate code and forgetting to "deactivate" may lead to infinite dead locks for that particular "desktop". Better we wrap that in some way so it can be reused safely and easily integrated into the observable chain.
Obviously activate/deactivate don't affect the data of the stream so that the RX side effect operators (doOn...) sound like a good match:
backend.trackRobots()
.observeOn(Schedulers.io())
.doOnNext(event -> Executions.activate(desktop)) //potentially blocking that's why Schedulers.io()
.doAfterNext(event -> Executions.deactivate(desktop))
.doOnTerminate(() -> Executions.deactivate(desktop));
.subscribe(event -> trackRobot(event), this::handleError);
(also covered in "RxJava's Side Effect Methods")
Again: adding those 4 lines before the update might be tedious and still error prone (e.g. using the wrong Scheduler might lead to dead lock as well as forgetting any of the lines). To make this more manageable we can reuse those 4 lines by implementing a single activated() operator extending from ObservableTransformer<T, T>.
public class ZkObservable {
public static <T> ObservableTransformer<T, T> activated() {
return activated(Executions.getCurrent().getDesktop());
}
public static <T> ObservableTransformer<T, T> activated(Desktop desktop) {
ZkDesktopOps desktopOps = new ZkDesktopOps(desktop);
//use IO Scheduler - potentially blocking operation to wait for desktop activation
return upstream -> upstream
.observeOn(Schedulers.io())
.doOnNext(toConsumer(desktopOps.activate())) //potentially blocking
.doAfterNext(toConsumer(desktopOps.deactivate()))
.doOnTerminate(desktopOps.deactivate());
}
Adding this into our chain via Observable.compose() ...
backend.trackRobots()
.compose(ZkObservable.activated()) //means perform the downstream with an activated desktop (and clean up after it)
.subscribe(event -> trackRobot(event), this::handleError);
... the code looks much better ... and as a bonus, the same technique can be used for other side effects such as logging, setting thread locals, open close other resources, transaction bracketing etc. - basically anything that doesn't affect the data in the stream.
The When - Throttling/Batching
Being just a side effect doesn't mean it's a cheap side effect. Opening closing a transaction or acquiring a lock is often expensive and may require waiting (blocking). In our case it even triggers network communication before the next thread can activate the Desktop again. This penalty is multiplied by the high update rate from the back-end: ~600 events / sec (20 Robots * 100 updates/second * 30%).
As network round trips tend to take longer than 1.6ms (1000ms/600) we need to reduce the update frequency by processing multiple updates within the same activation/deactivation window. Here the RX Buffer operator does the trick.
backend.trackRobots()
.buffer(100, TimeUnit.MILLISECONDS))
.compose(ZkObservable.activated()) //only activate once all events collected within 100ms
.subscribe(/*now we'd have a List<TrackEvent<Robot>> here*/)
Now things are already a lot better: Collecting events for 100ms and updating all affected robots at once will save a majority of desktop activations. 10/sec vs ~600/sec before.
Given the chance of an empty buffer, it makes sense to avoid the desktop activation sometimes.
Finally it would feel more natural to deal with single events (instead of lists) at the end of our stream which can be done with the flatMap/concatMap operator family.
So a better version would look like this ...
backend.trackRobots()
.buffer(100, TimeUnit.MILLISECONDS))
.filter(items -> !items.isEmpty()) //avoid activation when buffer is empty
.compose(ZkObservable.activated())
.concatMapIterable(items -> items); //concatMap... preserves the original emission order (flatMap... does not)
.subscribe(event -> trackRobot(event), this::handleError);
... which can again be composed into the activatedThrottle operator (combining the buffering, activation and buffer-separation) resulting in this ...
backend.trackRobots()
.compose(ZkObservable.activatedThrottle(100))
.subscribe(event -> trackRobot(event), this::handleError);
Now we start talking ... but wait ... there's still more eliminate.
Optimizing the Buffer Size
Batching an average of 60 updates for 20 Robots must inevitably contain duplicates. It would be a waste to update the same Robot several times within a batch causing redundant render instructions for the client engine increasing the network traffic and the client side render time. We are only interested in the latest updates, so we better eliminate duplicates in each batch before rendering. Ideally this should happen while buffering the events.
So far I haven't found a default operator that does what I needed (e.g. distinct does quite the opposite emitting only the first unique event - whereas I'd like to keep the last of a kind - which of course doesn't make sense in an infinite stream) so I came up with the following solution using another flavor of the buffer operator accepting a custom buffer supplier (better ideas are welcome !!).
The buffer supplier provides a KeyedSet that identifies events for the same Robot by keySelector and preserves the order based on last access. Adding this to a (let's call it) activatedThrottleUnique operator we get this:
backend.trackRobots()
//use the robot.id as unique key
.compose(ZkObservable.activatedThrottleUnique(100, event -> event.getCurrent().getId()))
.subscribe(event -> trackRobot(event), this::handleError);
Finally the maximum batch size is limited to 20 (1 update per Robot no duplicates). Considering where we came from this dramatically reduces the overall workload, network traffic and locking overhead. Even if the back-end would suddenly produce more events this number of 20 items per batch would not increase - keeping the UI update effort stable.
Now let's do something for the user.
The What - Filtering
Still there's only so much a connection and the human eye can handle, which makes it reasonable to think about filtering the event stream and only display what the supervisor is interested in: i.e. highlight only happy/angry robots or follow robots which cross the center region.
We decide to render the highlighted robots near real-time (updated every 100ms) and render the remaining robots slightly transparent, a bit smaller and update them only once per second. The highlighting is done by css classes which are added/removed dynamically.
The different update speeds can be achieved by subscribing to the stream twice with different throttle intervals and filters. A CompositeDisposable simplifies subscription cancellation.
Here the resulting code as in the example RobotFarmViewModel:
subscriptions = new CompositeDisposable();
Disposable realtimeSubscription = robotTracker.trackRobots(this.currentFilter)
.compose(ZkObservable.activatedThrottleUnique(100, event -> event.getCurrent().getId()))
.subscribe(this::trackRealtimeRobot, this::handleError);
Disposable delayedSubscription = robotTracker.trackRobots(negate(this.currentFilter))
.compose(ZkObservable.activatedThrottleUnique(1000, event -> event.getCurrent().getId()))
.subscribe(this::trackDelayedRobot, this::handleError);
subscriptions.addAll(realtimeSubscription, delayedSubscription);
Lines 3, 7: the RobotTracker class translates the TrackEvents based on a filter
Lines 5, 9: the logic to update the UI is slightly different between the real time and the delayed Robots
The filtering is handled by the RobotTracker class which translates the stream of ON_UPDATE events into ON_UPDATE, ON_ENTER and ON_LEAVE to indicate whether the filter condition (e.g. the center region) was entered or left or remained unchanged.
Now only the currently filtered robots are moving near real time (every 100ms) while the remaining robots only update once every second.
Summary
I hope the Robot Farm supervisors will have as much fun using the stream lined UI as I had implementing it. And of course I hope to contribute some useful ideas and techniques when using reactive streams glued together with a UI framework be it ZK or a different one.
As always some bugs/problems might have slipped into the code - so I am eager to hear your comments and suggestions to improve this experiment.
You might also ask: Why Robots'??'
The answer is simple: Most examples on RX I've seen focus on stock price tickers which are quite 1-Dimensional. So I thought it might not hurt add multiple criteria which both change and can be used to filter on. And I just love the colorful result compared to plain numbers updating.
Example Sources
The code examples are available on github in the zk-rxdemo repository
Running the Example
Clone the repo
git clone git@github.com:zkoss-demo/zk-rxdemo.git
The example war file can be built using the gradle-wrapper (on windows simply omit the prefix './'):
./gradlew war
Execute using jetty-runner (fastest):
./gradlew startJettyRunner
Execute using gretty:
./gradlew appRun
Then access the example http://localhost:8080/zk-rxdemo CHECK LINK
Comments
Copyright © Potix Corporation. This article is licensed under GNU Free Documentation License. |