zk-and-rx"
Robertwenzel (talk | contribs) |
Robertwenzel (talk | contribs) |
||
Line 175: | Line 175: | ||
.doAfterNext(event -> Executions.deactivate(desktop)) | .doAfterNext(event -> Executions.deactivate(desktop)) | ||
.doOnTerminate(() -> Executions.deactivate(desktop)); | .doOnTerminate(() -> Executions.deactivate(desktop)); | ||
− | .subscribe(event -> | + | .subscribe(event -> trackRobot(event), this::handleError); |
</source> | </source> | ||
Line 183: | Line 183: | ||
backend.trackRobots() | backend.trackRobots() | ||
.compose(ZkObservable.activated()) //means perform the downstream with an activated desktop (and clean up after it) | .compose(ZkObservable.activated()) //means perform the downstream with an activated desktop (and clean up after it) | ||
− | .subscribe(event -> | + | .subscribe(event -> trackRobot(event), this::handleError); |
</source> | </source> | ||
Looks much better ... and as a bonus, the same technique can be used for other side effects such as logging, setting thread locals, open close other resources, transaction bracketing etc. - basically anything that doesn't affect the data in the stream. | Looks much better ... and as a bonus, the same technique can be used for other side effects such as logging, setting thread locals, open close other resources, transaction bracketing etc. - basically anything that doesn't affect the data in the stream. | ||
− | == | + | == Throttling/Batching == |
+ | |||
+ | Being ''just'' a side effect doesn't mean it's a ''cheap'' side effect. Opening closing a transaction or acquiring a lock is often expensive and requires waiting. In our case it even triggers network communication before the next thread can activate the Desktop again. | ||
+ | This penalty is multiplied by the high update rate from the backend: ~600 events / sec (20 Robots * 30% * 100 updates/second). | ||
+ | |||
+ | As network round trips tend to take longer than 1.6ms (1000ms/600) we should reduce this by processing multiple updates within the same activation/deactivation window. Here the RX buffer operator '''LINK ME''' does the trick. | ||
+ | |||
+ | <source lang="java"> | ||
+ | backend.trackRobots() | ||
+ | .buffer(100, TimeUnit.MILLISECONDS)) | ||
+ | .compose(ZkObservable.activated()) //only activate once all events collected within 100ms | ||
+ | .subscribe(/*now we'd have a collection of events here*/) | ||
+ | </source> | ||
+ | |||
+ | Now things are already a lot better: collecting events for 100ms and updating all affected robots at once will save a majority of desktop activations. 10 vs ~600 before while performing an average of 60 updates in each batch. | ||
+ | |||
+ | There is a chance for the buffer to be empty. I such cases the desktop activation can be avoided at all. | ||
+ | |||
+ | Finally it would feel more natural to deal with single events (instead of lists) at the end of our stream which can be done with the concatMap... '''LINK ME''' operator family | ||
+ | |||
+ | So a better version would look like this: | ||
− | + | <source lang="java"> | |
+ | backend.trackRobots() | ||
+ | .buffer(100, TimeUnit.MILLISECONDS)) | ||
+ | .filter(items -> !items.isEmpty()) //avoid activation when buffer is empty | ||
+ | .compose(ZkObservable.activated()) | ||
+ | .concatMapIterable(items -> items); //concat... to preserve the original emission order | ||
+ | .subscribe(event -> trackRobot(event), this::handleError); | ||
+ | </source> | ||
− | |||
=== Optimizing the Buffer === | === Optimizing the Buffer === | ||
Line 201: | Line 227: | ||
** avoid redundant updates | ** avoid redundant updates | ||
** batch update (in a single ZK execution) | ** batch update (in a single ZK execution) | ||
+ | |||
+ | == Filtering == | ||
+ | |||
+ | Still there's only so much a connection and the human eye can handle, which makes it reasonable to think about filtering / throttling / buffering / batching the event stream before updating the UI. | ||
= Summary = | = Summary = |
Revision as of 06:37, 22 September 2017
Robert Wenzel, Engineer, Potix Corporation
September, 2017
ZK 8.5
Introduction
Assume you are working for a project on a Robot Farm (I think there could be worse projects). Unfortunately those Robots are a bit moody and unreliable sometimes - so of course there needs to be a supervisor sitting in his/her office chair and watching a screen to follow all the Robots movements.
Since human supervisors are prone to errors too there should be multiple employees observing the same Robots simultaneously.
Based on their current assignment and in order to preserve bandwidths it must be possible to track certain Robots near real-time (filtered by mood or position) without completely losing track of the overall situation, i.e. highlighted Robots have a faster update rate than others (100 ms vs. 1 sec).
Employees may come and go as they like and connect/disconnect to the live data feed on demand.
However the Robots are constantly sending data at high frequency to your back-end process. Your challenge is to connect the UI to the stream of information reducing it based on the filter criteria and throttle the sheer amount of data to something the human eye and your network connection can handle.
The Outset
For the Robot event stream you choose rxjava LINK ME and ZK for the frontend (disclaimer: I am currently working for ZK) RxJava's reactive streams fit well into ZK's MVVM design pattern, making an interesting combination worth talking about.
Here the technologies used:
The Backend (RX Observable)
The stream of TrackEvents is produced by a single hot Observable LINK ME initialized at first subscription.
- constantly streams TrackEvent at 10ms intervals
- unreliability simulated by randomly updating~30% of the robots with an additional 2% chance to change the mood
- allows multiple consumers (using observable.publish() / observable.connect() LINK ME)
Of course this Observable is unaware of the front end and will just stream the events once started - no matter what. For each new subscriber it will initially send TrackEvents for all Robots followed by the random stream of events every subscriber shares.
LINK FULL SOURCE
public class RobotBackend {
private static int NUM_ROBOTS = 20;
...
/**
* called once to start the stream of events
*/
public void start() {
allRobots = LongStream.range(0L, NUM_ROBOTS)
.mapToObj(index -> new Robot(100 + index, new Position(0, 0), Robot.Mood.NEUTRAL))
.collect(Collectors.toConcurrentMap(Robot::getId, robot -> robot));
Observable<TrackEvent<Robot>> obs = Observable.create(this::backGroundThread);
hotRobotObservable = obs.publish();
disposable = hotRobotObservable.connect();
}
/**
* called by each subscriber to connect to the same event stream
* @return Observable of {@link TrackEvent}
*/
public Observable<TrackEvent<Robot>> trackRobots() {
Stream<TrackEvent<Robot>> currentRobots = allRobots.values().stream()
.map(robot -> new TrackEvent<>(TrackEvent.Name.ON_ENTER, robot, robot));
//prepend initial state for all robots to the hot stream of updates
return hotRobotObservable
.startWith(currentRobots::iterator);
}
... some logic to create the thread and random positions below ...
The UI (ZK MVVM application)
A simple UI is implemented in ZK using a zul template and a java ViewModel class. UI specific calculated properties such as styleClasses (derived from mood and realTime status) are added by wrapping the domain class Robot into a UiRobot.
LINK FULL SOURCE
<?style src="style.css"?>
<zk xmlns:w="client">
<div id="robotFarm" viewModel="@id('vm') @init('zk.rx.demo.vm.RobotFarmViewModel')">
...
<div sclass="trackingArea">
<if test="@load(vm.centerRegionTracking)">
<div sclass="centerRegionArea"/>
</if>
<forEach items="@init(vm.trackedRobots)" var="mapEntry">
<apply uiRobot="@init(mapEntry.value)">
<div sclass="@load(uiRobot.styleClasses)"
left="@load((uiRobot.robot.position.x += '%'))"
top="@load((uiRobot.robot.position.y += '%'))">
</div>
</apply>
</forEach>
</div>
<div sclass="controlArea" align="center">
Real-time:
<combobox readonly="true" model="@init(vm.filterNamesModel)" onSelect="@command('selectFilter')" width="120px"/>
<button iconSclass="@load(vm.running ? 'z-icon-stop' : 'z-icon-play')" label="@load(vm.running ? 'Stop' : 'Start')"
onClick="@command('toggleRunning')"/>
<button iconSclass="z-icon-retweet" label="Ping Server" onClick="@command('testServerResponse')"/>
</div>
</div>
</zk>
Line 10-15: render Robots as divs with dynamic styles and position reacting to model changes
I'll not go too deep into ZK specifics now. Updating the UI (i.e. responding to data changes in the View Model) can be triggered several ways: A simple one is annotating a command handler method with @NotifyChange ...
@Command
@NotifyChange("centerRegionTracking")
public void selectFilter() {
currentFilter = availableFilters.get(filterNamesModel.getSelection().iterator().next());
if(isRunning()) {
start();
}
}
... which will then update the corresponding data binding (@load(vm.centerRegionTracking)
) in the zul file:
<if test="@load(vm.centerRegionTracking)">
<div sclass="centerRegionArea"/>
</if>
An imperative alternative is to call BindUtils.postNotifyChange(...)LINK JAVADOCS in order to trigger a @load
binding.
For the interested here the complete MVVM documentation LINK ME.
The Challenge ("Magic in the Middle")
Knowing how to update the UI is mostly a technicality. The trickier decisions are: when and how often to notify the UI in order to re-render parts, because those will have direct impact on the performance and responsiveness of your application.
Also since the server and client side are connected via network there's a latency which needs to be dealt with. In ZK 8.5 this will be improved by introducing web sockets LINK ME for client-server communication.
The How - Updating the UI
As the observable emits TrackEvent<Robot>
objects the basic way to process those might look like this:
backend.trackRobots()
.subscribe(event -> updateUi(event), this::handleError);
However this would not work just that: A technical requirement is to obtain a "lock" before UI elements in page (called "Desktop" in ZK) can be updated e.g. via change notification - mentioned above. For user triggered events such as mouse or keyboard events this happens automatically - background threads have to obtain a lock on demand. Especially if only parts of the background thread need to update the UI, the remaining code can run in parallel without blocking user interactions. Getting the "lock" (activating the desktop) looks as simple as that (almost like a DB transaction):
try {
Executions.activate(desktop); //obtains the lock
//do some any updates here
} finally {
Executions.deactivate(desktop); //will release the lock and flush the changes to the UI and out to the browser
}
However this looks like tedious boilerplate code and forgetting to "deactivate" may lead to infinite dead locks for that particular "desktop". Better we wrap that in some way so it can be reused and integrated into the observable chain.
Obviously activate/deactivate don't affect the data of the stream so that the RX side effect operators (doOn... ) LINK ME sound like a good match:
backend.trackRobots()
.observeOn(Schedulers.io())
.doOnNext(event -> Executions.activate(desktop)) //potentially blocking that's why Schedulers.io()
.doAfterNext(event -> Executions.deactivate(desktop))
.doOnTerminate(() -> Executions.deactivate(desktop));
.subscribe(event -> trackRobot(event), this::handleError);
Again adding those 4 lines before the update might be tedious and still error prone (e.g. using the wrong Scheduler might lead to dead lock as well as forgetting any of the lines). To make this more manageable we can wrap it into a composite operator extending from ObservableTransformer<T, T> LINK ME collapsing those 4 lines into a single readable line.
backend.trackRobots()
.compose(ZkObservable.activated()) //means perform the downstream with an activated desktop (and clean up after it)
.subscribe(event -> trackRobot(event), this::handleError);
Looks much better ... and as a bonus, the same technique can be used for other side effects such as logging, setting thread locals, open close other resources, transaction bracketing etc. - basically anything that doesn't affect the data in the stream.
Throttling/Batching
Being just a side effect doesn't mean it's a cheap side effect. Opening closing a transaction or acquiring a lock is often expensive and requires waiting. In our case it even triggers network communication before the next thread can activate the Desktop again. This penalty is multiplied by the high update rate from the backend: ~600 events / sec (20 Robots * 30% * 100 updates/second).
As network round trips tend to take longer than 1.6ms (1000ms/600) we should reduce this by processing multiple updates within the same activation/deactivation window. Here the RX buffer operator LINK ME does the trick.
backend.trackRobots()
.buffer(100, TimeUnit.MILLISECONDS))
.compose(ZkObservable.activated()) //only activate once all events collected within 100ms
.subscribe(/*now we'd have a collection of events here*/)
Now things are already a lot better: collecting events for 100ms and updating all affected robots at once will save a majority of desktop activations. 10 vs ~600 before while performing an average of 60 updates in each batch.
There is a chance for the buffer to be empty. I such cases the desktop activation can be avoided at all.
Finally it would feel more natural to deal with single events (instead of lists) at the end of our stream which can be done with the concatMap... LINK ME operator family
So a better version would look like this:
backend.trackRobots()
.buffer(100, TimeUnit.MILLISECONDS))
.filter(items -> !items.isEmpty()) //avoid activation when buffer is empty
.compose(ZkObservable.activated())
.concatMapIterable(items -> items); //concat... to preserve the original emission order
.subscribe(event -> trackRobot(event), this::handleError);
Optimizing the Buffer
- filter the events (by selectable criteria)
- throttle UI updates (reducing the network load)
- buffer updates (100ms / 1000ms)
- avoid redundant updates
- batch update (in a single ZK execution)
Filtering
Still there's only so much a connection and the human eye can handle, which makes it reasonable to think about filtering / throttling / buffering / batching the event stream before updating the UI.
Summary
Example Sources
The code examples are available on github in the zk-rxdemo repository
Running the Example
Clone the repo
git clone git@github.com:zkoss-demo/zk-rxdemo.git
The example war file can be built using the gradle-wrapper (on windows simply omit the prefix './'):
./gradlew war
Execute using jetty-runner (fastest):
./gradlew startJettyRunner
Execute using gretty:
./gradlew appRun
Then access the example http://localhost:8080/zk-rxdemo CHECK LINK
Comments
Copyright © Potix Corporation. This article is licensed under GNU Free Documentation License. |