Desktop version

Home arrow Computer Science arrow Building Applications with Scala

Source

Developing Reactive Backing Services

In the previous chapter, you learned how to Bootstrap your application using Activator, and we developed our web application using Scala and the Play framework. Now we will enter into the reactive world of RxJava and RxScala.

In this chapter, we will cover the following topics:

  • • Reactive programming principles and the Reactive Manifesto
  • • Understanding the importance of non-blocking IO
  • • Observables, functions, and error handling with Rx
  • • Refactoring our controllers and models to call our services
  • • Adding RxScala to our services
  • • Adding logging

Getting started with reactive programming

Building applications today is harder than it was before. Everything now is more complex: we have to use more cores in processors, and we have cloud-native applications with hundreds of machines for a single service. Concurrent programming has always been hard, and it will always be so, because it is difficult to model time. In order to address this, we need to have a reactive style of architecture. In order to be able to handle more users and scale our applications, we need to leverage Async and non-blocking IO. To help us with this task, we can rely on RxJava and RxScala. Being reactive is not only about code but also about architectural principles.

The Reactive Manifesto captures these principles very well, and there are a couple of technologies that follow these principles in order to be fully reactive.

The Reactive Manifesto can be shown as in the following diagram:

For more information, you can visit http ://www . reactivemanifesto . org/.

The Reactive Manifesto describes what this reactive architecture/system looks like.

Basically, there are the following four core principles underlining the reactive idea:

  • Responsive: The system should respond in a timely manner. In other words, the system should detect problems quickly, and deal with them effectively, apart from providing rapid and consistent response time.
  • Resilient: The system should stay responsive even after failure. This is done via replication containment, isolation, and delegation (https ://en .wikipedia. org /wiki/Deiegation_pattern). Containment and isolation are ideas that come from the naval industry, and are defined by the bulkhead pattern (https ://en. w ikipedia. org/wiki/Bulkhead_ (partition)). Failures are contained at each component. Doing so makes sure that one system's failure does not affect other systems. Recovery is delegated to another system, and not to the client.
  • Elastic: The ability to increase and decrease resources for the system. This requires you design your system without Single Point Of Failure (SPOF), and design using shards and replication. Reactive systems are predictive and cost- effective.

• Message-driven: Reactive systems rely on asynchronous message passing to ensure loose coupling, isolation, and location transparency. By doing so, we can delegate failures as messages. This gives us elasticity, load management, and flow control. It's even possible to apply back-pressure (also known as throttling) when needed. All this should be done with non-blocking communication for better resource utilization.

Alright, let's use these principles practically in our application with RxScala. RxScala is just a Scala wrapper for RxJava, but it is better to use because it makes the code more functional, and you don't need to create objects such as Actionl.

In our application, we have three major resources: products, reviews, and images. All products must have a price, so we will built a fully reactive price generator with the Play framework, RxScala, and Scala right now.

So first of all, we will play with RxScala in our Play application, then we will create a separate microservice, make reactive calls to that microservice, and retrieve our price suggestion for that service. All data flow transformations are using observables.

Let's create the routes for this controller at ReactiveWebStore/conf/routes , as follows:

#

# Services

#

GET /rx/prices controllers.RxController.prices

GET /rx/aprices controllers.RxController.pricesAsync

We have two routes here: one for a regular action, and another for an Async action that will return a Scala Future. Let's create a new controller called Rx Controller.scala. This controller needs to be located at ReactiveWebStore/app/controller.

Let's have a look at RxController, which is our reactive RxScala simple controller:

@Singleton

class RxController @Inject()(priceService:IPriceSerice) extends Controller {

def prices = Action { implicit request =>

Logger.info("RX called. ")

import ExecutionContext.Implicits.global

val sourceObservable = priceService.generatePrices

val rxResult = Observable.create { sourceObservable.subscribe

}

.subscribeOn(IOScheduler())

.take(1)

.flatMap { x => println(x) ; Observable.just(x) }

.toBlocking

.first

Ok("RxScala Price suggested is = " + rxResult)

}

def pricesAsync = Action.async { implicit request =>

Logger.info("RX Async called. ")

import play.api.libs.concurrent.Execution.Implicits. defaultContext

val sourceObservable = priceService.generatePrices val rxResult = Observable.create { sourceObservable.subscribe }

.subscribeOn(IOScheduler())

.take(1)

.flatMap { x => println(x) ; Observable.just(x) }

.toBlocking

.first

Future { Ok("RxScala Price sugested is = " + rxResult) }

}

}

So, in the very first method called prices, we return a regular Play framework Action. We receive IPriceService via dependency injection. This IPriceService is a reactive service, because it uses observables. So we call a method, generatePrices, which will return Observable[Double]. This will be our source observable, that is, the data source of our computation. Moving forward, we create a new observable subscribing into the source observable, and then we apply some transformation. For instance, we take just one element, and then we can perform transformation using flatMap. For this case, we do not really apply transformations. We use flatMap to simply print what we got, and then continue the chain. The next step is to call toBlocking, which will block the thread until the data is back. Once the data is back, we get the first element, which will be a double, and we return Ok.

Blocking sounds bad and we don't want that. Alternatively, we can use the async controller in the Play framework, which won't block the thread and return a Future. So that's the second method, called pricesAsync. Here we have similar observable code. However, in the end, we return a Future which is not blocking. However, we call toBlocking from the observable that will block the call, thus making it the same as the previous method. To be clear, Action is not bad. By default, everything is Async in Play, because even if you don't return an explicit Future, the Play framework creates a promise for y and makes you code Async. Using HTTP, you will block the thread at some point. If you want to be 100% nonblocking from end to end, you need to consider a different protocol such as web sockets.

Let's take a look at the service now. This service, and other services, need to be located at

ReactiveWebStore/apps/services. First we will create trait to define the service behavior.

 
Source
Found a mistake? Please highlight the word and press Shift + Enter  
< Prev   CONTENTS   Next >

Related topics