Worked example

Custom side store and custom post-processor

This is a worked example showing how to extend Atoti Market Data with a custom market data type - Dividends. It shows how to define the new store, how to retrieve data from the store, and how to configure new measures. Finally, we show you how to use this new market data type in a DirectQuery application.

Step 1: Define the Custom Store

The custom store will contain the fields:

  • AsOfDate (LocalDate) - key
  • MarketDataSet (String) - key
  • InstrumentId (String) - key
  • ExecutionDate (LocalDate) - key
  • Quote (Double)
package com.activeviam.application.workedexample.store;

import static com.activeviam.database.api.types.ILiteralType.DOUBLE;
import static com.activeviam.database.api.types.ILiteralType.LOCAL_DATE;
import static com.activeviam.database.api.types.ILiteralType.STRING;

import java.time.LocalDate;

import org.springframework.context.annotation.Configuration;

import com.activeviam.tools.datastore.impl.AConfigurableSchema;

/**
 * Declares the custom side store that holds dividend market data.
 *
 * <p>A row is keyed by AsOfDate, MarketDataSet, InstrumentId and ExecutionDate and carries a
 * single Quote value.
 */
@Configuration
public class DividendReportingStoreConfig extends AConfigurableSchema {

    /** Name of the dividend side store. */
    public static final String DIVIDEND_REPORTING_MARKET_DATA_STORE = "DividendReportingMarketDataStore";

    /**
     * Builds the dividend store description and registers it with the schema configurator.
     *
     * <p>The two date keys default to {@link LocalDate#MIN} and Quote defaults to 0.0; the two
     * String keys are declared without an explicit default.
     */
    @Override
    public void createStores() {
        var dividendStore = configurator.storeBuilder(GLOBAL)
                .withStoreName(DIVIDEND_REPORTING_MARKET_DATA_STORE)
                // Key fields.
                .withField("AsOfDate", LOCAL_DATE, LocalDate.MIN).asKeyField()
                .withField("MarketDataSet", STRING).asKeyField()
                .withField("InstrumentId", STRING).asKeyField()
                .withField("ExecutionDate", LOCAL_DATE, LocalDate.MIN).asKeyField()
                // Value field.
                .withField("Quote", DOUBLE, 0.0)
                .build();

        configurator.addStore(dividendStore);
    }
}

We also need to import the configuration class that we have just created into our application config class (for example, ApplicationConfig):

@Import(value = {
...
        // Added after all the other imports: registers the dividend store definition
        DividendReportingStoreConfig.class
})
public class ApplicationConfig {
    ...
}

Step 2: Retrieval Configuration & Post-Processor

In order to retrieve the data from the custom side store, we need to:

  • Create a retriever class and the associated configuration class.
  • Create a post-processor injection configuration so that the retriever can be used in our post-processor.
  • Implement the IDirectQueryCachingPostProcessor interface to use the cache.

Create the retrieval interface to be called in the post-processor which we will create later:

package com.activeviam.application.workedexample.postprocessors.retriever;

import java.time.LocalDate;

/**
 * Retrieves dividend market data for use by the dividends post-processor.
 */
public interface IDividendsReportingMarketDataRetriever {

    /**
     * Returns the dividend value for an instrument, bounded by an execution date.
     *
     * @param asOfDate      as-of date of the market data
     * @param marketDataSet market data set to read from
     * @param instrumentId  identifier of the instrument
     * @param executionDate execution date bound of the lookup
     * @return the dividend value for the given coordinates
     */
    Double getDividendsUntil(
            LocalDate asOfDate,
            String marketDataSet,
            String instrumentId,
            LocalDate executionDate);
}

Implement the retrieval interface in a class that extends SingleTableMarketDataRetriever:

package com.activeviam.application.workedexample.postprocessors.retriever;

