Saturday, October 17, 2015

Exploring CQRS with Axon Framework: Introducing Domain Events and Event Handling

This post is part of a series that explores the CQRS architecture with Axon Framework. It is recommended to read the previous posts in the series before going ahead with this one, as that helps in following the thread of the topic being discussed. You can start from Exploring CQRS Architecture with Axon Framework: Introduction.

There is a GitHub project (exploringCQRSwithAxon) containing a simple application that accompanies the posts in this series. It is an illustrative application that allows debiting, crediting and transferring money between two fictitious accounts. It is obviously a trivial application, and that is on purpose. Its aim is not to capture any complex domain but to help illustrate the various components of a CQRS architecture and how the Axon Framework can be used to build them.


Following along with the sample application
To get the project into the state that illustrates the subject matter of this post:

first clone the repo:

git clone git@github.com:dadepo/exploringCQRSwithAxon.git

then check out to the appropriate commit hash:

git checkout 06411af499a8d9dab62e1697820ca1c696f766dd

Since the application is built with Spring Boot, you can run it after checking it out by executing mvn spring-boot:run in the root directory.

In the previous post we were able to make use of Axon’s building blocks to set up a repository, from which we were able to retrieve our aggregate root.

We also had the command handling components in place such that we were able to dispatch commands that ended up changing states in our application. But as stated at the end of that post, even though we were able to have commands that ended up leading to state change, we did not touch at the heart of CQRS just yet.

This is because we were still using the same model and infrastructure for both the command handling and the querying components. And as already touched upon in Exploring CQRS Architecture with Axon Framework: Introduction, the very core of CQRS is having distinct and separate components that handle writing (commands) and reading (querying).

How then, do we get at the heart of CQRS and have our command component separated from the query component? To answer that, let us take another look at the architectural diagram of CQRS we presented in the introductory post:



In the diagram, we see that the Event Bus lies between the side of the architecture that is concerned with commands/writes and the side that is concerned with reads/queries. Thus, to answer our question, we make use of the event bus to achieve the separation we seek.

How does this work? It works thus:
  1. On the write side, commands lead to state changes in our domain models/aggregates.
  2. These changes in state lead to domain events that capture what changed.
  3. The domain events are published to the event bus.
  4. On the read/query side, event handlers listen to these events and use the information they convey to maintain a reflection of the state of the application. This state is then used for the read side of the application.

The events published from changes in the domain model are technically referred to as domain events, and Axon comes with the infrastructure that takes care of publishing them whenever changes occur in the domain model.
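
The four steps above can be sketched in plain Java without any Axon classes. This is only a minimal illustration under assumed names: FlowSketch, TinyEventBus, AccountDebited and the in-memory view map are all hypothetical and are not part of the sample application. For brevity the aggregate publishes straight to the bus here, whereas in the sample application the repository does that when the aggregate is saved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, framework-free sketch of the flow; none of these types come from Axon.
public class FlowSketch {

    // Step 2: a domain event capturing what changed.
    static final class AccountDebited {
        final String accountNo;
        final double balance;

        AccountDebited(String accountNo, double balance) {
            this.accountNo = accountNo;
            this.balance = balance;
        }
    }

    // Step 3: a tiny event bus that forwards each event to every subscriber.
    static final class TinyEventBus {
        private final List<Consumer<Object>> subscribers = new ArrayList<>();

        void subscribe(Consumer<Object> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(Object event) {
            subscribers.forEach(subscriber -> subscriber.accept(event));
        }
    }

    // Write side: the aggregate changes state and announces the change.
    static final class Account {
        private final String accountNo;
        private double balance;

        Account(String accountNo, double balance) {
            this.accountNo = accountNo;
            this.balance = balance;
        }

        void debit(double amount, TinyEventBus bus) {
            balance -= amount;                                    // step 1: state change
            bus.publish(new AccountDebited(accountNo, balance));  // steps 2 and 3
        }
    }

    // Runs the whole flow once and returns the read-side view.
    public static Map<String, Double> run() {
        Map<String, Double> view = new HashMap<>();
        TinyEventBus bus = new TinyEventBus();

        // Step 4: the read side keeps its own reflection of the state.
        bus.subscribe(event -> {
            if (event instanceof AccountDebited) {
                AccountDebited debited = (AccountDebited) event;
                view.put(debited.accountNo, debited.balance);
            }
        });

        new Account("acc-one", 100.0).debit(30.0, bus);
        return view;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints {acc-one=70.0}
    }
}
```

Note that the write side never reads from the view map, and the read side never touches the Account object; the event is the only thing they share.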

The next section of this post looks at the code changes that were made to the sample application in order to wire up and use the Axon components needed to decouple the command side from the query side, basically the highlighted components in the diagram below:

Overview of code changes


Updating Init Script
Since we would be having another model/storage for the query side, we updated the startup script to create an account_view table which would be used for reading the state of the application. We also inserted our two dummy account numbers, acc-one and acc-two, into the table. So our init() method now looks thus:

@PostConstruct
private void init(){
  // init the tables for commands
  TransactionTemplate transactionTmp = new TransactionTemplate(txManager);
  transactionTmp.execute(new TransactionCallbackWithoutResult() {
      @Override
      protected void doInTransactionWithoutResult(TransactionStatus status) {
          UnitOfWork uow = DefaultUnitOfWork.startAndGet();
          repository.add(new Account("acc-one"));
          repository.add(new Account("acc-two"));
          uow.commit();
      }
  });

  // init the tables for query/view
  JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
  jdbcTemplate
      .execute("create table account_view (account_no VARCHAR, "
             + "balance FLOAT)");

  jdbcTemplate
      .update("insert into account_view (account_no, balance) values (?, ?)",
              new Object[]{"acc-one", 0.0});

  jdbcTemplate
      .update("insert into account_view (account_no, balance) values (?, ?)",
              new Object[]{"acc-two", 0.0});
}

Add an Event Bus
We need an event bus. It is the infrastructure that allows the possibility of having events routed to event handlers.

You will notice that the event bus may look similar to the command bus in the sense that they are both message dispatching infrastructure. They are actually different in a fundamental way in terms of their functionality.

Yes, the command bus dispatches commands and the event bus dispatches events. But a command expresses what needs to happen in the near future, and the command bus expects there to be one and only one command handler that will interpret and execute the intention captured in a command.

The event bus, on the other hand, routes events. An event is an expression of something that has already happened, and there could be zero or more event handlers for an event.
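
The difference in dispatch rules can be made concrete with a small framework-free sketch. Everything here (BusSemanticsSketch, OneHandlerCommandBus, ManyHandlerEventBus) is a hypothetical name, and rejecting a duplicate command handler is just one way this sketch chooses to enforce the "one and only one" rule; it is not a statement about how Axon's own command bus reacts to a second subscription.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch contrasting the two dispatch rules; not Axon's implementation.
public class BusSemanticsSketch {

    // Commands: exactly one handler per command type.
    static final class OneHandlerCommandBus {
        private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();

        void subscribe(Class<?> commandType, Consumer<Object> handler) {
            // Assumption of this sketch: a second handler for the same type is an error.
            if (handlers.putIfAbsent(commandType, handler) != null) {
                throw new IllegalStateException("Duplicate handler for " + commandType.getSimpleName());
            }
        }

        void dispatch(Object command) {
            Consumer<Object> handler = handlers.get(command.getClass());
            if (handler == null) {
                throw new IllegalStateException("No handler for " + command.getClass().getSimpleName());
            }
            handler.accept(command);
        }
    }

    // Events: zero or more listeners; publishing with no listeners is not an error.
    static final class ManyHandlerEventBus {
        private final List<Consumer<Object>> listeners = new ArrayList<>();

        void subscribe(Consumer<Object> listener) {
            listeners.add(listener);
        }

        void publish(Object event) {
            listeners.forEach(listener -> listener.accept(event));
        }
    }

    // Dispatching a command nobody handles fails in this sketch.
    public static boolean commandWithoutHandlerFails() {
        try {
            new OneHandlerCommandBus().dispatch("debit acc-one");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // Publishing one event reaches every listener, however many there are.
    public static int eventDeliveries(int listenerCount) {
        ManyHandlerEventBus bus = new ManyHandlerEventBus();
        List<Object> received = new ArrayList<>();
        for (int i = 0; i < listenerCount; i++) {
            bus.subscribe(received::add);
        }
        bus.publish("acc-one debited");
        return received.size();
    }

    public static void main(String[] args) {
        System.out.println(commandWithoutHandlerFails()); // prints true
        System.out.println(eventDeliveries(0));           // prints 0
        System.out.println(eventDeliveries(3));           // prints 3
    }
}
```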

EventBus is the Axon interface that describes the event dispatching component. Axon comes with a couple of implementations, and for our sample application we make use of the SimpleEventBus, which we wired up as a Spring bean thus:

/**
 * The simple event bus, an implementation of an EventBus
 * mostly appropriate in a single JVM, single threaded use case.
 * @return the {@link SimpleEventBus}
 */
@Bean
public SimpleEventBus eventBus() {
  return new SimpleEventBus();
}

We also need to wire up the Axon infrastructure that makes it easy to set up event handlers that respond to events published to the event bus.

The Axon Framework comes with the @EventHandler annotation, which can be used to mark a method as an event handler. The first argument of the method signifies the type of event the method should respond to.

The AnnotationEventListenerBeanPostProcessor scans for Spring beans that have methods annotated with @EventHandler and automatically registers them with the event bus as event handlers.

The AnnotationEventListenerBeanPostProcessor is the event counterpart to the AnnotationCommandHandlerBeanPostProcessor.
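
To give a feel for what this kind of annotation scanning involves, here is a rough sketch using plain Java reflection. It is not how Axon implements its post processor; the @Handles annotation is a hypothetical stand-in for @EventHandler, and ScanningSketch, DebitProjection and deliver() are names made up for this illustration. The sketch only shows the idea of picking annotated methods and matching the event against the first parameter type.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of annotation-driven event delivery; not Axon's actual code.
public class ScanningSketch {

    // Stand-in for Axon's @EventHandler (an assumption made for this sketch).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handles { }

    static final class AccountDebited { }
    static final class AccountCredited { }

    // A bean with one annotated method; its first parameter names the event type.
    public static final class DebitProjection {
        final List<Object> seen = new ArrayList<>();

        @Handles
        public void on(AccountDebited event) {
            seen.add(event);
        }

        // Not annotated, so the scan below ignores it.
        public void notAHandler(AccountCredited event) {
            seen.add(event);
        }
    }

    // Invokes every annotated method of the bean whose first parameter accepts the event.
    public static int deliver(Object bean, Object event) {
        int invoked = 0;
        try {
            for (Method method : bean.getClass().getDeclaredMethods()) {
                if (method.isAnnotationPresent(Handles.class)
                        && method.getParameterCount() == 1
                        && method.getParameterTypes()[0].isInstance(event)) {
                    method.invoke(bean, event);
                    invoked++;
                }
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke handler", e);
        }
        return invoked;
    }

    public static int debitDeliveries() {
        return deliver(new DebitProjection(), new AccountDebited());
    }

    public static int creditDeliveries() {
        return deliver(new DebitProjection(), new AccountCredited());
    }

    public static void main(String[] args) {
        System.out.println(debitDeliveries());  // prints 1
        System.out.println(creditDeliveries()); // prints 0
    }
}
```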

We register the AnnotationEventListenerBeanPostProcessor thus:

@Bean
AnnotationEventListenerBeanPostProcessor 
                 annotationEventListenerBeanPostProcessor() {
  /**
   * The AnnotationEventListenerBeanPostProcessor finds all beans
   * that have methods annotated with @EventHandler
   * and subscribes them to the event bus.
   */
  AnnotationEventListenerBeanPostProcessor listener = 
                               new AnnotationEventListenerBeanPostProcessor();
  listener.setEventBus(eventBus());
  return listener;
}

[UPDATE]
Since version 2.3, the Axon Framework provides the @AnnotationDriven annotation, which removes the need to explicitly declare a bean of type AnnotationEventListenerBeanPostProcessor. To use it, just annotate the Spring @Configuration class with @AnnotationDriven, and all @CommandHandler and @EventHandler methods will be automatically scanned and registered with their respective bus. The accompanying sample application has been updated (with the d6c9f18750f8f7d4c341c80a07bdf44c5a815783 commit) to use @AnnotationDriven.


The last update we need to make to our configuration is to provide the GenericJpaRepository with the event bus.

Update the Repository with Event Bus
The GenericJpaRepository needs the event bus since it will be publishing domain events when changes occur in our domain objects. We provide it with the event bus in the configuration below:

@Bean
public GenericJpaRepository genericJpaRepository() {
  SimpleEntityManagerProvider entityManagerProvider = 
                      new SimpleEntityManagerProvider(entityManager);

  GenericJpaRepository genericJpaRepository = 
                      new GenericJpaRepository(entityManagerProvider, 
                                                         Account.class);

  /**
   * Configuring the repository with an event bus which allows the repository
   * to be able to publish domain events
   */
  genericJpaRepository.setEventBus(eventBus());
  return genericJpaRepository;
}

Changes to the Domain Model
We then move to the Account class, which is the aggregate root and the only aggregate in our setup. We added code that registers domain events for publication whenever its state changes.

Our debit method now looks thus:

public void debit(Double debitAmount) {

  if (Double.compare(debitAmount, 0.0d) > 0 &&
      this.balance - debitAmount > -1) {
    this.balance -= debitAmount;

    /**
     * A change in the state of the Account has occurred, which can
     * be represented by an event: the account has been debited,
     * so an AccountDebitedEvent is created and registered.
     *
     * When the repository stores this change in state, it will
     * also publish the AccountDebitedEvent to the outside world.
     */
    AccountDebitedEvent accountDebitedEvent =
        new AccountDebitedEvent(this.id, debitAmount, this.balance);
    registerEvent(accountDebitedEvent);

  } else {
    throw new IllegalArgumentException("Cannot debit with the amount");
  }
}

and the credit method:

public void credit(Double creditAmount) {

  if (Double.compare(creditAmount, 0.0d) > 0 &&
      Double.compare(creditAmount, 1000000) < 0) {

    this.balance += creditAmount;

    /**
     * A change in the state of the Account has occurred, which can
     * be represented by an event: the account has been credited,
     * so an AccountCreditedEvent is created and registered.
     *
     * When the repository stores this change in state, it will
     * also publish the AccountCreditedEvent to the outside world.
     */
    AccountCreditedEvent accountCreditedEvent =
        new AccountCreditedEvent(this.id, creditAmount, this.balance);
    registerEvent(accountCreditedEvent);

  } else {
    throw new IllegalArgumentException("Cannot credit with the amount");
  }
}

In both methods we create either an AccountDebitedEvent or an AccountCreditedEvent and use registerEvent() to register the event created.

We have the registerEvent() method available because our Account class extends the AbstractAggregateRoot.

The registerEvent() method exposes Axon's mechanism for keeping track of the domain events that need to be published to the event bus when the domain object is saved to the repository.
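
The "register now, publish on save" idea can be illustrated with a small framework-free sketch. The classes below (RegisterEventSketch, EventTrackingAggregate, PublishingRepository) are hypothetical and are not Axon's AbstractAggregateRoot or GenericJpaRepository; they only mimic the behaviour described above, with a plain list standing in for the event bus and String events standing in for event objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of deferred event publication; not Axon's implementation.
public class RegisterEventSketch {

    // Base class that only remembers events; it does not publish anything itself.
    static abstract class EventTrackingAggregate {
        private final List<Object> uncommitted = new ArrayList<>();

        protected void registerEvent(Object event) {
            uncommitted.add(event);
        }

        // Hands over the pending events and forgets them.
        List<Object> drainUncommitted() {
            List<Object> events = new ArrayList<>(uncommitted);
            uncommitted.clear();
            return events;
        }
    }

    static final class Account extends EventTrackingAggregate {
        private double balance;

        void credit(double amount) {
            balance += amount;
            registerEvent("credited " + amount + ", balance " + balance);
        }
    }

    // Repository stand-in: saving the aggregate is what hands the events over.
    static final class PublishingRepository {
        final List<Object> published = new ArrayList<>(); // plays the role of the event bus

        void save(EventTrackingAggregate aggregate) {
            // Persisting the aggregate state itself is omitted in this sketch.
            published.addAll(aggregate.drainUncommitted());
        }
    }

    // Returns how many events were published before and after the save.
    public static int[] publishedBeforeAndAfterSave() {
        Account account = new Account();
        PublishingRepository repository = new PublishingRepository();

        account.credit(50.0);
        account.credit(25.0);
        int before = repository.published.size();

        repository.save(account);
        int after = repository.published.size();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = publishedBeforeAndAfterSave();
        System.out.println(counts[0] + " then " + counts[1]); // prints 0 then 2
    }
}
```

The point of the indirection is that the aggregate never needs a reference to the bus; it only records what happened, and the infrastructure decides when the outside world gets to hear about it.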

We mentioned AccountDebitedEvent and AccountCreditedEvent as the events that signify that an account has been debited or credited. They are the domain events. The classes representing these events look thus:

public class AccountCreditedEvent {

  private final String accountNo;
  private final Double amountCredited;
  private final Double balance;

  public AccountCreditedEvent(String accountNo, 
                 Double amountCredited, Double balance) {
      this.accountNo = accountNo;
      this.amountCredited = amountCredited;
      this.balance = balance;
  }

  public String getAccountNo() {
      return accountNo;
  }

  public Double getAmountCredited() {
      return amountCredited;
  }

  public Double getBalance() {
      return balance;
  }
}

and

public class AccountDebitedEvent {
  private final String accountNo;
  private final Double amountDebited;
  private final Double balance;

  public AccountDebitedEvent(String accountNo, 
                 Double amountDebited, Double balance) {
      this.accountNo = accountNo;
      this.amountDebited = amountDebited;
      this.balance = balance;
  }

  public String getAccountNo() {
      return accountNo;
  }

  public Double getAmountDebited() {
      return amountDebited;
  }

  public Double getBalance() {
      return balance;
  }
}

What have we achieved thus far? Let us go over it before we move on.

We now have the domain events (AccountCreditedEvent and AccountDebitedEvent), we have updated our domain object to publish these domain events and we have updated our configuration with the necessary infrastructure that allows publishing of the domain events. The next thing we need to add are the event handlers.

Adding Event Handlers

We added two event handlers: AccountDebitedEventHandler

@Component
public class AccountDebitedEventHandler {

  @Autowired
  DataSource dataSource;

  @EventHandler
  public void handleAccountDebitedEvent(AccountDebitedEvent event) {

    JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

    // Get the current state as reflected in the event
    String accountNo = event.getAccountNo();
    Double balance = event.getBalance();

    // Update the view
    String updateQuery = "UPDATE account_view SET balance = ? "
        + "WHERE account_no = ?";
    jdbcTemplate.update(updateQuery, new Object[]{balance, accountNo});
  }
}

and AccountCreditedEventHandler

@Component
public class AccountCreditedEventHandler {

  @Autowired
  DataSource dataSource;

  @EventHandler
  public void handleAccountCreditedEvent(AccountCreditedEvent event,
      Message eventMessage, @Timestamp DateTime moment) {

    JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

    // Get the current state as reflected in the event
    String accountNo = event.getAccountNo();
    Double balance = event.getBalance();

    // Update the view
    String updateQuery = "UPDATE account_view SET balance = ? "
        + "WHERE account_no = ?";
    jdbcTemplate.update(updateQuery, new Object[]{balance, accountNo});

    System.out.println("Events Handled With EventMessage " +
        eventMessage.toString() + " at " + moment.toString());
  }
}

As can be seen, each event handling method is annotated with @EventHandler. The type of event it responds to is indicated by the first argument of the annotated method.

Since we registered the AnnotationEventListenerBeanPostProcessor in the configuration, these classes are subscribed to the event bus as event handlers.

So what goes on within these event handling methods? We extract the information from the respective events and use JDBC to update the account_view table, which is the table used for querying/reading operations.

One important thing to note at this juncture is that we are storing the view data in a different table from the one used for the command operations.

We are also not using any ORM mapping (JPA etc.) for the view layer, nor do we have any special class that models the Account when projecting its state. This peculiarity is at the core of the CQRS way of seeing things: with CQRS, the query layer can be implemented using a different and simpler abstraction that can easily be optimized for query/reading operations.

The handleAccountCreditedEvent() method showcases some of the additional features Axon provides for event handling. As can be seen, it has two additional parameters: Message eventMessage, which carries the event message (metadata, id, etc.), and @Timestamp DateTime moment, which is injected with the moment in time the event was published.

The next thing to do is to update our view (ViewController/Javascript) to make use of this new setup.

Updating the View
We update the getAccounts method of the ViewController to use plain JDBC to query for the state of the accounts:

@Controller
public class ViewController {

 @Autowired
 private DataSource dataSource;

 @RequestMapping(value = "/view", 
                method = RequestMethod.GET, 
                produces = MediaType.APPLICATION_JSON_VALUE)
 @ResponseBody
 public List<Map<String, Double>> getAccounts() {

   JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
   List<Map<String, Double>> queryResult =
       jdbcTemplate.query("SELECT * from account_view ORDER BY account_no",
           (rs, rowNum) -> {
             return new HashMap<String, Double>() {{
               put(rs.getString("ACCOUNT_NO"), rs.getDouble("BALANCE"));
             }};
           });

   return queryResult;
 }
}

The JavaScript that polls the /view endpoint is still in place.

With all these changes made, when you run the application you will still be able to select accounts to either credit or debit and have the result reflected in the balance section. The difference is that, this time around, the debiting/crediting is done via components that are different from the components used to view the current balance of the accounts.

Overview of the Axon Building Blocks

In this post, we touched on some new building blocks in Axon Framework. Let us quickly go over them:

SimpleEventBus
Axon infrastructure that takes care of event routing to event handlers.

AnnotationEventListenerBeanPostProcessor
Comes in handy when using Axon in a Spring application. It scans for Spring beans that have methods annotated with @EventHandler and automatically registers them with the event bus as event handlers.

registerEvent method
Method that can be used within a domain object that extends AbstractAggregateRoot to register domain events for publication.

Summary

So far, so good: we have been able to set up the components of an application in the spirit of CQRS.

You will notice we have done this without mentioning or using Event Sourcing, showing that you can build a CQRS application without using Event Sourcing if you do not have a need for it.

But what if you want to use Event Sourcing? How does the Axon Framework help? The next post Exploring CQRS with Axon Framework: Applying Event Sourcing answers that question and looks at how to use Event Sourcing within a CQRS application with Axon Framework.
