Event Sourced Aggregates

As of Domain Driven Design approach, an aggregate is a domain specific pattern for encapsulating a collection of entities that live on their own. The aggregate encapsulates all the behavior of internal parameters and is unique, being identifiable by an id. Changing the state of the aggregate is possible only through it’s exposed domain specific methods. Because an aggregate needs to be persisted for a longer time, we should always save and load the entire object as a whole. By applying different commands to an aggregate, it always results in a new aggregate state that will overwrite any previous states. This means that we loose all history of the aggregate. Logging a message for each behavior taken would help with having a history of the aggregate but what if we want to see what was the state of the aggregate at a particular time in the past. Or if we want to implement new features and use all the past data to update the aggregate.

A better approach would be to generate events with every behavior taken on the aggregate and apply the events to the aggregate to change it’s state. When we persist the aggregate, it is enough to save the events because we can reapply them to the aggregate again. If we want to see the state of the aggregate at a particular time in the past we apply only the events that happened up to that point in time. We have much more flexibility by relying on an event log to generate the aggregate.

This would also map better to real life actions, the events being just facts that happened in the past. And as the past can not be changed, the events should not be ever changed too.

Starting from a Rover domain model

Let’s start from a simple Rover model that can move and turn on a 2d matrix. The Rover has an id, a location defined by longitude(N-S) and latitude (E-W) and an orientation (N,S,E,W). As behaviors we will have moveForward, turnLeft and turnRight. A rover is created by placing it on the matrix.

Here is the initial implementation of the Rover. Notice the 3 behaviors are mutating internal state of the object.

@Data
public class Rover {
    private final String id;
    private Location location;
    private Orientation orientation;

    public Rover(String id, Location location, Orientation orientation) {
        this.id = id;
        this.location = location;
        this.orientation = orientation;
    }

    public void moveForward() {
        this.location = this.location.forward(orientation);
    }

    public void turnLeft() {
        this.orientation = this.orientation.left();
    }

    public void turnRight() {
        this.orientation = this.orientation.right();
    }

    @Value
    @AllArgsConstructor
    public static class Location {
        private final double latitude; //N-S
        private final double longitude; //E-W

        private Location forward(Orientation orientation) {
            switch (orientation) {
                case N:
                    return new Location(this.getLatitude()+1, this.getLongitude());
                case E:
                    return new Location(this.getLatitude()+0, this.getLongitude()+1);
                case S:
                    return new Location(this.getLatitude()-1, this.getLongitude()+0);
                case W:
                    return new Location(this.getLatitude()+0, this.getLongitude()-1);
            }
            throw new IllegalStateException("Unprocessable orientation "+orientation);
        }
    }

    @Value
    public static enum Orientation {
        N("W","E"),
        S("E","W"),
        E("N","S"),
        W("S","N");

        private String left;
        private String right;

        Orientation(String left, String right) {
            this.left = left;
            this.right = right;
        }

        public Orientation left() {
            return Orientation.valueOf(left);
        }

        public Orientation right() {
            return Orientation.valueOf(right);
        }
    }
}

The Location#forward method could have been simplified by moving the latitude, longitude additions (-1, +1) per orientation inside the Orientation class but I preferred to let the Location class dictate and have the Orientation class only for it’s own purpose.

Testing the Rover model

Testing this is simply simulating all possible behaviors. Here is an example of a test that checks multiple behaviors. We will see later a more convenient functional way of simulating this.

    @Test
    public void moveAndTurnRoverMultipleTimes1() {
        RoverAggregate rover = new RoverAggregate("id-01", new RoverAggregate.Location(1,2), RoverAggregate.Orientation.W);

        rover.moveForward();  // 0,2 W
        rover.turnLeft(); // 0,2 S
        rover.moveForward(); // 0,1 S
        rover.turnRight(); // 0,1 W

        assertThat(rover).isEqualTo(new RoverAggregate("id-01", new RoverAggregate.Location(0,1), RoverAggregate.Orientation.W));
    }

The full unit tests for this initial Rover class can be found here: RoverAggregateTest.java

Adding a repository for this Rover model

The repository is straight forward. It will save and load the Rover by it’s id. For simplicity we are using an in memory Map as storage.

public class RoverRepository {

    private static final Map<String, Rover> storage = new HashMap<String, Rover>();

    public void save(Rover rover) {
        storage.put(rover.getId(), rover);
    }

    public Rover findById(String id) {
        return storage.get(id);
    }
}