import static com.activeviam.database.api.conditions.BaseConditions.and;
import static com.activeviam.database.api.conditions.BaseConditions.equal;
import static com.activeviam.database.api.conditions.BaseConditions.lessOrEqual;
import static com.activeviam.marketdata.lib.MarketDataConstants.AS_OF_DATE;
import static com.activeviam.marketdata.lib.MarketDataConstants.INSTRUMENT_ID;
import static com.activeviam.marketdata.lib.MarketDataConstants.MARKET_DATA_SET;
import static com.activeviam.marketdata.lib.MarketDataConstants.QUOTE;

import java.time.LocalDate;
import java.util.Map;

import com.activeviam.database.api.IDatabase;
import com.activeviam.database.api.query.IPreparedListQuery;
import com.activeviam.database.api.schema.FieldPath;
import com.activeviam.marketdata.lib.retrievers.impl.SingleTableMarketDataRetriever;

/**
 * Market data retriever that reads dividend quotes from a single table.
 *
 * <p>The list query is compiled once in the constructor and then run with fresh parameters on
 * every call to {@link #getDividendsUntil}.
 */
public class DividendsReportingTableMarketDataRetriever extends SingleTableMarketDataRetriever
        implements IDividendsReportingMarketDataRetriever {

    /** Name of the execution date field; also used as the name of the matching query parameter. */
    public static final String EXECUTION_DATE = "ExecutionDate";

    /**
     * Compiled query selecting the quote field for an exact as-of date, market data set and
     * instrument, and for every execution date less than or equal to the supplied one.
     */
    protected final IPreparedListQuery dividendsCompiledQuery;

    /**
     * Creates the retriever and compiles its list query against the given table.
     *
     * @param database database to query
     * @param table    name of the table holding the dividend quotes
     */
    public DividendsReportingTableMarketDataRetriever(IDatabase database, String table) {
        super(database, table);
        this.dividendsCompiledQuery = database.getQueryManager()
                .listQuery()
                .forTable(table)
                .withCondition(and(
                        equal(FieldPath.of(AS_OF_DATE)).as(AS_OF_DATE),
                        equal(FieldPath.of(MARKET_DATA_SET)).as(MARKET_DATA_SET),
                        equal(FieldPath.of(INSTRUMENT_ID)).as(INSTRUMENT_ID),
                        lessOrEqual(FieldPath.of(EXECUTION_DATE)).as(EXECUTION_DATE)))
                .withFieldsWithoutAlias(FieldPath.of(QUOTE))
                .compile();
    }

    /**
     * Sums the quotes of every row matching the given coordinates whose execution date is on or
     * before {@code executionDate}.
     *
     * @return the sum of the matching quotes, or 0 when no row matches
     */
    @Override
    public Double getDividendsUntil(LocalDate asOfDate, String marketDataSet, String instrumentId, LocalDate executionDate) {
        double total = 0.0;
        // The cursor is closed by try-with-resources once all rows have been read.
        try (var rows = database.getMasterHead()
                .getQueryRunner()
                .listQuery(dividendsCompiledQuery)
                .withParameters(Map.of(
                        AS_OF_DATE, asOfDate,
                        MARKET_DATA_SET, marketDataSet,
                        INSTRUMENT_ID, instrumentId,
                        EXECUTION_DATE, executionDate))
                .runCurrentThread()) {
            while (rows.next()) {
                // Index 0 is the only selected field: the quote.
                total += rows.getRecord().readDouble(0);
            }
        }
        return total;
    }

}

Create the injection configuration class which will inject the IMarketDataRetrievalContainerService into our post-processor:

package com.activeviam.application.workedexample.postprocessors.retriever;

import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;

import com.activeviam.activepivot.core.intf.api.cube.IActivePivotManager;
import com.activeviam.activepivot.core.intf.api.postprocessing.IPostProcessor;
import com.activeviam.application.workedexample.postprocessors.DividendsReportingPostProcessor;
import com.activeviam.marketdata.lib.postprocessors.injection.IMarketDataRetrievalContainerServiceAware;
import com.activeviam.marketdata.lib.retrievers.intf.IDefaultMarketDataRetriever;
import com.activeviam.marketdata.lib.services.intf.IMarketDataRetrievalContainerService;
import com.activeviam.tech.core.api.registry.impl.ExtendedPluginInjector;

