Wednesday, August 24, 2016

Replaying Events in An Axon Framework Based Application

This post shows how to replay events from the event store in an event-sourced application built with the Axon Framework.

Note: This post applies only to the 2.x versions of the Axon Framework, as the implementation of Axon's event processing changed significantly with the version 3 release.

If you are not familiar with CQRS and/or the Axon Framework, consider starting off by reading Exploring CQRS Architecture with Axon Framework, a series of posts that explores the CQRS architecture/pattern and how the Axon Framework can be used to build applications according to the stipulations of CQRS.

If you’ve read that series of posts, or you are already familiar with CQRS and the Axon Framework, then feel free to continue with the rest of this post.


Why Replay Events

The simple idea behind event sourcing is this: do not store the current state of things in an application, but store the various changes (usually referred to as events) within the application and use the summation of these changes/events to always arrive at the current state.
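To make "the summation of these changes" concrete, here is a minimal plain-Java sketch with no Axon involved. The event classes AccountEvent, Credited and Debited and the BalanceFold helper are hypothetical, made up for this illustration: the balance is never stored anywhere, it is recomputed by applying each stored event in order.

```java
import java.util.List;

// Hypothetical events for illustration only; these are not Axon classes.
abstract class AccountEvent {
    final long amount;

    AccountEvent(long amount) {
        this.amount = amount;
    }
}

final class Credited extends AccountEvent {
    Credited(long amount) {
        super(amount);
    }
}

final class Debited extends AccountEvent {
    Debited(long amount) {
        super(amount);
    }
}

final class BalanceFold {
    // The current state (the balance) is never stored: it is rebuilt
    // by applying every event of the history, oldest first.
    static long currentBalance(List<AccountEvent> history) {
        long balance = 0;
        for (AccountEvent event : history) {
            if (event instanceof Credited) {
                balance += event.amount;
            } else if (event instanceof Debited) {
                balance -= event.amount;
            }
        }
        return balance;
    }
}
```

Applied to the history credit 100, debit 30, credit 50, the fold yields a balance of 120.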

An application built following this approach would make use of an event store where the events generated within the application are stored.

This event store can be used to regenerate the current state as already mentioned, but the events in the event store can also be replayed independently without using them to generate the current state.

This ability to replay events gives tremendous superpowers, not just from a technical perspective, but also from the business side of things.

For instance, it makes business optimisation a tad easier, since it is convenient to see all the various stages (represented as events) a user passes through while interacting with an application.

Auditing is also a lot easier, since the event stream in principle becomes the audit trail.

It also makes it easy to generate new view models (what are sometimes called projections) and apply them retrospectively.

For instance, let's say we have two e-commerce sites. One is implemented without event sourcing, so it just stores the amount of merchandise (the current state we are interested in) a customer buys. The second e-commerce site, on the other hand, is implemented with event sourcing, so instead of storing the final amount of merchandise, it stores the relevant events within the application that lead to the final purchase: events like UserLoggedIn, UserViewedProductX, UserAddedProductXtoCart, UserRemovedProductXfromCart, UserCheckedOut, etc.

Now, let us imagine that after these two applications have been in production for a couple of months, business decides it would like to know the rate of removal of products from the shopping cart before final checkout.

With the e-commerce site built without event sourcing, new code would have to be written to capture the number of products that get removed from the shopping cart prior to checkout. Thus, data about product removals from the shopping cart can only be made available after the new code is deployed. All the removals of products from shopping carts that occurred since the application went into production are, in effect, lost.

With the e-commerce site built with event sourcing, things are a little bit different. New code would still have to be written, but this time the new code won’t be for capturing the information afresh. Instead, it would generate a new view model by replaying all the events from the event store and counting the UserRemovedProductXfromCart events that have occurred. The cool thing is that this approach can show the rate of removal of products from the shopping cart retrospectively, that is, from the very day the application was deployed to production.
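A rough sketch of what that new code might look like, in plain Java rather than Axon. The ShopEventType enum and the CartRemovalProjection class are hypothetical, and "rate of removal" is taken here, as an assumption, to mean removals from the cart divided by additions to the cart.

```java
import java.util.List;

// Hypothetical, simplified shop events; only the event type matters here.
enum ShopEventType {
    USER_LOGGED_IN, PRODUCT_VIEWED, PRODUCT_ADDED_TO_CART,
    PRODUCT_REMOVED_FROM_CART, USER_CHECKED_OUT
}

// Hypothetical projection built purely from historic events.
final class CartRemovalProjection {
    private long added;
    private long removed;

    // Called once per replayed event, in the order the events were stored.
    void on(ShopEventType type) {
        if (type == ShopEventType.PRODUCT_ADDED_TO_CART) {
            added++;
        } else if (type == ShopEventType.PRODUCT_REMOVED_FROM_CART) {
            removed++;
        }
    }

    // Fraction of cart additions that were later removed; 0 when nothing was added.
    double removalRate() {
        return added == 0 ? 0.0 : (double) removed / added;
    }

    // Builds the new view retrospectively from the full event history.
    static CartRemovalProjection replay(List<ShopEventType> history) {
        CartRemovalProjection projection = new CartRemovalProjection();
        for (ShopEventType type : history) {
            projection.on(type);
        }
        return projection;
    }
}
```

Because the projection only reads the history, it can be introduced months after launch and still report on removals from day one.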

That is just one of the various benefits of an event sourced system.

But to harness these benefits, you need a mechanism to replay all the events, and to register the event handlers that listen to the events needed to create the new view.

The Axon Framework has you covered here, and this post shows how to replay events from the event store in an application built with the Axon Framework.

To show this, I am going to build on the exploringCQRSwithAxon project that was introduced as part of the Exploring CQRS Architecture with Axon Framework series.

The exploringCQRSwithAxon project already uses event sourcing. The rest of this post shows the relevant code changes that were made in order to introduce the ability to replay events.

If you are interested in how event sourcing was introduced to the application in the first place, feel free to check the post: Exploring CQRS with Axon Framework: Applying Event Sourcing.

Achieving Event Replay: Overview of what is needed

Before we look at the code changes, it would be a good idea to first have a mental model of what needs to be achieved.

In principle, what we need is the ability to take all events from the event store and run them, sequentially, through the components that are interested in those events.

That means, we need a mechanism to describe and select the components that should be interested in the replayed events from the event store. We also need a mechanism to actually replay the events.

When events are replayed, we also need to make sure that they do not get pushed to the event bus, as that would lead to event handlers handling events that have already been handled in the past.
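The requirements above can be modelled in a few lines of plain Java. MiniEventStore, MiniEventBus and MiniReplayer are hypothetical names, not Axon classes; the point is only that a replay reads straight from the store and hands events to the selected listeners, bypassing the bus entirely, so the live listeners never see an old event a second time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical append-only store holding every event ever published.
final class MiniEventStore {
    private final List<String> events = new ArrayList<>();

    void append(String event) {
        events.add(event);
    }

    List<String> readAll() {
        return new ArrayList<>(events);
    }
}

// Hypothetical bus that delivers live events to its subscribers.
final class MiniEventBus {
    private final List<Consumer<String>> liveListeners = new ArrayList<>();

    void subscribe(Consumer<String> listener) {
        liveListeners.add(listener);
    }

    void publish(String event) {
        for (Consumer<String> listener : liveListeners) {
            listener.accept(event);
        }
    }
}

final class MiniReplayer {
    // Feeds stored events, in order, straight to the selected listeners.
    // The bus is never involved, so live listeners do not get old events twice.
    static void replay(MiniEventStore store, List<Consumer<String>> replayListeners) {
        for (String event : store.readAll()) {
            for (Consumer<String> listener : replayListeners) {
                listener.accept(event);
            }
        }
    }
}
```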

The code changes needed to do just that are as follows:

Switch the EventStore implementation from FileSystem to JDBC

We need an implementation of the event store that supports replaying events. As far as I can tell, the FileSystemEventStore implementation used previously does not. So the first thing we do is replace the FileSystemEventStore with a JdbcEventStore, which means our events are now stored in a database instead of on the file system.

@Bean
public DataSource dataSource() {
   return DataSourceBuilder
           .create()
           .username("sa")
           .password("")
           .url("jdbc:h2:mem:exploredb")
           .driverClassName("org.h2.Driver")
           .build();
}

/**
* An event sourcing implementation needs a place to
* store events, i.e. the event store.
* In our use case we will be storing our events in a
* database, so we configure the JdbcEventStore as
* our EventStore implementation. It should be noted
* that Axon supports several persistence mechanisms
* for events: JDBC, JPA, the file system, etc.
*
* @return the {@link EventStore}
*/
@Bean
public EventStore jdbcEventStore() {
   return new JdbcEventStore(dataSource());
}
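Why does the choice of store matter? Loading one aggregate only needs that aggregate's own events, but a replay needs every event in the store in a single global order. The hypothetical in-memory PerAggregateStore below (not an Axon class) keeps one stream per aggregate, roughly like a file-per-aggregate layout, and shows the extra work a full visit takes; a database table can return the same order with one ordered query. The ordering used (timestamp, then aggregate id, then sequence number) is an assumption of this sketch. Note that the ReplayingCluster configuration further down casts jdbcEventStore() to EventStoreManagement before handing it over.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical stored event: owning aggregate, storage time,
// position in that aggregate's stream, and a payload.
final class StoredEvent {
    final String aggregateId;
    final long timestamp;
    final long sequenceNumber;
    final String payload;

    StoredEvent(String aggregateId, long timestamp, long sequenceNumber, String payload) {
        this.aggregateId = aggregateId;
        this.timestamp = timestamp;
        this.sequenceNumber = sequenceNumber;
        this.payload = payload;
    }
}

// Hypothetical store with one stream per aggregate. Not thread-safe.
final class PerAggregateStore {
    private final Map<String, List<StoredEvent>> streams = new TreeMap<>();

    void append(StoredEvent event) {
        streams.computeIfAbsent(event.aggregateId, id -> new ArrayList<>()).add(event);
    }

    // Loading one aggregate only touches its own stream.
    List<StoredEvent> readStream(String aggregateId) {
        return Collections.unmodifiableList(
                streams.getOrDefault(aggregateId, Collections.emptyList()));
    }

    // Replay needs all streams merged into one global order.
    void visitAll(Consumer<StoredEvent> visitor) {
        List<StoredEvent> all = new ArrayList<>();
        for (List<StoredEvent> stream : streams.values()) {
            all.addAll(stream);
        }
        all.sort((a, b) -> {
            int byTime = Long.compare(a.timestamp, b.timestamp);
            if (byTime != 0) {
                return byTime;
            }
            int byAggregate = a.aggregateId.compareTo(b.aggregateId);
            if (byAggregate != 0) {
                return byAggregate;
            }
            return Long.compare(a.sequenceNumber, b.sequenceNumber);
        });
        for (StoredEvent event : all) {
            visitor.accept(event);
        }
    }
}
```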

Introduce the clusters


In the Axon Framework, a cluster, in the context of event processing, is the facility that allows event handlers to be grouped together and treated as a single unit.

A single cluster, which can contain as many event handlers as needed, can then be configured separately and be made to behave differently from the other clusters of event handlers that might be configured in the application.

For our code changes, we would need two separate clusters: One for event handlers that should respond to normal domain events, the other for replayed events:


/**
*  A cluster which can be used to "cluster" together event handlers.
* This implementation is based on
* {@link SimpleCluster} and it would be used to cluster event
* handlers that listen to events published during the
* normal running of the application.
*
* @return an instance of {@link SimpleCluster}
*/
@Bean
public Cluster normalCluster() {
   SimpleCluster simpleCluster = new SimpleCluster("simpleCluster");
   return simpleCluster;
}

/**
*  A cluster which can be used to "cluster" together event handlers. 
* This implementation is based on
* {@link SimpleCluster} and it would be used to cluster event handlers 
* that would listen to replayed events.
*
* As can be seen, the bean is just a simple implementation of
* {@link SimpleCluster}; there is nothing about
* it that says it would be able to handle replayed events.
* The bean definition #replayCluster is what makes
* this bean able to handle replayed events.
*
* @return an instance of {@link SimpleCluster}
*/
@Bean
public Cluster replay() {
   SimpleCluster simpleCluster = new SimpleCluster("replayCluster");
   return simpleCluster;
}
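Stripped of Axon specifics, a cluster is little more than a named group of handlers that is published to as a whole. The MiniCluster class below is a hypothetical illustration of that idea, not Axon's Cluster interface: publishing to one cluster reaches its members and nobody else, which is what allows the two clusters above to be configured and fed differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a cluster: a named group of handlers
// that is treated as a single unit when events are published.
final class MiniCluster {
    private final String name;
    private final List<Consumer<Object>> members = new ArrayList<>();

    MiniCluster(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    void subscribe(Consumer<Object> handler) {
        members.add(handler);
    }

    // One publish call fans out to every member of this cluster only.
    void publish(Object... events) {
        for (Object event : events) {
            for (Consumer<Object> member : members) {
                member.accept(event);
            }
        }
    }
}
```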

Configure the cluster meant for replay to handle events replayed from the event store

This involves taking the replay() cluster defined above and wrapping it with a ReplayingCluster. This effectively makes it possible for the replay() cluster to be used during the replay of events.

/**
* Takes the #replay() cluster and wraps it with a ReplayingCluster,
* so that the event handlers registered with it are able
* to pick up events when events are replayed.
*
* @return an instance of {@link ReplayingCluster}
*/
@Bean
public ReplayingCluster replayCluster() {
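   // Live events that arrive while a replay is running are dropped
   IncomingMessageHandler incomingMessageHandler =
           new DiscardingIncomingMessageHandler();
   // The JdbcEventStore is handed to the ReplayingCluster
   // through its EventStoreManagement interface
   EventStoreManagement eventStore = (EventStoreManagement) jdbcEventStore();
   return new ReplayingCluster(replay(), eventStore,
           new NoTransactionManager(),
           0, // commit threshold
           incomingMessageHandler);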
}

As can be seen above, the ReplayingCluster requires a couple of other components: a transaction manager, the event store, etc. Of particular interest, though, is the IncomingMessageHandler component.

The IncomingMessageHandler dictates how the cluster should handle new events that arrive while a replay is in progress: should they be discarded, or should they be put in a "backlog" so they can be processed sequentially after the replay is finished?

For putting events in a backlog, Axon provides the BackloggingIncomingMessageHandler implementation of IncomingMessageHandler. If we want to discard events while in replay mode, there is the DiscardingIncomingMessageHandler, which is the implementation we used above.

Note that, in most practical cases, it is very unlikely that the replay() bean will be used as a standalone component, since its only reason for existing is to be used to create a ReplayingCluster. Thus it is possible not to define the replay() bean at all, and instead pass in an instance of SimpleCluster when defining the ReplayingCluster bean. That is:

// ...
return new ReplayingCluster(new SimpleCluster("replayCluster"), eventStore,
        new NoTransactionManager(), 0, incomingMessageHandler);
// ...
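The difference between the two IncomingMessageHandler strategies can be illustrated with a small, single-threaded wrapper. MiniReplayingGroup and WhileReplaying are hypothetical names, not Axon API, and the sketch is a simplification: a real replaying cluster runs the replay and live traffic concurrently and has to cope with events that show up in both, which is ignored here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical policy for live events that arrive during a replay.
enum WhileReplaying { DISCARD, BACKLOG }

// Hypothetical, single-threaded wrapper around a group of handlers.
final class MiniReplayingGroup {
    private final Consumer<String> handlers;
    private final WhileReplaying policy;
    private final Deque<String> backlog = new ArrayDeque<>();
    private boolean replaying;

    MiniReplayingGroup(Consumer<String> handlers, WhileReplaying policy) {
        this.handlers = handlers;
        this.policy = policy;
    }

    // Live events coming from the event bus.
    void publish(String event) {
        if (!replaying) {
            handlers.accept(event);
        } else if (policy == WhileReplaying.BACKLOG) {
            backlog.addLast(event); // keep it for later, in arrival order
        }
        // With DISCARD, a live event received during replay is simply dropped.
    }

    void startReplay() {
        replaying = true;
    }

    // Historic events read from the event store.
    void onReplayedEvent(String event) {
        handlers.accept(event);
    }

    // Leave replay mode, then process whatever was backlogged.
    void finishReplay() {
        replaying = false;
        while (!backlog.isEmpty()) {
            handlers.accept(backlog.pollFirst());
        }
    }
}
```

Backlogging keeps the view complete at the cost of holding events in memory during a long replay; discarding is simpler but only makes sense when the handlers in that cluster do not need the live events, as with the replay-only cluster in this post.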

Configure a cluster selector

So far so good: we have defined two clusters and enabled one to be usable when events are replayed (which we did by wrapping it with a ReplayingCluster). The next thing we need is a cluster selector.

The cluster selector is, in essence, what we use to select and group event listeners into clusters.

In our case, we assign our normal event handlers to the normalCluster defined above, and the event handlers that should handle events during replay to the replayCluster.

/**
* This configuration registers event handlers with the two defined clusters
*
* @return an instance of {@link ClusterSelector}
*/
@Bean
public ClusterSelector clusterSelector() {
   Map<String, Cluster> clusterMap = new HashMap<>();
   clusterMap.put("exploringaxon.eventhandler", normalCluster());
   clusterMap.put("exploringaxon.replay", replayCluster());
   return new ClassNamePrefixClusterSelector(clusterMap);
}

We are using the package name to select which event handlers would be assigned to clusters.

Axon provides two other mechanisms for achieving the same thing: using patterns in the class name, and using annotations. These are implemented by the ClassNamePatternClusterSelector and AnnotationClusterSelector classes respectively.
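A prefix-based selector boils down to a string match on the handler's fully qualified class name. The PrefixSelector below is a hypothetical sketch that maps prefixes to cluster names (plain strings, to keep it self-contained). When several prefixes match, it picks the longest one so the more specific package wins; whether Axon's ClassNamePrefixClusterSelector resolves overlaps in exactly this way is an assumption here, so check its Javadoc before relying on it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical prefix-based selector, not Axon's implementation.
final class PrefixSelector {
    private final Map<String, String> prefixToCluster;
    private final String defaultCluster;

    PrefixSelector(Map<String, String> prefixToCluster, String defaultCluster) {
        this.prefixToCluster = new HashMap<>(prefixToCluster);
        this.defaultCluster = defaultCluster;
    }

    // Returns the cluster mapped to the longest prefix of the class name,
    // or the default cluster when no prefix matches.
    String select(String handlerClassName) {
        String bestPrefix = null;
        for (String prefix : prefixToCluster.keySet()) {
            if (handlerClassName.startsWith(prefix)
                    && (bestPrefix == null || prefix.length() > bestPrefix.length())) {
                bestPrefix = prefix;
            }
        }
        return bestPrefix == null ? defaultCluster : prefixToCluster.get(bestPrefix);
    }
}
```

Because this is a plain startsWith check, a prefix such as "exploringaxon.replay" would also match a class in a hypothetical exploringaxon.replayutils package; ending the prefix with a dot avoids that.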

Define the Terminal

So now we have our clusters, and using the ClusterSelector we have grouped our event handlers into them. The next thing is to define a terminal, which the EventBus needs in order to route events to specific clusters.

The following configuration introduces the terminal:

/**
* An {@link EventBusTerminal} which publishes application domain events 
* onto the normal cluster
*
* @return an instance of {@link EventBusTerminal}
*/
@Bean
public EventBusTerminal terminal() {
   return new EventBusTerminal() {
       @Override
       public void publish(EventMessage... events) {
           normalCluster().publish(events);
       }
       @Override
       public void onClusterCreated(Cluster cluster) {
           // Nothing to do: this terminal always publishes to normalCluster()
       }
   };
}


Swap out the Normal Event bus with a Clustering Event Bus

Since we would like our event bus to be more sophisticated, with the ability to route events to event handlers based on the cluster they belong to, we need to replace our SimpleEventBus with a ClusteringEventBus. We do this with the configuration below. Note that the clusterSelector and the terminal are both passed to the ClusteringEventBus.

/**
* This replaces the simple event bus that was initially used. 
* The clustering event bus is needed to be able
* to route events to event handlers in the clusters. 
* It is configured with a {@link EventBusTerminal} defined
* by #terminal(). The EventBusTerminal contains the
* configuration rules that determine which cluster gets an incoming event
*
* @return a {@link ClusteringEventBus} implementation of {@link EventBus}
*/
@Bean
public EventBus clusteringEventBus() {
   ClusteringEventBus clusteringEventBus = 
        new ClusteringEventBus(clusterSelector(), terminal());
   return clusteringEventBus;
}

Before going further with the other code changes, let us take a step back to examine the configuration of the terminal and eventBus above.

For the terminal we had:

public void publish(EventMessage... events) {
   normalCluster().publish(events);
}

This means that, while the application is running, the terminal only routes events to the normalCluster; the replayCluster never receives any of the events published during the normal course of the application in production.

Thus, to keep the view generated from the replayed events up to date, all the events would have to be replayed again (so as to pick up the events that have been generated since the last replay).

If your event store does not contain that many events, say only a couple of hundred thousand, this approach is a practical one. But if you are dealing with a high-volume application with events in the event store running into the millions, having to replay all the events each time to refresh the generated view won’t be practical.

In such a scenario, what you could do is replay the events once at application startup (or at the first request for the view), and have the event handlers that receive the replayed events also receive the normal domain events published during the normal running of the application. This way the view can be kept up to date.

In that case we won't need a custom terminal when configuring the event bus, and the ClusteringEventBus can be configured using only the clusterSelector:

new ClusteringEventBus(clusterSelector());
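The two routing choices can be compared side by side with hypothetical terminals (MiniTerminal, NormalOnlyTerminal and BroadcastTerminal are illustrative names only, not Axon types). The first behaves like the custom terminal above; the second stands in for publishing to every cluster, which is what the replay-at-startup approach relies on once the custom terminal is dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical terminal: decides which groups of handlers get a live event.
interface MiniTerminal {
    void publish(String event);
}

// Like the custom terminal above: live events only reach the normal group,
// so a view built by replay goes stale until the next full replay.
final class NormalOnlyTerminal implements MiniTerminal {
    private final Consumer<String> normal;

    NormalOnlyTerminal(Consumer<String> normal) {
        this.normal = normal;
    }

    public void publish(String event) {
        normal.accept(event);
    }
}

// Delivers every live event to every registered group: replay once at
// startup, then the replay-built view keeps up with live events.
final class BroadcastTerminal implements MiniTerminal {
    private final List<Consumer<String>> groups = new ArrayList<>();

    void register(Consumer<String> group) {
        groups.add(group);
    }

    public void publish(String event) {
        for (Consumer<String> group : groups) {
            group.accept(event);
        }
    }
}
```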

Moving ahead, we take a look at the last piece of configuration needed to replay events, which is where we set our EventSourcingRepository to make use of the new event bus:

@Bean
public Repository<Account> eventSourcingRepository() {
   EventSourcingRepository<Account> eventSourcingRepository =
             new EventSourcingRepository<>(Account.class, jdbcEventStore());
   // using the new event bus
   eventSourcingRepository.setEventBus(clusteringEventBus());
   return eventSourcingRepository;
}

With this, we have the basic configuration needed to replay events. The rest of the code changes introduce the new event handler that listens to replayed events, and a new page to view them.

Registering EventHandler for replayed events

The following class, AccountCreditedReplayEventHandler, was introduced:

@Component
public class AccountCreditedReplayEventHandler implements ReplayAware {

    // The view built from replayed events: one audit line per credit/debit
    private final List<String> audit = new ArrayList<>();

    @EventHandler
    public void handle(AccountCreditedEvent event) {
        String auditMsg = String.format(
                "%s credited to account with account no {%s} on %s",
                event.getAmountCredited(), event.getAccountNo(),
                formatTimestampToString(event.getTimeStamp()));
        audit.add(auditMsg);
    }

    @EventHandler
    public void handle(AccountDebitedEvent event) {
        String auditMsg = String.format(
                "%s debited from account with account no {%s} on %s",
                event.getAmountDebited(), event.getAccountNo(),
                formatTimestampToString(event.getTimeStamp()));
        audit.add(auditMsg);
    }

    public List<String> getAudit() {
        return audit;
    }

    // Invoked before a replay starts: reset the view so it is rebuilt from scratch
    @Override
    public void beforeReplay() {
        audit.clear();
    }

    @Override
    public void afterReplay() {
    }

    @Override
    public void onReplayFailed(Throwable cause) {
    }

    // The timestamp is treated as seconds since the epoch, hence the * 1000
    private String formatTimestampToString(long timestamp) {
        return new SimpleDateFormat("dd/MM/yyyy HH:mm:ss")
                .format(timestamp * 1000);
    }
}


An interesting thing to note about this class is that it implements the ReplayAware interface, which is required by any component that needs to be “aware” of the start and end of event replaying. We have AccountCreditedReplayEventHandler implement ReplayAware because we would like to clear the view (represented by the audit list) before the events are replayed.

Unsurprisingly, you will also notice that it has two event handlers, listening to the AccountCreditedEvent and AccountDebitedEvent events. This is where the logic needed to interpret past events in order to build a new view is implemented.
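The order in which the ReplayAware callbacks are invoked can be sketched without Axon. MiniReplayAware, AuditView and MiniReplayDriver are hypothetical names: the driver brackets one pass over the history with beforeReplay and afterReplay and reports failures through onReplayFailed. Replaying twice shows why beforeReplay clears the list: without it, every replay would append duplicates of the same audit lines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical mirror of the three callbacks used above; not Axon's interface.
interface MiniReplayAware {
    void beforeReplay();
    void afterReplay();
    void onReplayFailed(Throwable cause);
}

// Audit view in the spirit of AccountCreditedReplayEventHandler,
// with plain strings standing in for events.
final class AuditView implements MiniReplayAware, Consumer<String> {
    final List<String> audit = new ArrayList<>();
    boolean complete;

    public void beforeReplay() {
        audit.clear(); // start every replay from an empty view
        complete = false;
    }

    public void accept(String event) {
        audit.add(event);
    }

    public void afterReplay() {
        complete = true;
    }

    public void onReplayFailed(Throwable cause) {
        complete = false;
    }
}

final class MiniReplayDriver {
    // Brackets a full pass over the history with the lifecycle callbacks.
    static <H extends MiniReplayAware & Consumer<String>> void replay(List<String> history, H handler) {
        handler.beforeReplay();
        try {
            for (String event : history) {
                handler.accept(event);
            }
        } catch (RuntimeException e) {
            handler.onReplayFailed(e);
            throw e;
        }
        handler.afterReplay();
    }
}
```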

If you clone the project, run it, and play around with crediting and debiting the two dummy accounts, then navigate to http://localhost:8080/events, you should see something similar to this:


A view showing all the credit and debit events that have taken place, and when they occurred

To see, in one go, all the code changes introduced to add the ability to replay events, view the adecd71ca5d8c9542d26ffe82231d5c27d93bfa9 commit.

That is it. If you introduce similar changes to your Axon framework based application, tweaking the configuration where necessary, you should be able to have events replayed from your event store.