Notice that on every change, the save method is overwriting the previous state of the Rover.

The unit tests for the RoverRepository can be found here: RoverRepositoryTest.java

Transforming the model into an event sourced aggregate

In this part we will introduce internal events for changing the state of the Rover. For each behavior of the Rover we will emit an internal event that will be applied to the same object and apply changes. And instead of persisting the Rover, we will save only the events that generated the current state of the Rover.

@Data
public class RoverAggregate {
    private String id;
    private Location location;
    private Orientation orientation;

    public RoverAggregate(String id, Location location, Orientation orientation) {
        this.apply(new RoverCreatedEvent(id,location,orientation));
    }

    public void moveForward() {
        this.apply(new MovedForwardEvent());
    }

    public void turnLeft() {
        this.apply(new TurnedLeftEvent());
    }

    public void turnRight() {
        this.apply(new TurnedRightEvent());
    }


    public void apply(RoverCreatedEvent roverCreatedEvent) {
        this.id = roverCreatedEvent.getId();
        this.location = roverCreatedEvent.getLocation();
        this.orientation = roverCreatedEvent.getOrientation();
    }

    public void apply(MovedForwardEvent movedForwardEvent) {
        this.location = this.location.forward(orientation);
    }

    public void apply(TurnedLeftEvent turnedLeftEvent) {
        this.orientation = this.orientation.left();
    }

    public void apply(TurnedRightEvent turnedRightEvent) {
        this.orientation = this.orientation.right();
    }
}

Now all we have to do is hold these events in the aggregate so when saving it we will only persist the events. We will add a temporary events list to the aggregate to store all new events until persisted: private List<DomainEvent> newEvents = new ArrayList<>();.

Applying an event to the aggregate will modify the aggregate’s state and temporary hold the event. We will also return the aggregate for each apply method to be able to chain multiple calls. Another option would be to generate new Aggregates for every event instead of mutating the current one. For example the RoverCreatedEvent apply method would look like this:

public RoverAggregate apply(RoverCreatedEvent roverCreatedEvent) {
    this.id = roverCreatedEvent.getId();
    this.location = roverCreatedEvent.getLocation();
    this.orientation = roverCreatedEvent.getOrientation();

    newEvents.add(roverCreatedEvent);
    return this;
}

The events are simple serializable data objects that define a type and also a timestamp. We can use the timestamp for recreating an aggregate at a particular point in time.

Here is example of the RoverCreatedEvent:

@Getter
@EqualsAndHashCode
@ToString
public abstract class DomainEvent {
    private String type = this.getClass().getName();
    private Instant createdAt = Instant.now();
}

@Value
@RequiredArgsConstructor
public class RoverCreatedEvent extends DomainEvent implements Serializable {
    private final String id;
    private final RoverAggregate.Location location;
    private final RoverAggregate.Orientation orientation;
}
...

The implementations for the other event objects can be found here: DomainEvents

We will also need a method where we apply more general DomainEvents type and do the pattern matching inside the aggregate. This way we can store events as DomainEvents and hide the conversion inside the aggregate.

Here is the final implementation of the RoverAggregate:

@Data
@EqualsAndHashCode(exclude = "newEvents")
public class RoverAggregate {
    private String id;
    private Location location;
    private Orientation orientation;

    // represent events that were applied since last persistence of the aggregate
    private List<DomainEvent> newEvents = new ArrayList<>();

    public RoverAggregate() {
    }

    public RoverAggregate(String id, Location location, Orientation orientation) {
        this.apply(new RoverCreatedEvent(id,location,orientation));
    }

    public void moveForward() {
        this.apply(new MovedForwardEvent());
    }

    public void turnLeft() {
        this.apply(new TurnedLeftEvent());
    }

    public void turnRight() {
        this.apply(new TurnedRightEvent());
    }


    public RoverAggregate apply(RoverCreatedEvent roverCreatedEvent) {
        this.id = roverCreatedEvent.getId();
        this.location = roverCreatedEvent.getLocation();
        this.orientation = roverCreatedEvent.getOrientation();

        newEvents.add(roverCreatedEvent);
        return this;
    }

    public RoverAggregate apply(MovedForwardEvent movedForwardEvent) {
        this.location = this.location.forward(orientation);

        newEvents.add(movedForwardEvent);
        return this;
    }

    public RoverAggregate apply(TurnedLeftEvent turnedLeftEvent) {
        this.orientation = this.orientation.left();

        newEvents.add(turnedLeftEvent);
        return this;
    }