import lombok.AllArgsConstructor;

/**
 * Injects the market data retrieval container service into the
 * {@link DividendsReportingPostProcessor} plugin.
 *
 * <p>Only active when an {@link IActivePivotManager} bean is present.
 */
@Configuration
@ConditionalOnBean(IActivePivotManager.class)
public class DividendReportingPostProcessorInjectionConfig {

    /** Service handed to the post-processor plugin on context refresh. */
    private final IMarketDataRetrievalContainerService<IDefaultMarketDataRetriever> containerService;

    /**
     * @param containerService service to inject into the post-processor
     */
    public DividendReportingPostProcessorInjectionConfig(
            IMarketDataRetrievalContainerService<IDefaultMarketDataRetriever> containerService) {
        this.containerService = containerService;
    }

    /**
     * Injects the container service into the post-processor registered under
     * {@link DividendsReportingPostProcessor#PLUGIN_KEY}.
     *
     * @param ignored the refresh event; only used as the trigger
     */
    @EventListener
    @Order(100)
    public void injectIntoSingleMarketDataPostProcessor(final ContextRefreshedEvent ignored) {
        ExtendedPluginInjector.inject(
                IPostProcessor.class,
                DividendsReportingPostProcessor.PLUGIN_KEY,
                IMarketDataRetrievalContainerServiceAware.PROPERTY_NAME,
                containerService);
    }
}

Create the retrieval configuration class:

package com.activeviam.application.workedexample.postprocessors.retriever;

import static com.activeviam.application.workedexample.store.DividendReportingStoreConfig.DIVIDEND_REPORTING_MARKET_DATA_STORE;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import com.activeviam.database.api.IDatabase;
import com.activeviam.marketdata.config.retrievers.ARetrievalConfig;
import com.activeviam.marketdata.lib.dates.impl.DateShiftingDateRetriever;
import com.activeviam.marketdata.lib.dates.impl.TableDateRetriever;
import com.activeviam.marketdata.lib.dates.intf.IContextualDateRetriever;
import com.activeviam.marketdata.lib.dates.intf.IDateRetriever;
import com.activeviam.marketdata.lib.retrievers.containers.impl.MarketDataRetrievalContainer;
import com.activeviam.marketdata.lib.retrievers.containers.intf.IMarketDataRetrievalContainer;
import com.activeviam.marketdata.lib.translators.impl.DateShiftTranslator;
import com.activeviam.marketdata.lib.translators.intf.IMarketDataCoordinateTranslator;

/**
 * Wires the beans needed to retrieve dividend market data from the dividend side store: date
 * retrievers, the coordinate translator, the table retriever and the retrieval container that
 * binds them under a single name.
 */
@Configuration
@Import(DividendReportingPostProcessorInjectionConfig.class)
public class DividendReportingMarketDataRetrievalConfig  {

    /**
     * Name under which the dividend retrieval container is registered.
     *
     * <p>Public because the measure configuration has to reference the same name when it builds
     * the dividend measure.
     */
    public static final String DIVIDEND_REPORTING_MARKET_DATA_RETRIEVER = "DividendReportingMarketDataRetriever";

    /** Date retriever backed by the dividend store. */
    @Bean
    public IDateRetriever dividendReportingDateRetriever(IDatabase database) {
        return new TableDateRetriever(database, DIVIDEND_REPORTING_MARKET_DATA_STORE);
    }

    /** Contextual date retriever that wraps the table-backed date retriever. */
    @Bean
    public IContextualDateRetriever dividendReportingDateShiftingDateRetriever(IDatabase database) {
        return new DateShiftingDateRetriever(dividendReportingDateRetriever(database));
    }

