Thursday, October 29, 2015

Exploring CQRS with Axon Framework: Applying Event Sourcing

This post is part of the series of post on exploring CQRS architecture using Axon Framework. It looks into combining event sourcing within CQRS using Axon Framework.

The previous posts in this series has explored the various components of a CQRS application and in Introducing Domain Events and Event Handling, we are able to put everything together. We did all that without bringing in event sourcing, showing that it is possible to have a CQRS architecture easily without introducing event sourcing. But in this post, we do just that: introduce event sourcing.

There is a project on Github (exploringCQRSwithAxon) that accompanies these series of posts. It is a trivial application where we can perform debiting, crediting and transferring of money between two dummy accounts. The code in the project is meant to help illustrate the application of the Axon framework to building CQRS application.


Following with the sample application
To have the sample application in the state where the content of this post applies to it:
first clone the repo:

git clone git@github.com:dadepo/exploringCQRSwithAxon.git

then check out to the appropriate commit hash

git checkout fef500cab7b6129d2e12c14b07d28265aa99f219

The application is built using Axon Framework with Spring Boot. In other to run it, just change to the root directory and execute mvn spring boot:run

The Objective

Our objective in this post is to modify the exploringCQRSwithAxon application and introduce event sourcing.

But what is Event Sourcing?

Well I must confess the reason why the accompanying application to these series of posts, makes use of a bank account metaphor is due to the fact that bank account transactions has been one of the most effective examples I have seen that can easily be used to explain event sourcing…

So to explain event sourcing, imagine a bank account. The current state of a bank account is most likely not going to be a numeric value that is saved in a database somewhere, instead, it will most likely be a value that is arrived at, by computing all the mutation: crediting and debiting that has occurred on the account.

And that is what Event Sourcing is. It is the approach of saving all the actions that changes the state of a domain model, and using these saved changes to reconstruct or arrive at the current state of the model when needed.

So with Event Sourcing we do not “Store” the current state of the System, rather we store all the events that changes the system. And when we need to get the current state, we retrieve the initial state of the system and apply all the saved events that has had any effect on the system.

Axon framework comes with the necessary infrastructure components that allows easy introduction of event sourcing into an application.

How do we go about doing this?

In summary, the way we would introduce event sourcing involves making use of a Repository that does not store the current state of domain objects but stores the domain events that emanates from them. These kinds of Repositories know how to construct the current state of the aggregate from the stored events, so we do not need to manually apply these events whenever we want to retrieve the aggregate for use.

The Axon Framework provides such repository implementation.

That is the general gist, let us now take a look at the code changes that introduced Event Sourcing into our application:

Configuring the Event Store
Since our repository we will be storing domain events instead of the current state of our domain objects, we will need to provide it with an event store. The EventStore interface defines the event storage mechanism. The Axon Framework provides a couple of implementation of the EventStore interface depending on how the events are to be stored: from implementations that allows the storing of events in the file system, in MongoDB, in Cassandra, or to implementations that allows storing events in a relational database using JPA, or JDBC etc.

In this post we would be using the FileSystemEventStore, and this we, configured as a Spring bean thus:

/**
* An event sourcing implementation needs a place to store events. i.e. 
* The event Store.
* In our use case we will be storing our events in a file system, so we 
* configure
* the FileSystemEventStore as our EventStore implementation
*
* It should be noted that Axon allows storing the events
* in other persistent mechanism...jdbc, jpa etc
*
* @return the {@link EventStore}
*/
@Bean
public EventStore eventStore() {
 EventStore eventStore = 
   new FileSystemEventStore(new SimpleEventFileResolver(new File("./events")));
  return eventStore;
}

As can be seen from the above, we are using SimpleEventFileResolver to configure the location where the event stream are to be stored.

Configure the EventSourcingRepository
Since we are no longer storing the current state, we removed the GenericJpaRepository and all inclusion of the Autowiring of the EntityManager.

You will also notice that the spring-boot-starter-data-jpa dependency has been replaced with spring-boot-starter-jdbc.

We then configured an EventSourcingRepository, which can retrieve, and persist entities from event stream saved in an event source. The configuration is below:

/**
* Our aggregate root is now created from stream of events and not 
* from a representation in a persistent mechanism,
* thus we need a repository that can handle the retrieving of our 
* aggregate root from the stream of events.
*
* We configure the EventSourcingRepository which does exactly this. 
* We supply it with the event store

* @return {@link EventSourcingRepository}
*/
@Bean
public EventSourcingRepository eventSourcingRepository() {
  EventSourcingRepository eventSourcingRepository = 
        new EventSourcingRepository(Account.class, eventStore());
  eventSourcingRepository.setEventBus(eventBus());
  return eventSourcingRepository;
}

Notice we provided it with the EventStore we configured earlier. Also we still configured the repository with the eventbus since it will still be publishing domain events.

The Db#init() method is also updated to always delete the generated events in the event store on application startup. This is to ensure we always have a clean slate when we start the app.

Changes to Account
We then update our Aggregate Root. We removed all the JPA related annotations (@Entity, @Id, @Column) and we introduce the @AggregateIdentifier.

The @AggregateIdentifier plays a role similar to JPA’s @Id, it is used to mark the field that is used to maintain our entities identity. Next we made Account extend AbstractAnnotatedAggregateRoot instead of AbstractAggregateRoot

We had to replace the class our aggregate root extend with AbstractAnnotatedAggregateRoot because AbstractAggregateRoot does not support event sourcing.

We then introduce the apply() method. The apply method is similar in functionality to the registerEvent() method but in the use case of event sourcing the apply() method not only generate the domain events, it also makes sure it is stored in the backing event store.

