IPriceService - Scala trait

As you can see in the following code, IPriceService defines just one operation, generatePrices, which returns Observable[Double]. The next step is to define the service implementation. This code needs to be located in the same services folder as the previous trait:

trait IPriceSerice {
  def generatePrices: Observable[Double]
}

PriceService - RxScala PriceService implementation

First we create a PublishSubject, which is a way to push data into observables. Scala has a nice way of generating infinite sequences using Stream.continually, so we pass it an expression that generates random doubles from 0 to 1,000. This will happen forever and, because the loop never returns, we run it inside a Future so that it does not block the caller. The right thing to do would be to apply a limiting method such as take after Stream.continually, because that lets the computation finish. For the sake of the exercise, we will keep it this way for now.
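As a rough illustration of how a limiting method bounds the infinite sequence, the following sketch applies take to Stream.continually. The object BoundedPrices and the helper randomPrices are hypothetical names used only for this example, and it calls nextDouble() with parentheses so that it compiles on recent Scala versions. On Scala 2.13 and later, Stream is deprecated in favor of LazyList, which offers the same continually and take methods.

```scala
import scala.util.Random.nextDouble

object BoundedPrices {
  // Hypothetical helper: draws n random prices in [0, 1000) and then stops.
  def randomPrices(n: Int): List[Double] =
    Stream.continually(nextDouble() * 1000.0) // lazy, conceptually infinite
      .take(n)                                // bounds the sequence to n elements
      .toList                                 // forces evaluation of those n only
}
```

Calling BoundedPrices.randomPrices(5) returns a list of five values and terminates, whereas calling foreach directly on the unbounded stream never returns.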

Each random double is published into the PublishSubject by the onNext method. Now let's move to the generatePrices method, which uses three observables to generate the number for us. Of course, a simpler solution is possible here. However, we are doing it this way to illustrate the power of observables and how you can use them in practice.

We have Even and Odd observables, both subscribing to PublishSubject, so they will receive infinite double numbers. There is a flatMap operation to add 10 to the number. Keep in mind that everything you do needs to be in an observable. So when you do transformations with flatMap, you always need to return an observable.

Finally, we apply the filter function to get only even numbers on the Even observable and only odd numbers on the Odd observable. All this happens in parallel. Even and Odd observables do not wait for each other.
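The flatMap and filter steps can be tried out on an ordinary collection first. The sketch below is only a synchronous analogue using List instead of Observable; the object name EvenOddSketch and the sample values are assumptions made for illustration. It also shows that toInt truncates the fractional part, so parity is decided by the integer part of each double.

```scala
object EvenOddSketch {
  // Sample values chosen for illustration only (not produced by the service).
  val raw: List[Double] = List(3.5, 4.5, 7.25)

  // Mirrors the flatMap step: each value becomes a one-element
  // collection holding value + 10, which flatMap then flattens.
  val shifted: List[Double] = raw.flatMap(x => Iterable.fill(1)(x + 10))

  // toInt drops the fractional part, so 14.5 counts as even (14)
  // while 13.5 and 17.25 count as odd (13 and 17).
  val even: List[Double] = shifted.filter(x => x.toInt % 2 == 0)
  val odd: List[Double]  = shifted.filter(x => x.toInt % 2 != 0)
}
```

Unlike the observables, this version runs sequentially on one thread; it only demonstrates the shape of the transformation.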

The next step is to merge both observables. We create a third observable that starts empty and then merges the infinite doubles from the Even observable with the infinite doubles from the Odd observable. Now is the time to limit the computation to only 10 numbers. We don't know how many of them will be odd and how many will be even, because the two observables run asynchronously. If you wish to control the number of odds and evens, you need to apply the take function on each observable.

Finally, we apply foldLeft to sum all the numbers and get a total. However, the last flatMap then subtracts 90% of that total, so only 10% of it is emitted. This last observable is what is returned to the controller. Nothing is blocked here; it is all asynchronous and reactive.
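The arithmetic of the last three steps (take(10), foldLeft, and the expression x - (x * 0.9) from the listing) can be checked on a plain List. This is a sketch under assumed input: the object name TotalSketch and the values 1 to 20 are made up for the example and stand in for the merged stream.

```scala
object TotalSketch {
  // Stand-in for the merged stream; the real values are random doubles.
  val merged: List[Double] = (1 to 20).map(_.toDouble).toList

  // Keep the first 10 values and sum them, starting from 0.0.
  val total: Double = merged.take(10).foldLeft(0.0)(_ + _)

  // Same expression as in the service: subtracting 90% of the total
  // leaves roughly one tenth of it (subject to floating-point rounding).
  val result: Double = total - (total * 0.9)
}
```

With this input the total is 55.0 and the result is approximately 5.5, that is, one tenth of the sum.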

You may be wondering why Stream.continually generates different values all the time. That happens because its parameter is a call-by-name parameter in Scala. We import the nextDouble function and pass the expression itself rather than its evaluated value, so it is evaluated again for every element:
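To make the difference between call-by-name and call-by-value concrete, here is a minimal sketch. All names in it (ByNameSketch, nextValue, byName, byValue, samples) are hypothetical; nextValue is a stand-in for nextDouble that also counts how often it is evaluated.

```scala
object ByNameSketch {
  var evaluations = 0

  // Hypothetical stand-in for nextDouble that records each evaluation.
  def nextValue(): Double = {
    evaluations += 1
    scala.util.Random.nextDouble()
  }

  // By-name parameter (=> Double): the argument expression is
  // evaluated again every time x is used inside the method.
  def byName(x: => Double): List[Double] = List(x, x, x)

  // By-value parameter: the argument is evaluated once, before the call.
  def byValue(x: Double): List[Double] = List(x, x, x)

  // Stream.continually declares its parameter by name, so each
  // element comes from a fresh evaluation of the expression.
  def samples(n: Int): List[Double] =
    Stream.continually(nextValue()).take(n).toList
}
```

Calling ByNameSketch.byValue(ByNameSketch.nextValue()) evaluates the expression once and repeats the same number three times, while ByNameSketch.byName(ByNameSketch.nextValue()) evaluates it three times.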

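import javax.inject.Singleton
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler
import rx.lang.scala.subjects.PublishSubject
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Random.nextDouble

@Singleton
class PriceService extends IPriceSerice {

  // Subject through which the random prices are pushed to subscribers.
  val doubleInfiniteStreamSubject = PublishSubject.apply[Double]()

  // Publishes one random price in [0, 1000) per second, off the calling thread.
  Future {
    Stream.continually(nextDouble * 1000.0).foreach { x =>
      Thread.sleep(1000)
      doubleInfiniteStreamSubject.onNext(x)
    }
  }

  override def generatePrices: Observable[Double] = {
    // Adds 10 to each price and keeps those whose integer part is even.
    val observableEven = Observable.create { doubleInfiniteStreamSubject.subscribe }
      .subscribeOn(IOScheduler())
      .flatMap { x => Observable.from(Iterable.fill(1)(x + 10)) }
      .filter { x => x.toInt % 2 == 0 }

    // Same pipeline, keeping those whose integer part is odd.
    val observableOdd = Observable.create { doubleInfiniteStreamSubject.subscribe }
      .subscribeOn(IOScheduler())
      .flatMap { x => Observable.from(Iterable.fill(1)(x + 10)) }
      .filter { x => x.toInt % 2 != 0 }

    // Merges both streams, sums the first 10 values and emits 10% of the total.
    val mergeObservable = Observable.empty
      .subscribeOn(IOScheduler())
      .merge(observableEven)
      .merge(observableOdd)
      .take(10)
      .foldLeft(0.0)(_ + _)
      .flatMap { x => Observable.just(x - (x * 0.9)) }

    mergeObservable
  }
}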

We need to register this service in Guice, in Module.scala, located in the default package at ReactiveWebStore/app.

 