Desktop version

Home arrow Computer Science arrow Building Applications with Scala

Source

IRndService.scala - Scala trait

Let's look at the service definition. First of all, we need to define a trait for the service that will define the operations we need:

trait IRndService { def next():Double def call():Future[String] def rxScalaCall():Observable[Double] def rxScalaCallBatch():Observable[Double]

}

RndService.scala - RndService implementation

Now we can move to the real service implementation. This needs to be located at

ReactiveWebStore/app/services:

@Singleton

class RndService @Inject() (ws: WSClient) extends IRndService { import play.api.libs.concurrent.Execution.Implicits. defaultContext

override def next():Double = {

val future = ws.url("http://localhost:9090/double").get().map { res => res.body.toDouble }

Await.result(future, 5.seconds)

}

override def call():Future[String] = {

ws.url("http://localhost:9090/double").get().map { res => res.body }

}

override def rxScalaCall():Observable[Double] = { val doubleFuture:Future[Double] =

ws.url("http://localhost:9090/double").get().map { x => x.body.toDouble }

Observable.from(doubleFuture)

}

// Continue ...

In order to call our microservice (ng-microservice), we need to inject a special Play framework library called ws, a utility library to call web services. We inject it by adding the code (ws:WSClient) into the class definition.

When you call something with ws, it returns a Future. We need to have the Future executors in place. That's why the import defaultContext is very important, and you cannot skip it.

For the method, as you can see, we next call our microservice at

http://localhost:90 90/double to get a single double. We map this result, and get the body of the result, which will be the double itself.

For this method, we use Await.result, which will block and wait for the result. If the result is not back in five seconds, this code will fail.

The second method called call does the same, but the main difference is that we are not blocking the service; instead, we are returning a Future to the controller. Finally, the last method called rxScalaCall does the same: it calls our microservice using the ws library. However, we return an observable. Observables are great because they can used as a Future. Now it is time to go check out the final operation and the most interesting one. For this same class, we need to add another method such as this one:

The method rxScalaCallBatch in RndService.scala is as follows:

override def rxScalaCallBatch():Observable[Double] = {

val doubleInfiniteStreamSubject = PublishSubject.apply[Double]() val future = ws.url("http://localhost:9090/doubles/10")

.get()

.map { x => Json.parse(x.body).as[List[Double]] } future.onComplete {

case Success(l:List[Double]) => l.foreach { e => doubleInfiniteStreamSubject.onNext(e) } case Failure(e:Exception) => doubleInfiniteStreamSubject.onError(e)

}

var observableEven = Observable.create { doubleInfiniteStreamSubject.subscribe }

.onErrorReturn { x => 2.0 }

.flatMap { x => Observable.from( Iterable.fill(1)(x + 10) ) }

.filter { x => x.toInt % 2 == 0 }

.flatMap { x => println("ODD: " + x) ; Observable.just(x) } var observableOdd = Observable.create { doubleInfiniteStreamSubject.subscribe }

.onErrorReturn { x => 1.0 }

.flatMap { x => Observable.from( Iterable.fill(1)(x + 10) ) }

.filter { x => x.toInt % 2 != 0 }

.flatMap { x => println("EVEN: " + x) ; Observable.just(x) }

var mergeObservable = Observable

.empty

.merge(observableEven)

.merge(observableOdd)

.take(10)

.foldLeft(0.0)(_+_)

.flatMap { x => Observable.just( x - (x * 0.9) ) }

mergeObservable

}

So, first we create PublishSubject in order to be able to produce data for the observables. Then we make the ws call to our microservice. The main difference now is that we call the batches operation and order 10 doubles. This code happens in a future, so it is non-blocking.

We then use the Map function to transform the result. The ng-microservice function will return JSON, so we need to deserialize this JSON into Scala objects. Finally, we run a pattern matcher in the Future result. If the result is a success, it means everything is good. So, for each double, we publish into the observables using PublishSubject. If the service is down or we have a problem, we publish an error to the observables downstream.

Next we create three observables: one for odd numbers, one for even numbers, and a third one which will merge the other two and do extra computation. The way we did the conversion between Future and Observable is ideal, because it is non-blocking.

Here we have code very similar to what we had before for the Rx controller. The main difference is that we have error handling, because ng-microservice might never return, as it may be down or just not working. So we need to start working with fallbacks. Good fallbacks are key to error handling for Reactive applications. Fallbacks should be sort of static; in other words, they should not fail at all.

We provided two fallback methods: one for the Odd Observable and the other for the Even Observable. These fallbacks are done by setting the method OnErrorReturn. So for the even one, the fallback is static and the value is 2, and for the odd one the value is 1. This is great, because even with failure our application continues to work.

You might realize we are not using the take function this time. So will this code run forever? No, because ng-microservice just returns 10 doubles. Finally, we merge the observables into a single observable, add all the numbers, get 90% of the value, and return an observable.

 
Source
Found a mistake? Please highlight the word and press Shift + Enter  
< Prev   CONTENTS   Next >

Related topics