Reactive Programming in Java: Usage Example
What is reactive programming in Java and how to use it in real life
While reactive Java is attracting developers’ attention, most programmers still choose the multi-threading paradigm. Why? The principle of threads is relatively easy to grasp. Reactive programming, on the other hand, requires rethinking many of the programming principles we are accustomed to. Trying to explain why asynchronous I/O is the preferred alternative is like trying to explain the sphericity of the Earth to someone who has always believed in its flat nature.
I prefer to learn through gamification and experimentation, creating “toy” systems that can then be used as the basis for larger systems if needed. Here I will present one such base system that demonstrates the basics of reactive Java using Project Reactor. And since it is small (less than a thousand lines in nine files), it should not be difficult to understand.
It is a system of pseudo-microservices. Although everything is enclosed in one executable file, each “service” is in a separate class that does not contain a state. The classes themselves can only interact with each other using a message queue and a database. One main class starts all other classes, and they are programmed to terminate after a set period.
My system models the intake of used vehicles. A purchase order is generated randomly and placed on a queue. From this queue, another service receives the purchase order, adds it to the database, and sends the vehicle information on to one of three message queues, depending on whether the vehicle is a car, truck, or motorcycle. Finally, there are three services that read these three queues, verify the order’s presence in the database, and determine parking spots for the vehicle.
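The routing step described above can be sketched in a few lines of plain Java. This is only an illustration of the idea: the enum, the class name, and the three queue names are hypothetical and are not taken from the actual project.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical names throughout: the enum, the queue names, and the class
// itself are illustrative assumptions, not the article's actual code.
class VehicleRouterSketch {

    enum VehicleType { CAR, TRUCK, MOTORCYCLE }

    // One destination queue per vehicle type (queue names are made up).
    private static final Map<VehicleType, String> QUEUES = new EnumMap<>(VehicleType.class);
    static {
        QUEUES.put(VehicleType.CAR, "car-queue");
        QUEUES.put(VehicleType.TRUCK, "truck-queue");
        QUEUES.put(VehicleType.MOTORCYCLE, "motorcycle-queue");
    }

    /** Routing decision with no mutable state: the type alone picks the queue. */
    static String queueFor(VehicleType type) {
        return QUEUES.get(type);
    }

    public static void main(String[] args) {
        for (VehicleType type : VehicleType.values()) {
            System.out.println(type + " -> " + queueFor(type));
        }
    }
}
```

Because the decision depends only on the vehicle type, the routing class holds no mutable state, which matches the stateless design of the services.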
Since microservices are normally started separately by a framework like Spring, the main() function here acts more like a test harness. In any case, we just need to ensure that everything runs consistently on one JVM. I assume the system as a whole will work with minimal adjustments; the real test is whether the individual services work. Here is a gist of the main() function:
We start all consumers, and then we start the generation of purchase orders, which creates orders and queues them one by one for 10 seconds. All methods in the classes can be static since none of them have a state.
Since the whole action starts in the PurchaseOrderGenerator, we’ll take a look there first. Here is the main part of the code:
We are using the RabbitMQ message queue through Reactor RabbitMQ, a reactive Java client library. Its Sender.sendWithPublishConfirms method receives a Flux and returns a Flux. A Flux is a sequence of elements that are delivered one at a time. Its length can be infinite, and normally only one element is in a given stage of processing at a time, although more can be in flight depending on several factors.
Flux is a key component of the Project Reactor asynchronous I/O library. It allows methods to be chained together to manage elements as they flow through the system. It also manages the flow itself. Methods that manipulate elements usually take lambda functions as parameters, and those lambdas are called on each element as it moves through the system. The two most common methods are map and filter. The map method converts elements in a stream from one type to another. The filter method discards elements that do not meet certain criteria.
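For readers who have not used these two operators before, here is a minimal JDK-only sketch. It uses java.util.stream.Stream rather than Flux so that it runs without Reactor on the classpath. The operator names and the lambda style are the same, but a Stream is synchronous, so this shows only the transformation, not the asynchronous flow. The class and method names are made up for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// JDK-only illustration of map and filter; Flux offers operators with the
// same names, but this sketch does not use Reactor.
class MapFilterSketch {

    /** map: String -> Integer (its length); filter: keep lengths above 3. */
    static List<Integer> longWordLengths(Stream<String> words) {
        return words
                .map(String::length)             // convert each element to another type
                .filter(length -> length > 3)    // discard elements that fail the test
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "car" has length 3 and is filtered out; the other two pass.
        System.out.println(longWordLengths(Stream.of("car", "truck", "motorcycle")));
    }
}
```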
We need to create a continuous stream of purchase orders and pass it to the Sender.sendWithPublishConfirms method as a Flux. We start with the line Flux.generate((sink) -> sink.next(createRandomPurchaseOrder())), which creates a potentially infinite sequence of random purchase orders. Because Flux.generate returns a Flux<Object> here, we need the cast method to let the system know we’re working with PurchaseOrder objects. The doOnNext method provides a convenient way to introduce side effects. Here I use it mainly for logging.
Next, the delayElements method adds a delay between elements. I do this to simulate random orders arriving at a reasonable rate. You can delete it without any problems, but then, instead of running for ten seconds and finishing, the system will fill its queues with purchase order messages that take more than two minutes to process. The take method lets the Flux run for ten seconds and then stops it. I do this because this generator is for testing purposes only. In a real system, purchase orders will most likely come from an API, which will add them to the message queue.
For RabbitMQ to process the items, they first need to be converted to an OutboundMessage whose body is a byte array containing the JSON serialization of the purchase order. This is done using the map method and a lambda expression that creates the desired OutboundMessage. If an error occurs while serializing the purchase order, we send an empty array, which is filtered out on the receiving side. You could filter it here too, to spare the queues from carrying invalid messages. But we still need to check on the receiving side, and since this is just a test system, we can send them as well.
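The "empty array on failure, filter on the other side" convention can be sketched without RabbitMQ at all. Everything named here is an assumption for illustration: the PurchaseOrder fields, the hand-written toJson method (the real code presumably uses a JSON library), and the helper names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// PurchaseOrder, its fields, and the toy toJson method are hypothetical
// stand-ins; no RabbitMQ or JSON library is involved in this sketch.
class SerializationSketch {

    static final class PurchaseOrder {
        final String id;
        final String vehicleType;
        PurchaseOrder(String id, String vehicleType) {
            this.id = id;
            this.vehicleType = vehicleType;
        }
    }

    /** Toy serializer that fails on incomplete orders, to simulate an error. */
    static String toJson(PurchaseOrder po) {
        if (po.id == null || po.vehicleType == null) {
            throw new IllegalArgumentException("incomplete purchase order");
        }
        return "{\"id\":\"" + po.id + "\",\"vehicleType\":\"" + po.vehicleType + "\"}";
    }

    /** Sender side: never throw into the stream; fall back to an empty body. */
    static byte[] toBody(PurchaseOrder po) {
        try {
            return toJson(po).getBytes(StandardCharsets.UTF_8);
        } catch (RuntimeException e) {
            return new byte[0];
        }
    }

    /** Receiver side: drop the empty bodies that mark failed serializations. */
    static List<byte[]> validBodies(List<byte[]> bodies) {
        List<byte[]> valid = new ArrayList<>();
        for (byte[] body : bodies) {
            if (body.length > 0) {
                valid.add(body);
            }
        }
        return valid;
    }
}
```

In a Flux chain, toBody would be the function passed to map on the sending side, and the length check would be the predicate passed to filter on the receiving side.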
The doFinally method calls its lambda expression only once, and in this case we use it to log the end of the stream and close the sender. At the end of this chain, we get a Flux that is suitable for passing to the Sender.sendWithPublishConfirms method. What we get back is a Flux that we can check to make sure the message has been placed on the appropriate queue. In this simple case, we do not check.
The subscribe method, which is called on a Flux, starts the chain. It is important to note that before subscribe is called, we are just building a chain of actions. Once subscribe is called, we don’t do anything else to the stream; the call only returns a Disposable object that can be used to stop the subscription. We don’t even store the Disposable, because our stream will stop after ten seconds, as written in the chain. Note that we never call subscribe ourselves on the initial Flux we created with generate. It is the Sender we pass it to that subscribes to it, and that subscription starts this chain of events.
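To make the last few paragraphs concrete, here is a reduced sketch of such a generator chain, pieced together from the description above rather than copied from the original gist. It assumes reactor-core is on the classpath. The PurchaseOrder class, the JSON shape, and the timings are hypothetical, the RabbitMQ Sender is left out, and the elements are plain byte arrays instead of OutboundMessage objects.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Assumes reactor-core is available. PurchaseOrder, the JSON shape, and the
// timings are hypothetical; no RabbitMQ Sender is involved.
class GeneratorSketch {

    // Counts how many orders were actually created, to show laziness.
    static final AtomicInteger CREATED = new AtomicInteger();

    static final class PurchaseOrder {
        final String id = UUID.randomUUID().toString();
    }

    static PurchaseOrder createRandomPurchaseOrder() {
        CREATED.incrementAndGet();
        return new PurchaseOrder();
    }

    /** Builds the chain only; nothing runs until somebody subscribes. */
    static Flux<byte[]> orders(Duration delay, Duration window) {
        return Flux.generate(sink -> sink.next(createRandomPurchaseOrder()))
                .cast(PurchaseOrder.class)   // Flux<Object> -> Flux<PurchaseOrder>
                .doOnNext(po -> System.out.println("created " + po.id))
                .delayElements(delay)        // pace the orders
                .take(window)                // stop once the time window has passed
                .map(po -> ("{\"id\":\"" + po.id + "\"}").getBytes(StandardCharsets.UTF_8))
                .doFinally(signal -> System.out.println("generator finished: " + signal));
    }

    public static void main(String[] args) {
        Flux<byte[]> flux = orders(Duration.ofMillis(100), Duration.ofSeconds(1));
        System.out.println("before subscribe, orders created: " + CREATED.get());
        // In the article the Sender subscribes; here blockLast() subscribes and waits.
        flux.blockLast();
        System.out.println("after completion, orders created: " + CREATED.get());
    }
}
```

The counter printed before the subscription stays at zero, which is the point of the paragraph above: assembling the chain does no work until something subscribes.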
Before moving on to the actual services in the system, let’s take a look behind the scenes and see what drives it all. There is one magical element that goes unnoticed in most cases, namely the Scheduler. Take a look at the Flux created by generate. The first two methods in the chain, cast and doOnNext, contain lambda functions that do not block and return a result after a very short time. However, the next method in the chain, delayElements, must somehow block to delay the flow. How do we block a function in a world without blocking? The answer is the Scheduler. The scheduler can look at the chain, as well as the elements passing through it, and decide which of them should be processed and in which part of the chain.
You can look at it this way. Inside a multitasking operating system, something watches the waiting processes and decides which one gets the attention of the CPU next. The scheduler works similarly, only it doesn’t have to worry about interrupts. All the tasks that the scheduler works with are very small pieces of code. “Blocking” tasks like delayElements are split into two parts. The first part takes the item from the stream, sets a timer, and returns control to the scheduler. Then the scheduler can perform some other task. As soon as the delayElements timer expires, the second part is called and the saved element gets the opportunity to move to the next method in the chain.
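The two-part split can be imitated with nothing but the JDK. The following is an analogy under my own assumptions, not Reactor's implementation of delayElements: a ScheduledExecutorService plays the timer, and a CompletableFuture carries the saved element to the next step.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// A JDK-only analogy, not Reactor's implementation of delayElements.
class TwoPartDelaySketch {

    /**
     * Part one: remember the item, arm a timer, and return immediately.
     * Part two: when the timer fires, release the item to whatever comes next.
     */
    static <T> CompletableFuture<T> delay(T item, long millis, ScheduledExecutorService timer) {
        CompletableFuture<T> next = new CompletableFuture<>();
        timer.schedule(() -> { next.complete(item); }, millis, TimeUnit.MILLISECONDS);
        return next;   // the caller is not blocked while the timer runs
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> delayed = delay("po-1", 200, timer);
        System.out.println("timer armed, caller is free to do other work");
        // join() blocks only here, for the demo; a reactive chain would not block.
        System.out.println("released: " + delayed.join());
        timer.shutdown();
    }
}
```

No thread sleeps while the element waits. The only work done is arming the timer and, later, completing the future.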
I view this process as two continuous conveyor belts. The first moves elements towards the second, but the second moves much more slowly. As a result, the elements linger on the first belt, while the second one gradually removes them from it. As the lingering elements accumulate, so-called “back pressure” arises, and the system understands that it needs to stop the first belt until the second catches up a little.
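The conveyor-belt picture can be tried out with the JDK's own java.util.concurrent.Flow API (Java 9 and later), which follows the same Reactive Streams protocol that Reactor implements. This is a sketch with made-up class names and timings: the slow subscriber signals demand one element at a time, and SubmissionPublisher.submit blocks the producer while the subscriber's small buffer is full. Reactor handles demand without blocking the producer thread, so take this as an illustration of the demand signal rather than of Reactor's internals.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// JDK-only sketch of back pressure; names and timings are illustrative.
class BackPressureSketch {

    /** The slow "second belt": asks for exactly one element at a time. */
    static final class SlowSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);                       // initial demand: one element
        }
        @Override public void onNext(Integer item) {
            received.add(item);
            try {
                Thread.sleep(5);                // simulate slow processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            subscription.request(1);            // ready for the next one
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** The fast "first belt": submits count elements as quickly as it is allowed to. */
    static List<Integer> run(int count) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        SlowSubscriber subscriber = new SlowSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 2)) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < count; i++) {
                publisher.submit(i);            // blocks while the small buffer is full
            }
        }                                       // close() signals completion
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(10));
    }
}
```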
Next, consider the second service in the system, PurchaseOrderConsumer. Its task is to take items from the purchase order queue, store them in the database, and then route them to another queue for further processing. Here is the main code:
It is very similar to the first service. There is an outer wrapper that sends the selected objects to different message queues and calls subscribe. The difference is that instead of building the Flux to send with the generate method, we get the Flux from the Receiver.consumeAutoAck method. This creates a stream of items coming from the purchase order message queue, on which we can build a processing chain. This chain consists mainly of the usual methods: map to convert a JSON string into an element, filter to remove invalid messages, and doOnNext and doOnDiscard to log purchase orders that are being processed or that have been filtered out.
We then move on to a new method, flatMap. This is a special method that deserves attention. Those familiar with flatMap from Java Streams know that if a transformation returns a Stream (or a list that can easily be converted to a Stream), then flatMap will change the outer Stream to include all the elements of the inner Stream. In short, it converts a stream of streams into a single stream of all the elements. In the world of Flux, the same thing happens: a Flux of sequences is transformed into a single sequence containing all the elements.
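For reference, this is what the Java Streams version of flatMap looks like in a minimal, JDK-only example. The class name and the sample data are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// JDK-only illustration of Stream.flatMap; names and data are illustrative.
class StreamFlatMapSketch {

    /** Flattens a list of lists into one list, preserving encounter order. */
    static <T> List<T> flatten(List<List<T>> nested) {
        return nested.stream()
                .flatMap(List::stream)      // each inner list becomes a Stream<T>
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> lots = Arrays.asList(
                Arrays.asList("car-1", "car-2"),
                Arrays.asList("truck-1"),
                Arrays.asList("moto-1", "moto-2"));
        // Prints the five entries as a single flat list.
        System.out.println(flatten(lots));
    }
}
```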
In our case, the transformation is the ReactiveCollection.upsert method, which inserts a purchase order into Couchbase. Its return type is a Mono. We have not yet talked about Mono, which can be thought of as a special case of Flux. Mono works much like Flux, but it either contains a single element or is empty. As with Flux, the element it contains is not available immediately; we have to wait for it. Here we again need to delay the processing of the nested element, but without blocking.
This is where flatMap comes to the rescue. Since the inner structure is a Mono, the outer Flux keeps a one-to-one correspondence between input and output elements. We discard the result of the upsert and simply return the original purchase order for further processing (this is what map(result -> po) does). So flatMap works much like the delayElements method discussed earlier: it is split into two parts, with all the database I/O squeezed in between. The scheduler knows to pass an element to flatMap and move on to another element, and it remembers to pick the element up again when the database call has returned and it is ready for further processing. Once again, flatMap splits our imaginary conveyor belt in two.
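The "wait for the write, drop its result, pass the original order on" pattern can be imitated with the JDK alone. In this sketch CompletableFuture stands in for Mono, and the upsert method with its string result is a hypothetical replacement for the database call. One important difference: a CompletableFuture starts running as soon as it is created, whereas a Mono does nothing until it is subscribed to, so this mirrors the data flow only.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// JDK-only stand-in: CompletableFuture plays the role of Mono. Unlike a Mono
// it starts eagerly. upsert() and its string result are hypothetical.
class UpsertPassThroughSketch {

    /** Pretend database write that completes asynchronously on another thread. */
    static CompletableFuture<String> upsert(String purchaseOrderId) {
        return CompletableFuture.supplyAsync(() -> "stored:" + purchaseOrderId);
    }

    /** Chain onto the write without blocking, drop its result, keep the order. */
    static CompletableFuture<String> storeThenPassOn(String purchaseOrderId) {
        return upsert(purchaseOrderId).thenApply(result -> purchaseOrderId);
    }

    /** One asynchronous write per order; one order out per order in. */
    static List<String> storeAll(List<String> purchaseOrderIds) {
        List<CompletableFuture<String>> pending = purchaseOrderIds.stream()
                .map(UpsertPassThroughSketch::storeThenPassOn)
                .collect(Collectors.toList());   // all writes are now in flight
        return pending.stream()
                .map(CompletableFuture::join)    // only this final step waits
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(storeAll(List.of("po-1", "po-2", "po-3")));
    }
}
```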
It remains to discuss the last method in the chain, timeout; the rest are similar to the previous ones. If this were a real service, the timeout method would not even need to be in the chain. We only have it to cancel the stream when no element is received within a given time. This allows our little toy system to close everything and eventually terminate.
In a real service, you probably would not have this. The only exception is if the service expects a certain number of items per second and then receives none for several minutes. Then you might suspect that the connection to the queue has been lost and want an orchestration service like Kubernetes to restart your service.
Another use case for timeout is code testing. If your test is waiting for an element from the message queue, then you can set a timeout so that the test does not wait indefinitely.
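The testing use case can be illustrated without Reactor. In this JDK-only analogy an in-memory BlockingQueue stands in for the message queue, and poll with a timeout stands in for Reactor's timeout operator. The class and method names are invented for the example.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// JDK-only analogy: a BlockingQueue stands in for the message queue, and
// poll-with-timeout stands in for Reactor's timeout operator.
class BoundedWaitSketch {

    /** Waits at most timeoutMillis for the next message; empty if none arrives. */
    static Optional<String> nextMessage(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return Optional.ofNullable(queue.poll(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("po-1");
        System.out.println(nextMessage(queue, 100));   // a message is already waiting
        System.out.println(nextMessage(queue, 100));   // gives up after about 100 ms
    }
}
```

A test built this way fails or moves on after the bounded wait instead of hanging forever on an empty queue.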
The system is left with a final set of services that read the message queues that PurchaseOrderConsumer fills according to vehicle type. The only processing each of them does is read the purchase order from the queue for its vehicle type, confirm that the order exists in the database, assign it a parking space according to the vehicle type, and, of course, log the result. In a real system, you would probably want to add it to the vehicle registration database or use some other tracking method.
I have created three service classes, one for each type: Car, Truck, and Motorcycle. For the current example, I could have written one class with several parameters and started it three times, once for each type. But I left them separate, because I expect that each type will need its own logic, the number of types is very low, and the code is less than a hundred lines. If the services had a lot of common logic, I would gladly spend the extra time to combine them. But my pragmatism sometimes trumps perfectionism, and I love that.
To check everything, I had to run Couchbase and RabbitMQ. In both cases, I simply started a Docker container:
In the case of Couchbase, I had to open the admin console at http://localhost:8091, accept the user agreement, and set up the po bucket. After that I was able to start the services and observe the log:
The build succeeded! If you build and run the system, I advise you to follow the logs carefully; every entry includes the current thread. You will soon notice the pattern of small conveyor belts (some running in parallel) that I was talking about.
I hope this helped you understand Project Reactor’s asynchronous I/O solution. Method chaining can be tricky to understand at first, but I believe that once you get the hang of it, things will be a lot easier to read. This technique takes the usual imperative programming style and makes it more declarative.