    /** Coordinate translator for the dividend store, built on the date-shifting date retriever. */
    @Bean
    public IMarketDataCoordinateTranslator<Object[]> dividendReportingDateShiftingCoordinateTranslator(IDatabase database) {
        return new DateShiftTranslator(database, DIVIDEND_REPORTING_MARKET_DATA_STORE, dividendReportingDateShiftingDateRetriever(database));
    }

    /** Retriever that reads dividend quotes from the dividend store. */
    @Bean
    public DividendsReportingTableMarketDataRetriever dividendsMarketDataRetriever(IDatabase database) {
        return new DividendsReportingTableMarketDataRetriever(database, DIVIDEND_REPORTING_MARKET_DATA_STORE);
    }

    /**
     * Container binding the dividend retriever and its coordinate translator under
     * {@link #DIVIDEND_REPORTING_MARKET_DATA_RETRIEVER}.
     */
    @Bean
    public IMarketDataRetrievalContainer<DividendsReportingTableMarketDataRetriever> dividendsRetrievalContainer(
            DividendsReportingTableMarketDataRetriever dividendsMarketDataRetriever,
            IMarketDataCoordinateTranslator<Object[]> dividendReportingDateShiftingCoordinateTranslator
    ) {
        return new MarketDataRetrievalContainer<>(
                DIVIDEND_REPORTING_MARKET_DATA_RETRIEVER,
                dividendsMarketDataRetriever,
                dividendReportingDateShiftingCoordinateTranslator
        );
    }
}

We now want to include the retrieval configuration we created in the ApplicationConfig class:

@Import(value = {
...
        // Added after all the other imports: the dividend store and its retrieval configuration
        DividendReportingStoreConfig.class,
        DividendReportingMarketDataRetrievalConfig.class
})
@Slf4j
public class ApplicationConfig {
    ...
}

Create the post-processor that will use the retriever we have configured.

Use the abstract class ADirectQueryCachingMarketDataPostProcessor which conforms to design patterns laid out by Atoti Market Data. Note that this will provide built-in support for DirectQuery caching.

In this example, we extend SingleMarketDataPostProcessor (which itself extends ADirectQueryCachingMarketDataPostProcessor).

In addition, we have an internal class which extends ADefaultContextualMarketDataRetriever. For more details, see Market data retrieval implementations.

package com.activeviam.application.workedexample.postprocessors;

import java.time.LocalDate;

import com.activeviam.activepivot.core.intf.api.location.ILocation;
import com.activeviam.activepivot.core.intf.api.postprocessing.IPostProcessor;
import com.activeviam.activepivot.core.intf.api.postprocessing.IPostProcessorCreationContext;
import com.activeviam.application.workedexample.postprocessors.retriever.IDividendsReportingMarketDataRetriever;
import com.activeviam.marketdata.lib.postprocessors.SingleMarketDataPostProcessor;
import com.activeviam.marketdata.lib.retrievers.contextual.impl.ADefaultContextualMarketDataRetriever;
import com.activeviam.marketdata.lib.retrievers.contextual.intf.IDefaultContextualMarketDataRetriever;
import com.activeviam.marketdata.lib.translators.intf.IMarketDataCoordinateTranslator;
import com.activeviam.tech.core.api.registry.AtotiExtendedPluginValue;

/**
 * Post-processor that returns dividend values read through an
 * {@link IDividendsReportingMarketDataRetriever}.
 */
@AtotiExtendedPluginValue(intf = IPostProcessor.class, key = DividendsReportingPostProcessor.PLUGIN_KEY)
public class DividendsReportingPostProcessor extends SingleMarketDataPostProcessor {

    /** Plugin key under which this post-processor is registered. */
    public static final String PLUGIN_KEY = "DividendsReportingPostProcessor";

    /**
     * Retriever used to read the dividends.
     *
     * <p>NOTE(review): this field stays null until
     * {@link #setDividendsReportingMarketDataRetriever} is called; confirm that the retriever is
     * set before the measure is queried.
     */
    protected IDividendsReportingMarketDataRetriever dividendsReportingMarketDataRetriever;