The debit method now looks thus:

public void debit(Double debitAmount) {

 if (Double.compare(debitAmount, 0.0d) > 0 &&
       this.balance - debitAmount > -1) {
   /**
    * Instead of changing the state directly we apply an event
    * that conveys what happened.
    *
    * The event thus applied is stored.
    */
   apply(new AccountDebitedEvent(this.accountNo, debitAmount, this.balance));
  } else {
   throw new IllegalArgumentException("Cannot debit with the amount");
  }
}

and the credit method looks thus:

public void credit(Double creditAmount) {
 
 if (Double.compare(creditAmount, 0.0d) > 0 &&
       Double.compare(creditAmount, 1000000) < 0) {
  /**
   * Instead of changing the state directly we apply an event
   * that conveys what happened.
   *
   * The event thus applied is stored.
   */
   apply(new AccountCreditedEvent(this.accountNo, creditAmount, this.balance));
 } else {
   throw new IllegalArgumentException("Cannot credit with the amount");
 }
}

We have also introduced two new events: AccountCreditedEvent and AccountDebitedEvent.

And as can be seen, instead of changing the state, we "apply events". These events are the ones that will be event sourced.

We then introduced two new methods, which are handlers to the new events we introduced:

@EventSourcingHandler
private void applyDebit(AccountDebitedEvent event) {
 /**
  * This method is called as a reflection of applying events stored in 
  * the event store.
  * Consequent application of all the events in the event store will 
  * bring the Account
  * to the most recent state.
  */

  this.balance -= event.getAmountDebited();
}

and

@EventSourcingHandler
private void applyCredit(AccountCreditedEvent event) {
 /**
  * This method is called as a reflection of applying events stored 
  * in the event store.
  * Consequent application of all the events in the event store will 
  * bring the Account
  * to the most recent state.
  */
 this.balance += event.getAmountCredited();
}

and it is in these methods that the actual state changes is done. Note that these methods are annotated with @EventSourcingHandler. Which is slightly different from @EventSourcing we have been introduced to in previous posts in the sense that the @EventSourcingHandler is meant to mark methods that react to changes within an aggregate.

Before we go forward let us take another look again at the changes thus far and perhaps explain all the moving parts we just introduced?

So we started by removing the business logic that actually changes state from the debit() and credit() methods. Instead we introduce the usage of apply() method which we used to apply the AccountCreditedEvent and AccountDebitedEvent, technically publishing these events as domain events and storing them as part of event sourcing.

Although we are no longer changing states in the debit/credit method, we still need to apply the state changing logic. To do this, we added two new methods which we annotated with @EventSourcingHandler. We moved the state changing logic there instead.

The @EventSourcingHandler marks a method as one that contains logic to be applied to an aggregate based on domain events generated by that aggregate.

When an event sourced aggregate is being built to it’s current states, all the events stored by using the apply() method is replayed and the methods annotated by @EventSourcingHandler are executed. This is how the current state is achieved.

We then modify the non default constructor. We apply a new event AccountCreditedEvent.

public Account(String accountNo) {
  apply(new AccountCreatedEvent(accountNo));
}

with it’s accompanying @EventSourcingHandler method

@EventSourcingHandler
public void applyAccountCreation(AccountCreatedEvent event) {
  this.accountNo = event.getAccountNo();
  this.balance = 0.0d;
}

Updates to the Event Handling Logic
With the above modifications, we no longer store the current state, we store events. Because of this we also need to update the event handling logic to be able to make use of the domain events published when accounts are debited/credited in other to be able to accurately reflect the current state of the system which the view layer will query.

The AccountCreditedEventHandler now look thus:

@Component
public class AccountCreditedEventHandler {

 @Autowired
 DataSource dataSource;

 @EventHandler
 public void handleAccountCreditedEvent(AccountCreditedEvent event, 
                   Message eventMessage, @Timestamp DateTime moment) {
    JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

    // Get the current states as reflected in the event
    String accountNo = event.getAccountNo();
    Double balance = event.getBalance();
    Double amountCredited = event.getAmountCredited();
    Double newBalance = balance + amountCredited;

    // Update the view
    String updateQuery = "UPDATE account_view SET balance = ? 
                                                  WHERE account_no = ?";
    jdbcTemplate.update(updateQuery, new Object[]{newBalance, accountNo});

    System.out.println("Events Handled With EventMessage " + 
                      eventMessage.toString() + " at " + moment.toString());
  }

}

the AccountDebitedEventHandler now look thus:

@Component
public class AccountDebitedEventHandler {

  @Autowired
  DataSource dataSource;

  @EventHandler
  public void handleAccountDebitedEvent(AccountDebitedEvent event) {
      JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

      // Get the current states as reflected in the event
      String accountNo = event.getAccountNo();
      Double balance = event.getBalance();
      Double amountDebited = event.getAmountDebited();
      Double newBalance = balance - amountDebited;

      // Update the view
      String updateQuery = "UPDATE account_view SET balance = ? 
                                                 WHERE account_no = ?";
      jdbcTemplate.update(updateQuery, new Object[]{newBalance, accountNo});
  }
}

We are using the balance at the points the events were applied with the amount that lead to the event to calculate the current state of the accounts.

And with these changes we complete the introduction of event sourcing into the system.

If you start the application now, you should still see the same index page and still be able to credit/debit any of the two accounts and have their current state reflected.

So far so good we have been able to assemble a CQRS architecture. But we have left out a crucial part of any software application, and that is testing.

The next post: Exploring CQRS with Axon Framework: Overview of the Testing infrastructure looks into the testing infrastructures Axon provides.

No comments: