Writing a Zeebe Exporter - Part Two

by Josh Wulf on Jun 1 2019 in Zeebe Resources.

In Part One, we built a minimal exporter and learned about the exporter life-cycle methods. In this blog post, we will walk through building a Zeebe exporter for the Event Store database, an open source database for storing event streams.

The complete source code for this example is available on GitHub. A compiled version, along with a docker-compose configuration for it, is also available on the 0.18 branch of the zeebe-docker-compose repo.

This exporter is based on the Simple Monitor exporter, which exports to an H2 database via JDBC. The exporter we are going to write uses REST to export to Event Store. We’ll use the same patterns for configuration, but we have less configuration, and we don’t have to worry about SQL. With fewer details, the structure is easier to see.

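We’ll cover the following aspects of writing a production-grade exporter:

  1. Configuration, including overrides from environment variables.
  2. Queueing and batching exported records.
  3. Sending batches over REST, with back-off retry.
  4. Checking the database connection when the broker starts.
  5. Filtering the records that the exporter sees.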

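Create a new Maven Project

Create a new Maven project from the quickstart archetype:

mvn archetype:generate -DgroupId=io.zeebe \
    -DartifactId=zeebe-eventstore-exporter \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DinteractiveMode=false

Then add the Zeebe exporter API as a dependency in the pom.xml:

<dependency>
    <groupId>io.zeebe</groupId>
    <artifactId>zeebe-exporter-api</artifactId>
    <version>0.18.0-SNAPSHOT</version>
</dependency>

The code in this post also uses JSONObject and JSONArray from org.json, the Apache HttpClient, and ImmutablePair from Apache Commons Lang 3, so add those libraries as dependencies as well.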

Note: the current development version at the time of writing is Zeebe 0.18.0-SNAPSHOT. Check the releases page for the latest version.

The exporter implements the Exporter interface, imported from the exporter API:

import io.zeebe.exporter.api.spi.Exporter;

Note: as of June 2, 2019, there is an open issue to move this interface to io.zeebe.exporter.api.Exporter.

Create the EventStoreExporter class

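View the source code on GitHub.

Create a class that implements the Exporter interface:

public class EventStoreExporter implements Exporter {

}

Then add the four life-cycle methods that the broker calls:

public class EventStoreExporter implements Exporter {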

    public void configure(Context context) {

    }

    public void open(Controller controller) {

    }

    public void close() {

    }

    public void export(Record record) {

    }
}

Configuration

The configure method is called when your exporter is first loaded by the broker. This is where you can get your exporter’s configuration from its section in the zeebe.cfg.toml file. If your exporter does not find sufficient configuration to operate or cannot connect to a dependent external system, it should throw a RuntimeException at this point. This will stop the broker from starting.

Later, we will implement a check in this method to ensure that we can reach the Event Store database.

The configure method is passed an instance of an Exporter Context. It exposes getLogger(), which gives you a logger that writes to the broker log, and getConfiguration(), which returns your exporter’s configuration. In 0.18, it also exposes a third method, setFilter(), which lets you specify which records your exporter wants to see.

public class EventStoreExporter implements Exporter {

    private Logger log;
    private EventStoreExporterConfiguration configuration;

    public void configure(Context context) {
        log = context.getLogger();
        configuration = context
            .getConfiguration()
            .instantiate(EventStoreExporterConfiguration.class);
    }
}

The call to context.getConfiguration().instantiate(Class<T>) creates an object of type T and hydrates it from the zeebe.cfg.toml file. Here is the configuration class, with its default values:

package io.zeebe;

public class EventStoreExporterConfiguration {
    /**
     * The URL of the Event Store database REST endpoint.
     */
    String url = "http://localhost:2113";

    /**
     * The name of the stream in Event Store. Event Store creates the stream
     * automatically when we first post data to it.
     */
    String streamName = "zeebe";

    /**
     * The maximum number of records sent to the database in one batch.
     */
    int batchSize = 100;

    /**
     * The base period, in milliseconds, for batching and sending records.
     * A batch is formed each period even if fewer than batchSize records
     * are waiting.
     */
    int batchTimeMilli = 300;

    // Lets the "Exporter configured with {}" debug log show the values
    @Override
    public String toString() {
        return "EventStoreExporterConfiguration{url=" + url
            + ", streamName=" + streamName
            + ", batchSize=" + batchSize
            + ", batchTimeMilli=" + batchTimeMilli + "}";
    }
}

We’ll address the batchSize and batchTimeMilli values in a moment.

We’re going to use the same pattern as the Simple Monitor exporter to allow environment variables to override the configuration from the configuration file. First, define the names of the environment variables as constants in the EventStoreExporter class:

public class EventStoreExporter implements Exporter {
    private static final String ENV_PREFIX = "EVENT_STORE_EXPORTER_";
    private static final String ENV_URL = ENV_PREFIX + "URL";
    private static final String ENV_STREAM_NAME = ENV_PREFIX + "STREAM_NAME";
    private static final String ENV_BATCH_SIZE = ENV_PREFIX + "BATCH_SIZE";
    private static final String ENV_BATCH_TIME_MILLI =
        ENV_PREFIX + "BATCH_TIME_MILLI";

    // ... the rest of the class
}

Now, we will enrich the configuration from the environment, allowing us to override the defaults and the configuration file with environment variables:

private void applyEnvironmentVariables(final EventStoreExporterConfiguration cfg) {
    final Map<String, String> environment = System.getenv();

    Optional.ofNullable(environment.get(ENV_STREAM_NAME))
            .ifPresent(streamName ->
                cfg.streamName = streamName);
    Optional.ofNullable(environment.get(ENV_URL))
            .ifPresent(url ->
                cfg.url = url);
    Optional.ofNullable(environment.get(ENV_BATCH_SIZE))
            .ifPresent(batchSize ->
                cfg.batchSize = Integer.parseInt(batchSize));
    Optional.ofNullable(environment.get(ENV_BATCH_TIME_MILLI))
            .ifPresent(batchTimeMilli ->
                cfg.batchTimeMilli = Integer.parseInt(batchTimeMilli));
}
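Because applyEnvironmentVariables() reads System.getenv() directly, it is awkward to unit-test. One option, sketched here under the assumption that you want to test the override logic in isolation, is to pass the variables in as a Map. ConfigStandIn and EnvOverrides.apply are hypothetical names; ConfigStandIn copies the configuration class’s fields and defaults only so that the sketch compiles on its own.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in with the same fields and defaults as
// EventStoreExporterConfiguration, so this sketch compiles on its own.
class ConfigStandIn {
    String url = "http://localhost:2113";
    String streamName = "zeebe";
    int batchSize = 100;
    int batchTimeMilli = 300;
}

class EnvOverrides {
    static final String ENV_PREFIX = "EVENT_STORE_EXPORTER_";

    // Same override logic as applyEnvironmentVariables(), but the variables
    // are a parameter: pass System.getenv() in production, a Map in tests.
    static void apply(ConfigStandIn cfg, Map<String, String> env) {
        Optional.ofNullable(env.get(ENV_PREFIX + "STREAM_NAME"))
                .ifPresent(streamName -> cfg.streamName = streamName);
        Optional.ofNullable(env.get(ENV_PREFIX + "URL"))
                .ifPresent(url -> cfg.url = url);
        Optional.ofNullable(env.get(ENV_PREFIX + "BATCH_SIZE"))
                .ifPresent(size -> cfg.batchSize = Integer.parseInt(size));
        Optional.ofNullable(env.get(ENV_PREFIX + "BATCH_TIME_MILLI"))
                .ifPresent(ms -> cfg.batchTimeMilli = Integer.parseInt(ms));
    }

    public static void main(String[] args) {
        ConfigStandIn cfg = new ConfigStandIn();
        apply(cfg, Collections.singletonMap("EVENT_STORE_EXPORTER_BATCH_SIZE", "500"));
        // Only the batch size changes; the other fields keep their defaults
        System.out.println(cfg.batchSize + " " + cfg.streamName); // 500 zeebe
    }
}
```

In the exporter itself, the equivalent call would simply pass System.getenv() as the map.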

Then call this method, passing in the configuration, in the configure method:

public void configure(final Context context) {
    log = context.getLogger();
    configuration = context
            .getConfiguration()
            .instantiate(EventStoreExporterConfiguration.class);
    applyEnvironmentVariables(configuration);
    // @TODO test connection to Event Store and throw if not available
    log.debug("Exporter configured with {}", configuration);
}

Queueing and Batching Exported records

Zeebe is fast. It can handle extremely high throughput and uses an append-only log on disk, which makes it easy to overwhelm an external system when exporting. To reduce the load on external systems, you can batch events. The Simple Monitor exporter batches events one hundred at a time and exports a batch every second. In a production system, this can be insufficient: while load-testing Zeebe, I watched the Simple Monitor keep exporting at 100 events/second for more than twenty minutes after the load test had finished, and it still had not caught up with the event log.

You will want to tune both the period and the batch size to match the throughput of your broker and the capacity of your external system to ingest the events.
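A rough way to reason about this tuning: if at most batchSize records leave the exporter each period, its ceiling is batchSize × 1000 / period records per second, and a backlog only shrinks by the difference between that ceiling and the rate at which new records arrive. The sketch below is a back-of-the-envelope model, not a measurement. It assumes one batch is sent per period (as the exporter in this post does when sends succeed), ignores network latency, and the backlog and arrival rate in main() are made-up numbers.

```java
class ExportThroughput {
    // Ceiling on records exported per second when at most batchSize records
    // are sent once every periodMillis milliseconds.
    static double maxRecordsPerSecond(int batchSize, int periodMillis) {
        return batchSize * 1000.0 / periodMillis;
    }

    // Seconds to clear a backlog while new records keep arriving;
    // infinite if the exporter cannot outpace the incoming rate.
    static double secondsToDrain(long backlog, double exportRate, double incomingRate) {
        double net = exportRate - incomingRate;
        return net <= 0 ? Double.POSITIVE_INFINITY : backlog / net;
    }

    public static void main(String[] args) {
        // 100 records every 1000 ms caps out at 100 records/s
        double slow = maxRecordsPerSecond(100, 1000);
        // The defaults in this post: 100 records every 300 ms, about 333 records/s
        double faster = maxRecordsPerSecond(100, 300);
        // Hypothetical backlog of 120,000 records with no new load
        System.out.println(secondsToDrain(120_000, slow, 0));   // 1200.0
        System.out.println(secondsToDrain(120_000, faster, 0));
        // Hypothetical steady load of 150 records/s: the slow setting never catches up
        System.out.println(secondsToDrain(120_000, slow, 150)); // Infinity
    }
}
```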

We are going to batch events and send the batches to the Event Store database on a timer, rather than at the moment the broker invokes the exporter’s export() method. In export(), we will only queue records.

So we have three concerns:

  1. Buffering records at the moment that they are seen by the exporter.
  2. Batching records on a schedule.
  3. Periodically sending batches to the external system (with back-off retry).

Event Queue

View the source code on GitHub.

We’ll implement an event queue to buffer events as they are seen by the exporter.

class EventQueue {
}

We want a FIFO queue, so we’ll use a LinkedList to store the events. For each event, we store two pieces of information: the record position and the event itself. We need the record position to tell the broker how far the exporter has got in the event stream; the broker needs to know this so that it can truncate the log on disk after a successful export. We store the event as JSON, because the Event Store database expects JSON input.

Since we are storing two pieces of information that will never be updated, we’ll use an org.apache.commons.lang3.tuple.ImmutablePair:

class EventQueue {
    final LinkedList<ImmutablePair<Long, JSONObject>>
        queue = new LinkedList<>();
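}

Next, add a method that converts a record to JSON, with the eventId, eventType, and data fields that Event Store uses, and appends it to the queue together with its position:

void addEvent(Record record) {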
    final JSONObject json = new JSONObject();
    json.put("eventId", createIdempotentEventId(record));
    json.put("data", new JSONObject(record.toJson()));
    json.put("eventType", "ZeebeEvent");
    queue.add(new ImmutablePair<>(record.getPosition(), json));
}

We need to give each event an idempotent id. Exporters get at-least-once delivery, so an exporter can see the same event more than once. Event Store can guarantee idempotency if each event is posted with a unique UUID as the value of eventId.

Simple Monitor uses a combination of the record position and the partition id to uniquely identify an event, but Event Store needs a v4 UUID. We can’t assign them randomly: we need to derive them deterministically from the metadata of the event, so that we can detect duplicates over time. So we will write a custom UUID implementation that is guaranteed to produce the same UUID for the same event metadata, and is highly likely to be unique.

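private String createIdempotentEventId(Record record) {
    // A fixed 32-hex-digit seed that supplies the UUID's leading digits
    String seed = "393d7039721342d6b619de6bff4ffd2e";
    // The record position followed by the partition id, as decimal digits
    String id = String.valueOf(record.getPosition()) +
        record.getMetadata().getPartitionId();
    StringBuilder sb = new StringBuilder(seed);
    // Remove id.length() seed digits just before the last one, then append
    // the id, so the result is still 32 hex digits long
    sb.delete(31 - id.length(), 31);
    sb.append(id);
    // Insert hyphens to produce the 8-4-4-4-12 UUID layout
    sb.insert(8, "-");
    sb.insert(13, "-");
    sb.insert(18, "-");
    sb.insert(23, "-");
    return sb.toString();
}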
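One caveat with concatenating the two numbers: once partition ids reach two digits, different (position, partition) pairs can produce the same digits. For example, position 12 on partition 3 and position 1 on partition 23 both give "123", and therefore the same eventId. The sketch below reproduces that logic without the Zeebe Record type, and shows one alternative: java.util.UUID.nameUUIDFromBytes, which deterministically derives a UUID from a separator-delimited name. Note that it produces a version 3 (MD5-based) UUID rather than a v4 one, so this assumes Event Store accepts any well-formed UUID as an eventId; check that against your Event Store version before relying on it. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

class IdempotentIds {
    // Same steps as createIdempotentEventId(), taking the position and
    // partition id directly instead of a Zeebe Record.
    static String concatenatedId(long position, int partitionId) {
        String seed = "393d7039721342d6b619de6bff4ffd2e";
        String id = String.valueOf(position) + partitionId;
        StringBuilder sb = new StringBuilder(seed);
        sb.delete(31 - id.length(), 31);
        sb.append(id);
        sb.insert(8, "-");
        sb.insert(13, "-");
        sb.insert(18, "-");
        sb.insert(23, "-");
        return sb.toString();
    }

    // Hypothetical alternative: a name-based (version 3, MD5) UUID. The ':'
    // separators keep (position 12, partition 3) and (1, 23) distinct.
    static String nameBasedId(long position, int partitionId) {
        String name = "zeebe:" + partitionId + ":" + position;
        return UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        System.out.println(concatenatedId(12, 3).equals(concatenatedId(1, 23))); // true
        System.out.println(nameBasedId(12, 3).equals(nameBasedId(1, 23)));       // false
    }
}
```

In the exporter, the call would be nameBasedId(record.getPosition(), record.getMetadata().getPartitionId()).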

So our EventQueue class now exposes a queue and a method to add events to it.

Adding events to the Event Queue

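Adding events to the event queue from the exporter is straightforward. We create the queue, held in a private eventQueue field, when the broker opens the exporter:

public void open(final Controller controller) {
    eventQueue = new EventQueue();
}

Then we add each record to it in export():

public void export(Record record) {
    eventQueue.addEvent(record);
}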

That’s all there is to it. Every time the exporter sees a record, we add it to the event queue.

Batching

View the source code on GitHub.

Now we’ll create the batching implementation.

class Batcher {
}

This class takes events from the event queue, groups them into batches of at most the configured batch size, and queues those batches to be sent to the Event Store database over REST.

To do this, it needs its own queue of batches, and each batch of events will be a JSONArray. We also need to store the record position of the latest event in each batch, to tell the broker what has been successfully exported. We’ll use another LinkedList of ImmutablePair:

class Batcher {
    private final LinkedList<ImmutablePair<Long, JSONArray>>
        queue = new LinkedList<>();
}

Our Batcher needs to know the batch size and the batching period:

class Batcher {
    private final int batchSize;
    final LinkedList<ImmutablePair<Long, JSONArray>> queue = new LinkedList<>();
    final int batchPeriod;

    Batcher(EventStoreExporterConfiguration configuration) {
        batchSize = configuration.batchSize;
        batchPeriod = configuration.batchTimeMilli;
    }
}

Now, we need to implement the batching method.

At this point, we should also think about where the orchestration of the process takes place. We could put a timer in the Batcher and have it batch the event queue itself. But it’s probably best to put all the orchestration in the main class, which gives us a single place to look to see what is going on. That fits Zeebe’s own architectural approach: a single point of orchestration for various services.

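We start with an empty batchFrom() method, which takes the event queue to batch from:

void batchFrom(EventQueue eventQueue) {
}

If the event queue is empty, there is nothing to batch:

void batchFrom(EventQueue eventQueue) {
    if (eventQueue.queue.isEmpty()) {
        return;
    }
}

Otherwise, we take at most batchSize events off the front of the event queue, collect them into a JSONArray, and queue the batch along with the position of its last record:

void batchFrom(EventQueue eventQueue) {
    if (eventQueue.queue.isEmpty()) {
        return;
    }
    long positionOfLastRecordInBatch = -1L;
    // Take at most batchSize events; fewer if the queue holds fewer
    final int countEventsToBatchNow =
        Math.min(batchSize, eventQueue.queue.size());
    final JSONArray jsonArrayOfEvents = new JSONArray();

    for (int i = 0; i < countEventsToBatchNow; i++) {
        final ImmutablePair<Long, JSONObject> event =
            eventQueue.queue.pollFirst();
        if (event != null) {
            jsonArrayOfEvents.put(event.getValue());
            positionOfLastRecordInBatch = event.getKey();
        }
    }
    queue.add(new ImmutablePair<>(positionOfLastRecordInBatch,
        jsonArrayOfEvents));
}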
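To see how the batch size caps each batch, here is a standard-library stand-in for batchFrom(): ArrayDeque and SimpleImmutableEntry replace LinkedList and ImmutablePair, and a plain String replaces each JSONObject, so this illustrates the logic rather than being the exporter’s code. BatchingSketch is a hypothetical name. With a batch size of 100 and 250 queued records, three timer ticks produce batches of 100, 100, and 50 records, each tagged with the position of its last record; a fourth tick finds the queue empty and does nothing.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

class BatchingSketch {
    // Stand-in for EventQueue.queue: (record position, event JSON) pairs
    final Deque<Map.Entry<Long, String>> events = new ArrayDeque<>();
    // Stand-in for Batcher.queue: (position of last record, batch) pairs
    final Deque<Map.Entry<Long, List<String>>> batches = new ArrayDeque<>();
    final int batchSize;

    BatchingSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // One timer tick: move at most batchSize events into a single batch
    void batchOnce() {
        if (events.isEmpty()) {
            return;
        }
        int count = Math.min(batchSize, events.size());
        List<String> batch = new ArrayList<>(count);
        long lastPosition = -1L;
        for (int i = 0; i < count; i++) {
            Map.Entry<Long, String> event = events.pollFirst();
            batch.add(event.getValue());
            lastPosition = event.getKey();
        }
        batches.addLast(new SimpleImmutableEntry<>(lastPosition, batch));
    }

    public static void main(String[] args) {
        BatchingSketch sketch = new BatchingSketch(100);
        for (long position = 0; position < 250; position++) {
            sketch.events.addLast(
                new SimpleImmutableEntry<>(position, "{\"position\":" + position + "}"));
        }
        for (int tick = 0; tick < 4; tick++) {
            sketch.batchOnce();
        }
        for (Map.Entry<Long, List<String>> batch : sketch.batches) {
            System.out.println(batch.getValue().size() + " records, last position " + batch.getKey());
        }
        // 100 records, last position 99
        // 100 records, last position 199
        // 50 records, last position 249
    }
}
```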

Sending batches over REST

Our sending implementation needs two parts: a stateless HTTP sender, and a stateful object to manage back-off retry in the event of failure. We will use the stateless part during the configure stage of the exporter life cycle, to test the connection to the database, so we want to decouple it from the stateful component.

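import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.json.JSONArray;

class HttpStatelessSender {
    private static final String MIME_TYPE =
        "application/vnd.eventstore.events+json";
    private final String url;

    HttpStatelessSender(EventStoreExporterConfiguration configuration) {
        // Events are posted to the stream's URL under /streams/
        url = configuration.url + "/streams/" + configuration.streamName;
    }

    // Posts one batch of events; throws IOException if the request fails
    void send(JSONArray eventBatch) throws IOException {
        try (final CloseableHttpClient client = HttpClients.createDefault()) {
            final HttpPost httpPost = new HttpPost(url);
            // Encode as UTF-8 explicitly (StringEntity defaults to ISO-8859-1)
            final StringEntity json =
                new StringEntity(eventBatch.toString(), StandardCharsets.UTF_8);
            httpPost.setEntity(json);
            httpPost.setHeader("Content-Type", MIME_TYPE);
            client.execute(httpPost);
        }
    }
}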
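One thing to be aware of: with Apache HttpClient, execute() throws an IOException when the request cannot be delivered, but it returns normally when Event Store answers with an HTTP error status such as 400 or 503. As written, send() would treat such a response as a success, and the Sender would report the batch’s position as exported. A minimal sketch of a status check follows; ResponseChecks.ensureSuccess is a hypothetical helper, and the comment shows where it could be called. If you adopt it, also confirm that Event Store answers the empty-array post used by the connection test with a 2xx status, because otherwise configure() will throw and the broker will not start.

```java
import java.io.IOException;

final class ResponseChecks {
    private ResponseChecks() {}

    // Hypothetical helper: turn any non-2xx status into an IOException, so the
    // Sender's existing catch (IOException) backs off instead of treating the
    // batch as exported. In send(), with Apache HttpClient 4.x, it could be used as:
    //   try (CloseableHttpResponse response = client.execute(httpPost)) {
    //       ensureSuccess(response.getStatusLine().getStatusCode());
    //   }
    static void ensureSuccess(int statusCode) throws IOException {
        if (statusCode < 200 || statusCode >= 300) {
            throw new IOException("Event Store responded with HTTP status " + statusCode);
        }
    }

    public static void main(String[] args) {
        for (int status : new int[] {201, 503}) {
            try {
                ensureSuccess(status);
                System.out.println(status + ": ok");
            } catch (IOException e) {
                System.out.println(status + ": " + e.getMessage());
            }
        }
    }
}
```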

Check the database connection in the configure stage

Now that we have a class HttpStatelessSender to communicate with the database, let’s go back to the EventStoreExporter class and use it to test the database connection in the configure method.

When I wrote this exporter, the Exporter interface did not declare that configure() throws, so we have to catch the IOException thrown by the stateless sender and rethrow it wrapped in a RuntimeException to get the code to compile. (There is an open issue about this on GitHub, so this may have changed by the time you read this.)

public class EventStoreExporter implements Exporter {

    private Logger log;
    private EventStoreExporterConfiguration configuration;

    public void configure(Context context) {
        log = context.getLogger();
        configuration = context
            .getConfiguration()
            .instantiate(EventStoreExporterConfiguration.class);
        applyEnvironmentVariables(configuration);

        log.debug("Exporter configured with {}", configuration);
        testConnectionToDatabase(configuration);
    }

    private void testConnectionToDatabase(
        EventStoreExporterConfiguration configuration) {
        try {
            HttpStatelessSender connectionTest =
                new HttpStatelessSender(configuration);
            connectionTest.send(new JSONArray());
        } catch (Exception e) {
            throw new RuntimeException(e); // halt the broker if unavailable
        }
    }
}

Managing Backoff/Retry statefully

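The Sender wraps an HttpStatelessSender, reports successful exports to the broker through the Controller, and keeps track of the current back-off state:

class Sender {
    // Multipliers applied to the base period on successive failures
    private static final int[] FIBONACCI = new int[]{ 1, 1, 2, 3, 5, 8, 13, 21,
        34, 55, 89, 144 };

    private Controller controller;
    private Logger log;
    private HttpStatelessSender http;
    private int sendTimeMilli;
    private int backOffFactor = 1;
    int sendPeriod;

    Sender(Controller controller, Logger log,
            EventStoreExporterConfiguration configuration) {
        sendTimeMilli = configuration.batchTimeMilli;
        // Start at the configured period; sendFrom() only changes it after a
        // send attempt, so an empty batch queue keeps this delay
        sendPeriod = sendTimeMilli;
        this.controller = controller;
        this.log = log;
        http = new HttpStatelessSender(configuration);
    }
}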

We’ll use the same pattern as the Batcher: the Sender will be called on a timer and passed a Batcher. It takes the first batch from the Batcher’s queue, sends it to the Event Store database, and removes it from the queue only once the send succeeds. We’ve made the sendPeriod field package-private so that the exporter can read it: the Sender informs the controlling code about its scheduling, but all the logic that manages it stays encapsulated in the Sender.

void sendFrom(Batcher batcher) {
    if (batcher.queue.isEmpty()) {
        return;
    }
    final ImmutablePair<Long, JSONArray> batch = batcher.queue.getFirst();
    final JSONArray eventBatch = batch.getValue();
    final Long positionOfLastEventInBatch = batch.getKey();
    try {
        http.send(eventBatch);
        batcher.queue.pollFirst();
        controller.updateLastExportedRecordPosition(positionOfLastEventInBatch);
        backOffFactor = 1;
        sendPeriod = sendTimeMilli;
    } catch (IOException e) {
        backOff();
    }
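}

When a post fails, backOff() moves one step further along the Fibonacci sequence and multiplies the base period by that number, up to a maximum of 144 times the base period:

private void backOff() {
    backOffFactor++;
    sendPeriod = FIBONACCI[Math.min(backOffFactor, FIBONACCI.length - 1)] *
        sendTimeMilli;
    log.debug("Post to Event Store failed. Retrying in {}ms...", sendPeriod);
}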
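To make the schedule concrete, the sketch below replays backOff() for a run of consecutive failures, using the same FIBONACCI table and starting from backOffFactor = 1, as after a successful send. It is a standalone copy of the arithmetic, not the Sender itself; BackOffSchedule is a hypothetical name. With the default batchTimeMilli of 300 ms, the first failure waits 600 ms, and the delay reaches its cap of 144 × 300 = 43,200 ms after ten consecutive failures. Because the factor is incremented before the lookup, the first two entries of the table are never used.

```java
import java.util.ArrayList;
import java.util.List;

class BackOffSchedule {
    // Same table as the Sender's FIBONACCI field
    private static final int[] FIBONACCI = {1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144};

    // Delays (ms) produced by `failures` consecutive calls to backOff(),
    // starting from backOffFactor = 1 as after a successful send.
    static List<Integer> delays(int sendTimeMilli, int failures) {
        List<Integer> result = new ArrayList<>();
        int backOffFactor = 1;
        for (int i = 0; i < failures; i++) {
            backOffFactor++;
            result.add(FIBONACCI[Math.min(backOffFactor, FIBONACCI.length - 1)] * sendTimeMilli);
        }
        return result;
    }

    public static void main(String[] args) {
        // With the default batchTimeMilli of 300 ms:
        System.out.println(delays(300, 12));
        // [600, 900, 1500, 2400, 3900, 6300, 10200, 16500, 26700, 43200, 43200, 43200]
    }
}
```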

Schedule Batching and Sending

In the open() method of the EventStoreExporter, we grab a reference to the Controller, and use its scheduleTask() method to schedule the first execution of the batchEvents() and sendBatch() methods (which we’ll implement in a moment):

public void open(final Controller controller) {
    eventQueue = new EventQueue();
    batcher = new Batcher(configuration);
    sender = new Sender(controller, log, configuration);
    this.controller = controller;
    controller.scheduleTask(Duration.ofMillis(batcher.batchPeriod),
        this::batchEvents);
    controller.scheduleTask(Duration.ofMillis(sender.sendPeriod),
        this::sendBatch);
    log.debug("Event Store exporter started.");
}

Now we’ll implement the two scheduled methods, which will reschedule themselves:

private void batchEvents() {
    batcher.batchFrom(eventQueue);
    controller.scheduleTask(Duration.ofMillis(batcher.batchPeriod),
        this::batchEvents);
}

private void sendBatch() {
    sender.sendFrom(batcher);
    controller.scheduleTask(Duration.ofMillis(sender.sendPeriod),
        this::sendBatch);
}

Each period comes from the component that owns it: the Batcher or the Sender. Scheduling them is the responsibility of the main class, but when they run is their own concern.
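Outside a broker, the same division of labour can be tried with a ScheduledExecutorService standing in for the Controller. Since the methods above reschedule themselves after every run, this sketch assumes each scheduleTask() call runs its task once; the single-threaded executor is a choice of the sketch, not a statement about how Zeebe runs exporter tasks. PeriodicWorker and DemoOrchestrator are hypothetical names: the worker owns its period (here it doubles after each run, up to 40 ms, loosely like a back-off), and the orchestrator only decides when to reschedule.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a Batcher or Sender: it does the work and
// decides its own period.
class PeriodicWorker {
    int runs = 0;           // touched only on the scheduler thread
    long periodMillis = 5;  // doubles after each run, capped at 40 ms

    void doWork() {
        runs++;
        periodMillis = Math.min(periodMillis * 2, 40);
    }
}

// Hypothetical stand-in for EventStoreExporter: it owns the scheduling.
class DemoOrchestrator {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private final PeriodicWorker worker = new PeriodicWorker();
    private final CountDownLatch finished;

    DemoOrchestrator(int runsToPerform) {
        finished = new CountDownLatch(runsToPerform);
    }

    void start() {
        scheduler.schedule(this::tick, worker.periodMillis, TimeUnit.MILLISECONDS);
    }

    // Like batchEvents()/sendBatch(): run once, then reschedule with the
    // period the worker reports now.
    private void tick() {
        worker.doWork();
        finished.countDown();
        if (finished.getCount() > 0) {
            scheduler.schedule(this::tick, worker.periodMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Waits for the runs (or the timeout), stops the scheduler, returns the count.
    int awaitRuns(long timeoutMillis) throws InterruptedException {
        finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
        scheduler.shutdown();
        scheduler.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        return worker.runs;
    }

    public static void main(String[] args) throws InterruptedException {
        DemoOrchestrator orchestrator = new DemoOrchestrator(5);
        orchestrator.start();
        System.out.println(orchestrator.awaitRuns(5_000)); // 5
    }
}
```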

Filtering records

The final piece: filtering the records that the exporter sees.

One user in the forum was debugging using the built-in debug exporter (which logs records to the console), and noticed hundreds of job activation records. These are generated whenever a worker polls for work.

We could write logic to filter them in the export() method of the exporter. However, the 0.18 release of Zeebe provides a RecordFilter that can be applied by the broker itself. We’ll use that.

public class RecordFilter implements Context.RecordFilter {

    @Override
    public boolean acceptType(RecordType recordType) {
        return true;
    }

    @Override
    public boolean acceptValue(ValueType valueType) {
        return !valueType.equals(ValueType.JOB_BATCH);
    }
}

This filter drops all JOB_BATCH records, so our exporter will not see any of these. You can use this functionality to do things like write an exporter to see only incidents.

Now we apply this filter in our EventStoreExporter.

private void configureRecordFilter(Context context) {
    RecordFilter filter = new RecordFilter();
    context.setFilter(filter);
}

And then call it in the configure method:

public void configure(final Context context) {
    log = context.getLogger();
    configuration = context
            .getConfiguration()
            .instantiate(EventStoreExporterConfiguration.class);
    applyEnvironmentVariables(configuration);

    configureRecordFilter(context);

    log.debug("Exporter configured with {}", configuration);
    testConnectionToDatabase(configuration);
}

And there you have it: a production-grade exporter.

Important things to know about Exporters

There are two things to keep in mind. First, an exporter runs in the same JVM as the broker, so intensive computation in an exporter will reduce the throughput of the broker. You should do as little processing as possible in the exporter, and perform further transformation in another system after export.

Second, a badly behaved exporter can cause broker disks to fill up. The event log is truncated only up to the earliest exporter position. If your exporter is loaded and does not advance its position in the stream, whether because of a programming error or because of back pressure or latency from an external system, the broker log will not be truncated and the broker disk can fill up.
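A simplified model of that rule, assuming a single partition and ignoring everything else the broker takes into account: the position up to which the log may be truncated is the minimum of all exporters’ last exported positions, so one stalled exporter holds it back for everyone. TruncationModel is a hypothetical name, and the exporter ids and positions below are made up for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class TruncationModel {
    // Highest position that every exporter has already exported; a
    // simplified model of how far the log could be truncated.
    // Throws NoSuchElementException if no exporters are configured.
    static long truncatablePosition(Map<String, Long> lastExportedPositions) {
        return Collections.min(lastExportedPositions.values());
    }

    public static void main(String[] args) {
        Map<String, Long> positions = new HashMap<>();
        positions.put("elasticsearch", 9_500L); // hypothetical exporter ids
        positions.put("eventstore", 1_200L);    // stalled, e.g. Event Store is down
        System.out.println(truncatablePosition(positions)); // 1200
    }
}
```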

You should plan for failure in connectivity to any external system and design the failure mode of your system.

Further Resources