    public DividendsReportingPostProcessor(String name, IPostProcessorCreationContext creationContext) {
        super(name, creationContext);
    }

    /**
     * Creates a contextual retriever that carries the name and coordinate translator of the
     * market data retrieval container.
     */
    @Override
    protected IDefaultContextualMarketDataRetriever getMarketDataRetriever(ILocation location) {
        return new EquityDividendMarketDataRetriever(
                getMarketDataRetrievalContainer().name(),
                getMarketDataRetrievalContainer().coordinateTranslator());
    }

    /** Returns {@link #PLUGIN_KEY}. */
    @Override
    public String getType() {
        return PLUGIN_KEY;
    }

    /**
     * Sets the retriever used to read the dividends.
     *
     * @param dividendsReportingMarketDataRetriever retriever to use
     */
    public void setDividendsReportingMarketDataRetriever(
            IDividendsReportingMarketDataRetriever dividendsReportingMarketDataRetriever) {
        this.dividendsReportingMarketDataRetriever = dividendsReportingMarketDataRetriever;
    }

    /**
     * Contextual retriever that delegates to the enclosing post-processor's dividends retriever.
     */
    protected class EquityDividendMarketDataRetriever extends ADefaultContextualMarketDataRetriever {

        public EquityDividendMarketDataRetriever(String name, IMarketDataCoordinateTranslator<Object[]> coordinateTranslator) {
            super(name, coordinateTranslator);
        }

        /**
         * Reads the dividends for the translated coordinates.
         *
         * @param translatedCoordinates expected layout: [0] as-of date (LocalDate), [1] market
         *                              data set (String), [2] instrument id (String),
         *                              [3] execution date (LocalDate)
         * @return the value returned by the dividends retriever
         */
        @Override
        public Double doGetMarketData(Object[] translatedCoordinates) {
            return dividendsReportingMarketDataRetriever.getDividendsUntil(
                    (LocalDate) translatedCoordinates[0],
                    (String) translatedCoordinates[1],
                    (String) translatedCoordinates[2],
                    (LocalDate) translatedCoordinates[3]);
        }

    }
}

Step 3: the Measure configuration

Using Copper and MarketDataMeasureBuilderHelper create the measure in your measure configuration.

Read more about MarketDataMeasureBuilderHelper

// One level identifier per key field of the dividend store, listed in the same order as the
// store's key fields: AsOfDate, MarketDataSet, InstrumentId, ExecutionDate.
LevelIdentifier asOfDateLevel = new LevelIdentifier("Dates", "Date", "AsOfDate");
LevelIdentifier marketDataSetLevel = new LevelIdentifier("MarketData", "MarketDataSets", "MarketDataSet");
LevelIdentifier instrumentIdLevel = new LevelIdentifier("InstrumentId", "InstrumentId", "InstrumentId");
LevelIdentifier executionDateLevel = new LevelIdentifier("ExecutionDate", "ExecutionDate", "ExecutionDate");

// Build the measure from the dividends post-processor and the dividend retrieval container name.
CopperMeasure copperMeasure = new MarketDataMeasureBuilderHelper<>(
        DividendsReportingPostProcessor.PLUGIN_KEY,
        DIVIDEND_REPORTING_MARKET_DATA_RETRIEVER)
        .startBuilding(
                List.of(asOfDateLevel, marketDataSetLevel, instrumentIdLevel, executionDateLevel),
                MarketDataDateShift.CURRENT_DAY)
        .as("DividendMeasure");

Usage in a DirectQuery application

Defining a DQ cache

In a DirectQuery application, defining a cache will allow us to retrieve the data efficiently.

In order to define the cache, we need to:

  • Specify the side store for which the caching is used (it will be the custom store we defined in our worked example).
  • Specify the partitioning fields (here we will use the AsOfDate and MarketDataSet fields).
  • Specify the capacity of the cache (we will set it to 6 partitions here as an example).