    public RoverAggregate apply(TurnedRightEvent turnedRightEvent) {
        this.orientation = this.orientation.right();

        newEvents.add(turnedRightEvent);
        return this;
    }

    public RoverAggregate apply(DomainEvent domainEvent) {
        switch (domainEvent.getType()) {
            case "demo.stage2eventsourced.RoverCreatedEvent":
                return this.apply((RoverCreatedEvent) domainEvent);
            case "demo.stage2eventsourced.MovedForwardEvent":
                return this.apply((MovedForwardEvent) domainEvent);
            case "demo.stage2eventsourced.TurnedLeftEvent":
                return this.apply((TurnedLeftEvent) domainEvent);
            case "demo.stage2eventsourced.TurnedRightEvent":
                return this.apply((TurnedRightEvent) domainEvent);
        }
        throw new IllegalStateException("Unprocessable DomainEvent "+domainEvent);
    }

    // used for removing temporary events after persistence 
    public void clearNewEvents() {
        newEvents.clear();
    }
}

All previous tests cases should be still valid. We did not change any of the publicly exposed behaviors.

Functional approach for applying the events

Now we have the pattern matching apply(DomainEvent domainEvent) method that returns the modified aggregate. We can use this to apply a stream of events to the aggregate using a reducer resulting in a RoverAggregate that will have the last state. The reducer will take each event, one by one in order and apply it to a new RoverAggregate. This functional approach is also called as leftFold.

Stream<DomainEvent> eventStream = Stream.of(
        new RoverCreatedEvent("id-01", new RoverAggregate.Location(1,2), RoverAggregate.Orientation.W),
        new MovedForwardEvent(),
        new TurnedLeftEvent(),
        new MovedForwardEvent(),
        new TurnedRightEvent());

RoverAggregate roverAggregate = eventStream.reduce(new RoverAggregate(), RoverAggregate::apply, (a, b)-> null);

Since we have only reduce in Java, we have to live with this side effect (a, b)-> null that should be ignored.

Updating the repository to store only events and rebuild the Aggregate on load

For persisting the state of the aggregate we will just have to store the DomainEvents and with every new save append the new events to the already existing ones. This way we heep all the history and not loose any past data.

To load the aggregate we have to take all the events for that id and apply them to a new RoverAggregate. If we would like to create a RoverAggregate at a particular point in the past we can load only the events up to that time.

To keep it simple we will use just an in memory map here as storage (private static final Map<String, List<DomainEvent>> storage = new HashMap<>();) but ideally these events should be stored in an append only data store.

public class RoverRepository {

    private static final Map<String, List<DomainEvent>> storage = new HashMap<>();

    public void save(RoverAggregate rover) {
        List<DomainEvent> domainEvents = storage.getOrDefault(rover.getId(), new ArrayList<>());
        domainEvents.addAll(rover.getNewEvents());
        rover.clearNewEvents();

        storage.put(rover.getId(), domainEvents);
    }

    public RoverAggregate findById(String id) {
        return storage.get(id)
                .stream()
                .reduce(new RoverAggregate(), RoverAggregate::apply, (a, b) -> null);
    }
}

Conclusion

Storing the events instead of the final state (the projection of the events) is a much better way of keeping all the history of the behaviors. Any newly added feature can also rely on all past actions by replying all the events again.

For example if we want to implement a new function to get the total distance a rover moved since it’s beginning, we just have to implement a new property in the aggregate that will increment every time the MovedForwardEvent was applied. When reloading the aggregate from db it will update the distance value without need of any other changes.

It is indeed possible that the number of events applied to an aggregate grows to a very large number and loading it by applying each event would take a long time. Even if the old events are not relevant to us anymore, these are still needed to be able to reply everything from the beginning and keep current state consistent. An alternative solution would be to save a snapshot of the aggregate (let’s say once a day) and on each new load from DB - start from the snapshotted aggregate and only apply the newer events.

The events in the store should not ever be changed. If a wrong behavior was executed (wrong event was generated) we will not remove the created event but instead we will create a new event that fixes the issue. This is very good for auditing since we guarantee 100% history of our past records. Even using physical drives that only allows writing data and not deleting/mutating would be a good solution.

To go more in depth in this topic check out the Axon Framework witch will offer support in building an event sourced system together with a CQRS approach and it is useful for learning the concept too.


The complete implementation of the aggregate can be found here: event-sourced-aggregate

Thanks for reading.

Written on December 6, 2018
← back