Building Apps around the Event Mesh
See the Solution in Action
1. Initial application
| The following example should be considered suboptimal, most likely a counterexample! |
Here’s a method from the Cabs application that handles the completion of a ride.
@Transactional (1)
public void completeTransit(Long driverId, UUID requestUUID, Address destinationAddress) {
    destinationAddress = addressRepository.save(destinationAddress); (2)
    TransitDetailsDTO transitDetails = transitDetailsFacade.find(requestUUID);
    if (!driverService.exists(driverId)) {
        throw new IllegalArgumentException("Driver does not exist, id = " + driverId);
    }
    Address from = addressRepository.getByHash(transitDetails.from.getHash());
    Address to = addressRepository.getByHash(destinationAddress.getHash());
    Money finalPrice = completeTransitService.completeTransit(
            driverId, requestUUID, from, to); (2)
    Money driverFee = driverFeeService.calculateDriverFee(finalPrice, driverId);
    driverService.markNotOccupied(driverId); (2)
    transitDetailsFacade.transitCompleted(requestUUID, Instant.now(clock),
            finalPrice, driverFee); (2)
    awardsService.registerMiles(transitDetails.client.getId(), transitDetails.transitId); (2)
    invoiceGenerator.generate(finalPrice.toInt(),
            transitDetails.client.getName() + " " +
            transitDetails.client.getLastName()); (2)
    eventsEventSender.publish(new TransitCompleted(
            transitDetails.client.getId(), transitDetails.transitId,
            transitDetails.from.getHash(), destinationAddress.getHash(),
            transitDetails.started, Instant.now(clock), Instant.now(clock))
    ); (2)
}
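| 1 | The whole method runs within a single database transaction. |
| 2 | Operations on unrelated data and services, all bundled into that one transaction. |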
| There are issues with the above method. |
Similar methods are, unfortunately, quite common in business applications. At first glance, many developers don’t see any problems with such code. Let’s break down the problems in detail.
1.1. Overuse of transactional processing
Transactional processing has been the cornerstone of many business applications. However, in most cases it isn’t the best fit for real-world processes.
In our example, the completion of a ride reflects a real-world situation.
Yet the example wraps the handling in the @Transactional annotation and operates on a number of unrelated pieces of data.
This means that when one of those operations fails, the whole processing is rolled back.
In effect, the end user receives a nasty error message.
| Outages of dependent services must not invalidate the main intent. In fact, all the operations in this example could happen independently, at different, yet reasonable, times. |
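To make that direction concrete, here’s a minimal sketch, with hypothetical names and simplified signatures, of the shape this method could move toward: only the essential state change stays transactional, and everything else reacts to a published event.

// Sketch only: the wiring and constructor arguments are simplified for illustration.
@Service
public class CompleteTransitHandler {

    private final CompleteTransitService completeTransitService;
    private final EventSender eventSender; // publishes to the Event Mesh

    CompleteTransitHandler(CompleteTransitService completeTransitService,
                           EventSender eventSender) {
        this.completeTransitService = completeTransitService;
        this.eventSender = eventSender;
    }

    @Transactional // now guards only the essential state change
    public void completeTransit(Long driverId, UUID requestUUID, Address destination) {
        Money finalPrice = completeTransitService.completeTransit(driverId, requestUUID, destination);
        // Fee calculation, invoicing, awards, and so on react to this event
        // independently, at their own pace, without rolling back the completed ride.
        eventSender.publish(new TransitCompleted(requestUUID, finalPrice));
    }
}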
1.2. Bundling of different logical domains
Our example is also very chatty and hard to understand at first glance.
In fact, this is quite common in similar applications.
The code starts small and easy to understand.
As new features are added, it keeps growing, because developers cram new instructions into methods like completeTransit.
Of course, the developers could re-architect the code and extract instructions into separate blocks, but that is just a half-measure.
The application will still perform all the operations, starting from the completeTransit method, at the same time and within the same "script".
2. Refactoring plan
In this section, we’ll plan the refactoring of the Cabs application.
The refactoring will be limited in scope, to keep the process easy to understand.
Its scope will be the extraction of the drivers module, which is already a separate domain in the codebase.
Within the scope of the completeTransit method, we’ll need to shift the way we calculate the fee for the driver.
The calculation will be done asynchronously, and when the drivers module publishes the calculation result, it will be saved back into the database.
The base for the refactoring is the Event Mesh pattern, and the asynchronous communication will be done with Cloud Events.
After the refactoring, the completeTransit code will use two event types:
- CalculateFee, a command event requesting the fee calculation for the driver of a given ride
- DriverFeeCalculated, an information event fired when the calculator has computed the requested fee
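For illustration, here’s a minimal sketch of how such an event can be assembled with the CloudEvents Java SDK. The type, source, and payload attributes match the values that appear in the walkthrough logs later on; the helper class and method names are our own.

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

public class CalculateFeeEvents {
    // Builds the CalculateFee command event; the subject carries the ride id,
    // so the DriverFeeCalculated reply can be correlated back to the ride.
    public static CloudEvent calculateFee(UUID rideId, long driverId, int transitPrice) {
        byte[] payload = ("{\"driver-id\":" + driverId
                + ",\"transit-price\":" + transitPrice + "}")
                .getBytes(StandardCharsets.UTF_8);
        return CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withType("cabs.drivers.calculate-fee")
                .withSource(URI.create("usvc://cabs/legacy"))
                .withSubject(rideId.toString())
                .withData("application/json", payload)
                .build();
    }
}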
The diagram below shows the sequence of operations that happen when we invoke the refactored completeTransit code.
The diagram illustrates the flow of events between the legacy application, the Knative Event Mesh, the fee calculator service, and the datastore.
3. Run this demonstration
Next, we’ll walk through the demo step by step.
3.1. Before getting started
We’ll be using a Red Hat OpenShift Container Platform (OCP) 4.x cluster, so make sure you have one available in your environment.
You can use Red Hat’s Developer Sandbox to spin up an instance for you.
Alternatively, you can use an OpenShift Local installation. Make sure to give it enough resources to fit the Serverless Operator and our demo application.
3.2. Installing the demo
3.2.1. Installing the Serverless Operator
To install the Serverless Operator, follow the documentation steps.
The TL;DR version is to apply the following manifests and wait until the operator is ready:
apiVersion: v1
kind: Namespace
metadata:
  name: openshift-serverless
---
apiVersion: operators.coreos.com/v1
kind: OperatorGroup
metadata:
  name: openshift-serverless
  namespace: openshift-serverless
spec: {}
---
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: serverless-operator
  namespace: openshift-serverless
spec:
  channel: stable
  name: serverless-operator
  source: redhat-operators
  sourceNamespace: openshift-marketplace
Here are commands to apply the above manifests.
git clone https://github.com/openshift-knative/cabs-usvc
oc apply -f cabs-usvc/deploy/serverless/operator.yaml
oc wait csv/serverless-operator.v1.35.0 \
--for 'jsonpath={.status.conditions[?(@.phase == "Succeeded")]}'
| Replace the serverless-operator.v1.35.0 version with the version currently available on your cluster. |
Here’s the expected output
namespace/openshift-serverless created
operatorgroup.operators.coreos.com/openshift-serverless created
subscription.operators.coreos.com/serverless-operator created
clusterserviceversion.operators.coreos.com/serverless-operator.v1.35.0 condition met
3.2.2. Installing the Serving and Eventing components
To install the Serving and Eventing components, follow the Serving documentation steps and the Eventing documentation steps.
Again, the TL;DR version for small development setups is to apply the following manifests and wait until the components are ready for operation:
apiVersion: v1
kind: Namespace
metadata:
  name: knative-serving
---
apiVersion: operator.knative.dev/v1beta1
kind: KnativeServing
metadata:
  name: knative-serving
  namespace: knative-serving
spec:
  high-availability:
    replicas: 1
---
apiVersion: v1
kind: Namespace
metadata:
  name: knative-eventing
---
apiVersion: operator.knative.dev/v1beta1
kind: KnativeEventing
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec:
  high-availability:
    replicas: 1
Here are commands to apply the above manifests.
oc apply \
-f cabs-usvc/deploy/serverless/serving.yaml \
-f cabs-usvc/deploy/serverless/eventing.yaml
oc wait knativeserving/knative-serving \
--namespace knative-serving \
--for 'condition=Ready=True'
oc wait knativeeventing/knative-eventing \
--namespace knative-eventing \
--for 'condition=Ready=True'
Here’s the expected output
Warning: resource namespaces/knative-serving is missing the kubectl.kubernetes.io/last-applied-configuration annotation which is required by oc apply. oc apply should only be used on resources created declaratively by either oc create --save-config or oc apply. The missing annotation will be patched automatically.
namespace/knative-serving configured
knativeserving.operator.knative.dev/knative-serving created
Warning: resource namespaces/knative-eventing is missing the kubectl.kubernetes.io/last-applied-configuration annotation which is required by oc apply. oc apply should only be used on resources created declaratively by either oc create --save-config or oc apply. The missing annotation will be patched automatically.
namespace/knative-eventing configured
knativeeventing.operator.knative.dev/knative-eventing created
knativeserving.operator.knative.dev/knative-serving condition met
knativeeventing.operator.knative.dev/knative-eventing condition met
3.2.3. Installing the demo applications
To install the demo applications, apply the prepared manifests using the following commands.
oc create ns demo
oc apply \
-f cabs-usvc/deploy/db/redis.yaml \
-f cabs-usvc/deploy/apps/drivers.yaml \
-f cabs-usvc/deploy/apps/legacy.yaml
oc wait ksvc/drivers \
--namespace demo \
--for condition=Ready=True
oc wait ksvc/legacy \
--namespace demo \
--for condition=Ready=True
Here’s the expected output
namespace/demo created
pod/redis created
service/redis created
service.serving.knative.dev/drivers created
service.serving.knative.dev/legacy created
service.serving.knative.dev/drivers condition met
service.serving.knative.dev/legacy condition met
3.2.4. Configuring the Event Mesh
To configure the Event Mesh, apply the prepared manifests using the following commands.
oc apply \
-f cabs-usvc/deploy/mesh/broker.yaml \
-f cabs-usvc/deploy/mesh/sources.yaml \
-f cabs-usvc/deploy/mesh/triggers.yaml
oc wait broker/default \
--namespace demo \
--for condition=Ready=True
oc wait sinkbinding/drivers-binding \
--namespace demo \
--for condition=Ready=True
oc wait sinkbinding/legacy-binding \
--namespace demo \
--for condition=Ready=True
oc wait trigger/trg-drivers \
--namespace demo \
--for condition=Ready=True
oc wait trigger/trg-legacy \
  --namespace demo \
  --for condition=Ready=True
Here’s the expected output
broker.eventing.knative.dev/default created
sinkbinding.sources.knative.dev/drivers-binding created
sinkbinding.sources.knative.dev/legacy-binding created
trigger.eventing.knative.dev/trg-drivers created
trigger.eventing.knative.dev/trg-legacy created
broker.eventing.knative.dev/default condition met
sinkbinding.sources.knative.dev/drivers-binding condition met
sinkbinding.sources.knative.dev/legacy-binding condition met
trigger.eventing.knative.dev/trg-drivers condition met
trigger.eventing.knative.dev/trg-legacy condition met
The OpenShift Container Platform can provide a clear visualization of our deployed solution.
The console shows two sink bindings on the left, feeding events from the applications into the Broker (depicted in the center). The Broker is the centralized infrastructure piece that ensures proper decoupling of the services. On the right, you can see the two applications deployed as Knative services, and two triggers (shown as lines) that configure the Event Mesh to feed the appropriate events to the applications.
3.3. Walkthrough guide
With the demo pieces deployed on the cluster, we can go ahead and test the functionality.
For the sake of brevity, the legacy application prepares some development data, at startup, in the in-memory database it runs on. We will leverage that data to complete a transit without the hassle of simulating the whole ride.
Because we use serverless deployments, the services can be scaled to zero.
This makes it a bit harder to listen to the application logs.
We recommend using the stern tool to easily follow both apps, even across scale-to-zero periods.
stern \
--namespace demo \
--container user-container \
'(legacy|drivers).*'
Alternatively, you can use a regular oc command and a bit of scripting:
oc logs \
--selector app=legacy \
--namespace demo \
--follow &
while [ $(oc get pod --namespace demo --selector app=drivers -o name | wc -l) -eq 0 ]; do \
sleep 1; done && oc wait pod \
--namespace demo \
--selector app=drivers \
--for condition=Ready=True && \
oc logs \
--selector app=drivers \
--namespace demo \
--follow
In a second terminal, call the legacy endpoint by sending a POST message like the following:
curl -Lk -v -X POST -H 'Content-Type: application/json' \
$(oc get ksvc legacy --namespace demo -o jsonpath='{.status.url}')/transits/8/complete \
--data-binary @- << EOF
{
"country": "Polska",
"city": "Warszawa",
"street": "Żytnia",
"buildingNumber": 32,
"hash": -580606919
}
EOF
You should observe that the cURL command succeeds and returns the ride data. Moreover, the logs of both applications should be updated.
In the legacy application’s log you can see a line which shows the application sending the Cloud Event to the Event Mesh:
INFO 1 --- [nio-8080-exec-1] i.l.cabs.common.cloudevents.Publisher :
Publishing event to http://broker-ingress.knative-eventing.svc.cluster.local/demo/default :
CloudEvent{id='83720fe5-02ee-4a3e-9b22-5c287fb68d10',source=usvc://cabs/legacy,
type='cabs.drivers.calculate-fee', datacontenttype='application/json',
subject='4e630a96-4d5c-488c-a53b-9554c0bcb97e',time=2025-02-04T17:32:20.638351262Z,
data=BytesCloudEventData{value=[123, 34, 100, 114, 105, 118, 101, 114, 45, 105,
100, 34, 58, 49, 57, 57, 51, 52, 51, 50, 53, 53, 50, 44, 34, 116, 114, 97, 110,
115, 105, 116, 45, 112, 114, 105, 99, 101, 34, 58, 53, 49, 48, 48, 125]},
extensions={}}
Notice that the cabs.drivers.calculate-fee event was then routed to the drivers service, which calculated the fee.
After the fee was calculated, the cabs.drivers.driver-fee event was published back into the Event Mesh:
[INFO drivers::app::events] Received event:
CloudEvent:
specversion: '1.0'
id: 'f94792bc-9c38-4db1-8da6-b6a28d1b4847'
type: 'cabs.drivers.calculate-fee'
source: 'usvc://cabs/legacy'
datacontenttype: 'application/json'
subject: '005be37e-8971-4a5b-b5e7-dd18de3c1184'
time: '2025-02-04T17:48:11.641317948+00:00'
knativearrivaltime: '2025-02-04T17:48:11.655926003Z'
Binary data: "{\"driver-id\":1993432552,\"transit-price\":5100}"
[DEBUG drivers::drivers::service] calculate fee for: Subject {
id: Some("005be37e-8971-4a5b-b5e7-dd18de3c1184"),
entity: CalculateFeeEvent {
driver_id: Identifier(1993432552),
transit_price: Money(5100) } }
[DEBUG drivers::drivers::service] fee value: Money(4856)
[DEBUG drivers::support::cloudevents] sending cabs.drivers.driver-fee event to
http://broker-ingress.knative-eventing.svc.cluster.local/demo/default:
Event { attributes: V10(Attributes { id: "939babd7-6a85-4859-b45b-66087aba9418",
ty: "cabs.drivers.driver-fee", source: "usvc://cabs/drivers",
datacontenttype: Some("application/json"), dataschema: None,
subject: Some("005be37e-8971-4a5b-b5e7-dd18de3c1184"),
time: Some(2025-02-04T17:48:12.897943139Z) }),
data: Some(Json(Object {"driver-id": Number(1993432552), "fee": Number(4856)})),
extensions: {} }
In the end, the Event Mesh routed the cabs.drivers.driver-fee event back to the legacy application.
You can see the evidence of it in the logs.
INFO 1 --- [nio-8080-exec-2] i.l.c.ride.details.TransitDetailsFacade : Driver fee calculated for transit 005be37e-8971-4a5b-b5e7-dd18de3c1184: 48.56
4. In-depth look at the refactoring
In this section, we’ll take an in-depth look at the refactoring of the Cabs application, following the plan outlined above: extracting the drivers module and shifting the driver-fee calculation to asynchronous processing over the Event Mesh, with Cloud Events.
4.1. Drivers module
The functionality around drivers is already quite separated in the codebase, so it is a good starting point for extraction into a separate module. The drivers module will become a standalone web service, deployed on the Kubernetes cluster. For this example, the drivers module is implemented in Rust.
Here’s the Rust code for the calculate-fee functionality.
The entrypoint is the Cloud Event of type cabs.drivers.calculate-fee, which we expect the Event Mesh to route to this service.
impl Service {
    pub async fn calculate_fee(&mut self, ce: Event) -> Result<()> {
        let calc_fee_intent = Self::unwrap_calculatefee(ce)?; (1)
        let subject = calc_fee_intent.id.clone();
        log::debug!("calculate fee for: {:?}", calc_fee_intent);
        let drv = self.repo.get(&calc_fee_intent.entity.driver_id).await?;
        let fee = drv.calculate_fee(&calc_fee_intent.entity.transit_price); (2)
        log::debug!("fee value: {:?}", fee);
        let driverfee_event = DriverFeeEvent {
            driver_id: calc_fee_intent.entity.driver_id,
            fee,
        }; (3)
        let mut builder = driverfee_event.to_builder(); (3)
        if let Some(id) = subject {
            builder = builder.subject(id);
        } (3)
        let ce = builder.build().map_err(error::ErrorInternalServerError)?; (3)
        Sender::new(&self.config).send(ce).await?; (4)
        Ok(())
    }
    // [..]
}
In the above code, we are doing the following:
| 1 | We unwrap the Cloud Event envelope into an internal, domain, fee value object. |
| 2 | We calculate the fee value using the domain logic. |
| 3 | We wrap the calculated fee value into a new Cloud Event. |
| 4 | We send the fee, as a Cloud Event, back to the Event Mesh using an HTTP client. |
Of course, in order for this method to be called, we need to route the event from the HTTP listener:
pub fn routes() -> impl HttpServiceFactory + 'static {
    web::resource("/").route(web::post().to(recv))
}

async fn recv(
    ce: Event,
    state: web::Data<State>,
    binding: web::Data<Binding>,
) -> Result<HttpResponse> {
    log::info!("Received event:\n{}", ce);
    let mut svc = service::new(state, binding).await?;
    match ce.ty() {
        "cabs.drivers.calculate-fee" => svc.calculate_fee(ce).await,
        _ => Err(error::ErrorBadRequest("unsupported event type")),
    }?;
    Ok(HttpResponse::Ok().finish())
}
| The example above uses a simple match expression to determine the route for the given event type. In a real application, you would probably use more elaborate logic to determine which method should be called. |
Let’s also look at the Cloud Event sender, which uses an HTTP client to send events to the Event Mesh:
impl Sender {
    pub async fn send(&self, ce: Event) -> Result<()> {
        log::debug!("sending {} event to {}:\n{:?}", ce.ty(), &self.sink, ce);
        let response = self
            .client
            .post(&self.sink) (1)
            .event(ce)
            .map_err(error::ErrorInternalServerError)?
            .send()
            .await
            .map_err(error::ErrorInternalServerError)?;
        match response.status().is_success() {
            true => Ok(()),
            false => {
                log::error!("failed to send event: {:#?}", response);
                Err(error::ErrorInternalServerError(format!(
                    "failed to send event: {}",
                    response.status()
                )))
            }
        }
    }
}
| 1 | The client uses the POST method to send the JSON representation of the event to the sink. The sink is the URL of the target, in this case the URL of the Event Mesh broker. |
4.2. Event Mesh
In this section, we’ll set up the Event Mesh to enable communication between the extracted drivers module and the remaining parts of the application.
Here’s the configuration of the Event Mesh’s central component, the Broker, which will be used in this example. The Broker here is a Knative component and will be deployed in the Kubernetes cluster.
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
  namespace: demo
spec:
  delivery:
    backoffDelay: PT0.2S (1)
    backoffPolicy: exponential (2)
    retry: 10 (3)
| 1 | The backoffDelay is the base delay between retries; we use 200ms in this example. |
| 2 | The backoffPolicy is set to exponential, which means that the delay is doubled each time. |
| 3 | The retry is the number of times delivery is retried before giving up. |
| In our example, the policy is exponential with a 200ms base delay, so redeliveries are attempted after roughly 0.2s, 0.4s, 0.8s, and so on, for up to 10 retries. |
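To get a feel for what this configuration means in practice, here is a small, illustrative sketch that prints the approximate redelivery schedule. It assumes the delay simply doubles on each attempt, which glosses over the Broker’s exact formula:

import java.time.Duration;

public class BackoffSchedule {
    public static void main(String[] args) {
        Duration backoffDelay = Duration.ofMillis(200); // PT0.2S
        int retries = 10;
        for (int attempt = 1; attempt <= retries; attempt++) {
            // exponential policy: the delay doubles with each retry attempt
            Duration delay = backoffDelay.multipliedBy(1L << (attempt - 1));
            System.out.printf("retry %d after ~%d ms%n", attempt, delay.toMillis());
        }
    }
}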
4.3. Legacy application changes
The last part of the refactoring will be the changes needed in our legacy Java application. We need to remove the drivers logic and send events to the Event Mesh instead. We also need to accept new events coming from the Event Mesh, as the calculated fee will arrive as one of them.
Here’s the refactored code:
public void completeTransit(UUID requestUUID, AddressDTO destinationAddress) {
    // ...
    Money finalPrice = completeTransitService.completeTransit(driverId, requestUUID, from, to);
    // ...
    driverFeeService.calculateDriverFee(requestUUID, finalPrice, driverId); (1)
    // ...
}

@EventListener (2)
public void driverFeeCalculated(DriverFee driverFee) { (3)
    Objects.requireNonNull(driverFee.ctx.getSubject());
    UUID id = UUID.fromString(driverFee.ctx.getSubject());
    transitDetailsFacade.driverFeeCalculated(id, driverFee.data.fee);
}
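| 1 | Instead of computing the fee synchronously, we only request the calculation; driverFeeService now sends a command event to the Event Mesh. |
| 2 | The result comes back asynchronously, as a domain event delivered to the method annotated with @EventListener. |
| 3 | The event’s subject correlates the fee with the ride, and the fee is saved back through the transit details facade. |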
| To communicate with the Event Mesh, we need to add a new Cloud Event sender and listener. This is done similarly to the Rust application. |
Below, you can see how you may implement the Cloud Event sender:
@Service
public class DriverFeeService {
    private final CloudEventSender eventSender;

    @Autowired
    public DriverFeeService(CloudEventSender eventSender) {
        this.eventSender = eventSender;
    }

    public void calculateDriverFee(UUID rideId, Money transitPrice, Long driverId) {
        eventSender.send(new CalculateFee(
            rideId,
            driverId,
            transitPrice.toInt()
        ));
    }
}
@Service
public class CloudEventSender {
    private static final Logger log = LoggerFactory.getLogger(CloudEventSender.class);
    private final KnativeConfig knative;
    private final List<Into<?>> converters;

    @Autowired
    CloudEventSender(KnativeConfig knative, List<Into<?>> converters) {
        this.knative = knative;
        this.converters = converters;
    }

    public void send(Object event) {
        try {
            unsafeSend(event);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private <T> void unsafeSend(T event) throws IOException {
        Into<T> convert = (Into<T>) converters.stream()
            .filter(c -> c.accepts(event))
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException(
                "Cannot find converter for " + event.getClass()));
        CloudEvent ce = convert.into(event);
        URL url = knative.getSink();
        log.info("Publishing event to {} : {}", url, ce);
        HttpURLConnection http = (HttpURLConnection) url.openConnection();
        http.setRequestMethod("POST");
        http.setDoOutput(true);
        http.setDoInput(true);
        HttpMessageWriter messageWriter = createMessageWriter(http);
        messageWriter.writeBinary(ce);
        int code = http.getResponseCode();
        if (code < 200 || code >= 300) {
            throw new IOException("Unexpected response code " + code);
        }
    }
}
Once again, notice this is just a simple HTTP client doing a POST request. The writer uses the binary content mode of the CloudEvents HTTP protocol binding: the event attributes travel as HTTP headers, and the JSON payload forms the request body.
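The createMessageWriter helper is elided above. One possible implementation, assuming the cloudevents-http-basic module is on the classpath, could look like this:

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;

import io.cloudevents.http.HttpMessageFactory;
import io.cloudevents.http.HttpMessageWriter;

// ...

private HttpMessageWriter createMessageWriter(HttpURLConnection http) {
    return HttpMessageFactory.createWriter(
        http::setRequestProperty, // CloudEvent attributes become HTTP headers
        body -> {
            try (OutputStream out = http.getOutputStream()) {
                out.write(body); // the event payload becomes the request body
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
}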
The last part to see is the HTTP listener on the legacy application side. This listener will be responsible for receiving events from Knative’s Event Mesh and converting them into our custom event type:
@RestController
public class CloudEventReceiver {
    private static final Logger log = LoggerFactory.getLogger(CloudEventReceiver.class);
    private final EventsPublisher eventsPublisher;
    private final List<From<?>> froms;

    @Autowired
    CloudEventReceiver(EventsPublisher eventsPublisher, List<From<?>> froms) {
        this.eventsPublisher = eventsPublisher;
        this.froms = froms;
    }

    @PostMapping("/")
    public void receive(@RequestBody CloudEvent event) {
        log.info("Received event: {}", event);
        for (From<?> from : froms) {
            if (from.matches(event)) {
                Event ev = from.fromCloudEvent(event); (1)
                eventsPublisher.publish(ev); (2)
                return;
            }
        }
        throw new IllegalStateException("No matching event type consumer found");
    }
}
| 1 | We unwrap the CloudEvent into our domain event type (in the example, that’s the DriverFeeCalculated type). |
| 2 | And publish it within the application, using the framework’s EventsPublisher implementation. The domain events are delivered to the methods annotated with @EventListener. |
| Don’t confuse the framework’s EventsPublisher with the Cloud Event sender and receiver. |
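The Into and From converter interfaces are referenced in the listings above but never shown. Here’s a minimal sketch of how they might be defined and implemented; the exact shapes, as well as the DriverFeeData payload class, are our assumptions, inferred from their usage:

// Converts a domain object into a CloudEvent (used by the sender).
public interface Into<T> {
    boolean accepts(Object event);
    CloudEvent into(T event);
}

// Converts a CloudEvent into a domain event (used by the receiver);
// Event is the application's domain event supertype.
public interface From<T extends Event> {
    boolean matches(CloudEvent event);
    T fromCloudEvent(CloudEvent event);
}

// A hypothetical converter for the driver-fee events.
@Component
public class DriverFeeFrom implements From<DriverFee> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public boolean matches(CloudEvent event) {
        return "cabs.drivers.driver-fee".equals(event.getType());
    }

    @Override
    public DriverFee fromCloudEvent(CloudEvent event) {
        try {
            // keep the CloudEvent context (ctx) and deserialize the JSON payload (data)
            DriverFeeData data = mapper.readValue(event.getData().toBytes(), DriverFeeData.class);
            return new DriverFee(event, data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}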
4.4. The wiring of our Event Mesh
To complete the solution, we need to configure the Event Mesh. The configuration describes the rules for receiving and sending events from and to the Event Mesh and the application modules.
Here are the sources in our case:
apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: drivers-binding
  namespace: demo
spec:
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
      namespace: demo
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: drivers
    namespace: demo
---
apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: legacy-binding
  namespace: demo
spec:
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
      namespace: demo
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: legacy
    namespace: demo
We are using the SinkBinding resource to bind an event source (the Service) with an event sink (the Broker). We have two applications that feed their events into the Event Mesh, so we need two SinkBinding resources. Under the hood, the SinkBinding injects the sink’s URL into the bound workload as the K_SINK environment variable.
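This is presumably what the KnativeConfig class from the sender listing resolves; a minimal sketch of such a lookup (hypothetical implementation):

import java.net.MalformedURLException;
import java.net.URL;

public class KnativeConfig {
    // SinkBinding injects the Broker ingress URL as the K_SINK environment variable
    public URL getSink() {
        String sink = System.getenv("K_SINK");
        if (sink == null || sink.isBlank()) {
            throw new IllegalStateException("K_SINK is not set; is the SinkBinding in place?");
        }
        try {
            return new URL(sink);
        } catch (MalformedURLException e) {
            throw new IllegalStateException("Invalid K_SINK value: " + sink, e);
        }
    }
}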
Lastly, we have to configure the Broker to send events from the Event Mesh to the expected application modules. We use the Trigger resource for this purpose.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: trg-drivers
  namespace: demo
spec:
  broker: default
  filter:
    attributes:
      type: cabs.drivers.calculate-fee (1)
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: drivers
      namespace: demo
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: trg-legacy
  namespace: demo
spec:
  broker: default
  filter:
    attributes:
      type: cabs.drivers.driver-fee (1)
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: legacy
      namespace: demo
| 1 | Note that we specify the event type as a filter attribute: only events of the given type are delivered to the subscriber. |
5. Conclusion
Let’s step back and take a look at what we have accomplished.
The refactored application code fragment is now distributed, resilient, and eventually consistent. It gracefully handles the failures that may happen while calculating the driver’s fee. The Event Mesh makes sure to retry the event delivery in case of failures on either side.
We could extend the refactoring even further with the same principle, making the whole application modern and responsive, without incorrect, unnecessary transactional behavior.