The postprocessor we created in the worked example extends ADirectQueryCachingMarketDataPostProcessor which implements IDirectQueryCachingPostProcessor, which means that it will automatically work with our configured cache.

We will create the cache description as a @Configuration class in the package com.activeviam.application.workedexample.dqcache:

Note: The cache size is set to 6 just as an example. In a real project, the size will depend on the specific needs of the project.

package com.activeviam.application.workedexample.dqcache;

import static com.activeviam.application.workedexample.store.DividendReportingStoreConfig.DIVIDEND_REPORTING_MARKET_DATA_STORE;
import static com.activeviam.marketdata.lib.MarketDataConstants.AS_OF_DATE;
import static com.activeviam.marketdata.lib.MarketDataConstants.MARKET_DATA_SET;

import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.activeviam.directquery.api.cache.CacheCapacity;
import com.activeviam.directquery.api.cache.SingleTableCacheDescription;

/**
 * Describes the DirectQuery cache used for the dividend side store.
 */
@Configuration
public class DividendReportingStoreCacheConfig {

    /** Maximum number of partitions kept in the cache; 6 is only an example value. */
    private static final int MAX_CACHED_PARTITIONS = 6;

    /**
     * Provides the cache description for the custom side store.
     *
     * <p>The cache is partitioned by as-of date and market data set, and is capped at
     * {@link #MAX_CACHED_PARTITIONS} partitions.
     *
     * @return the cache description for the dividend store
     */
    @Bean
    public SingleTableCacheDescription customSideStoreCacheDescription() {
        final var capacity = CacheCapacity.MaxPartitionsCount.of(MAX_CACHED_PARTITIONS);
        return SingleTableCacheDescription.builder()
                .tableName(DIVIDEND_REPORTING_MARKET_DATA_STORE)
                .cachePartitioningFields(List.of(AS_OF_DATE, MARKET_DATA_SET))
                .capacity(capacity)
                .build();
    }
}

We also need to import the configuration class that we have just created in the ApplicationConfig class:

@Import(value = {
...
        // Added after all the other imports: the dividend store, its retrieval configuration
        // and its DirectQuery cache description
        DividendReportingStoreConfig.class,
        DividendReportingMarketDataRetrievalConfig.class,
        DividendReportingStoreCacheConfig.class
})
@Slf4j
public class ApplicationConfig {
    ...
}

Populating the database table

Here we take the example of a Databricks database in which we want to create our custom table and populate some sample data in it. We use the following SQL script:

-- Create (or replace) the table backing the dividend side store.
-- Columns mirror the store fields: four key columns plus the Quote value.
CREATE OR REPLACE TABLE DividendReportingMarketDataStore (
    AsOfDate DATE NOT NULL,
    MarketDataSet STRING NOT NULL,
    InstrumentId STRING NOT NULL,
    ExecutionDate DATE NOT NULL,
    Quote DOUBLE NOT NULL,
    -- Composite key: one quote per as-of date, market data set, instrument and execution date.
    PRIMARY KEY (AsOfDate, MarketDataSet, InstrumentId, ExecutionDate)
);

-- Sample data: three dividends for Stock1 in MarketDataSet1, all with the same as-of date
-- and with increasing execution dates.
INSERT INTO DividendReportingMarketDataStore (AsOfDate, MarketDataSet, InstrumentId, ExecutionDate, Quote)
VALUES
    ('2018-09-28', 'MarketDataSet1', 'Stock1', '2024-01-01', 1.0),
    ('2018-09-28', 'MarketDataSet1', 'Stock1', '2024-02-01', 2.0),
    ('2018-09-28', 'MarketDataSet1', 'Stock1', '2024-03-01', 3.0);

The way you populate your database may be specific to your project, so the actions you need to take to create and populate the custom table in the database may vary compared to this example.