Configuring sources using Spring Beans

This page describes the extension points that allow you to customize the CSV sources through Spring Beans.

It also provides an example of adding customization using the new extension points.

Concept

A source topic is configured through two parameter objects.

Parameter object | Fields | Details
CsvTopicColumns | String topic; List<String> columns; Map<String, List<IColumnCalculator<ILineReader>>> calculators | For a given topic, this object contains the list of columns expected in the file, and a list of column calculators for each store targeted by the source.
CsvTopicPublisher<I> | String topic; ITuplePublisher<IFileInfo<I>> publisher | For a given topic, this object contains the publisher that will be used by the source.

Customization Beans are generator functions: each one accepts the map of previously defined parameter objects and returns a list of new parameter objects. These function Beans are wired into provider objects, which apply them in the order in which the Beans are defined. Each function receives a copy of the provider's internal map of previously defined parameters, so it cannot modify that map directly.

The internal source configuration uses the same Bean mechanism, and we advise using the @Order annotation for custom Beans, with values higher than 10.

The Provider objects that the generator Beans are wired into are also exposed as Beans. While the default behavior should suffice for most customization requirements, you can replace a particular Provider object by creating a qualified bean of the same type and marking it as @Primary.
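The mechanism described above can be sketched in plain Java. This is a minimal illustration, not the actual provider code: TopicColumnsSketch, ColumnsGeneratorSketch and ColumnsProviderSketch are hypothetical stand-ins, the ISourceConfiguration argument is omitted, and the rule that a new object replaces the previous one for the same topic is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Simplified stand-in for CsvTopicColumns: a topic and its expected file columns. */
final class TopicColumnsSketch {
    final String topic;
    final List<String> columns;

    TopicColumnsSketch(String topic, List<String> columns) {
        this.topic = topic;
        this.columns = List.copyOf(columns);
    }
}

/** Hypothetical alias mirroring the idea of a generator function Bean. */
interface ColumnsGeneratorSketch
        extends Function<Map<String, TopicColumnsSketch>, List<TopicColumnsSketch>> {
}

/** Simplified stand-in for a provider; not the actual CsvColumnsProvider. */
final class ColumnsProviderSketch {
    private final Map<String, TopicColumnsSketch> byTopic = new LinkedHashMap<>();

    /** Applies the generators in list order, as a provider would for ordered Beans. */
    ColumnsProviderSketch(List<ColumnsGeneratorSketch> orderedGenerators) {
        for (ColumnsGeneratorSketch generator : orderedGenerators) {
            // The generator only sees a copy, so it cannot alter the internal map directly.
            Map<String, TopicColumnsSketch> copy = new LinkedHashMap<>(byTopic);
            for (TopicColumnsSketch generated : generator.apply(copy)) {
                // Assumption: a new object for an existing topic replaces the previous one.
                byTopic.put(generated.topic, generated);
            }
        }
    }

    List<String> columnsOf(String topic) {
        return byTopic.get(topic).columns;
    }

    /** Runs an "internal" generator followed by a "custom" one. */
    static List<String> demo() {
        ColumnsGeneratorSketch internal = previous -> List.of(
                new TopicColumnsSketch("DynamicTenors", List.of("TenorLabels", "NumberOfDays")));
        ColumnsGeneratorSketch custom = previous -> {
            List<String> columns = new ArrayList<>(previous.get("DynamicTenors").columns);
            columns.add("TestField");
            previous.clear(); // Harmless: only the copy is cleared.
            return List.of(new TopicColumnsSketch("DynamicTenors", columns));
        };
        return new ColumnsProviderSketch(List.of(internal, custom)).columnsOf("DynamicTenors");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running main prints [TenorLabels, NumberOfDays, TestField]: the custom generator builds on the output of the internal one, and clearing the map it received has no effect on the provider.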

Configuration Bean types

The following Bean types are available:

Bean type | Details
CsvColumnsProviderFunction | Alias for a BiFunction accepting the Data Load Controller ISourceConfiguration and the previously defined CsvTopicColumns object per topic, and generating a List of new CsvTopicColumns objects.
CsvPublisherProviderFunction | Alias for a BiFunction accepting the Data Load Controller ISourceConfiguration and the previously defined CsvTopicPublisher object per topic, and generating a List of new CsvTopicPublisher objects.
CsvColumnsProvider | A provider of CsvTopicColumns objects, retrievable per topic or as a collection.
CsvPublisherProvider<I> | A provider of CsvTopicPublisher objects, retrievable per topic.

Available Qualifiers

The customization Beans, and the Provider objects they are used in, are annotated with a @Qualifier that explicitly defines the source to which the customization applies. All @Qualifier constants are defined in the SpringConstants class.

Constant | Qualifier | Type | Source
SP_QUALIFIER__SENSI_TOPIC_COLUMNS | "sensi-topic-columns" | CsvColumnsProviderFunction | Sensitivity
SP_QUALIFIER__DQ_SENSI_TOPIC_COLUMNS | "dq-sensi-topic-columns" | CsvColumnsProviderFunction | Sensitivity for DirectQuery
SP_QUALIFIER__SENSI_SUMMARY_TOPIC_COLUMNS | "sensi-summary-topic-columns" | CsvColumnsProviderFunction | Sensitivity Summary
SP_QUALIFIER__VAR_TOPIC_COLUMNS | "var-topic-columns" | CsvColumnsProviderFunction | VaR
SP_QUALIFIER__VAR_SUMMARY_TOPIC_COLUMNS | "var-summary-topic-columns" | CsvColumnsProviderFunction | VaR Summary
SP_QUALIFIER__PNL_TOPIC_COLUMNS | "pnl-topic-columns" | CsvColumnsProviderFunction | PnL
SP_QUALIFIER__PNL_SUMMARY_TOPIC_COLUMNS | "pnl-summary-topic-columns" | CsvColumnsProviderFunction | PnL Summary
SP_QUALIFIER__ADJUSTMENTS_TOPIC_COLUMNS | "adjustments-topic-columns" | CsvColumnsProviderFunction | Adjustments
SP_QUALIFIER__COMMON_TOPIC_COLUMNS | "common-topic-columns" | CsvColumnsProviderFunction | Common
SP_QUALIFIER__SENSI_TOPIC_PUBLISHERS | "sensi-topic-publishers" | CsvPublisherProviderFunction | Sensitivity
SP_QUALIFIER__DQ_SENSI_TOPIC_PUBLISHERS | "dq-sensi-topic-publishers" | CsvPublisherProviderFunction | Sensitivity for DirectQuery
SP_QUALIFIER__SENSI_SUMMARY_TOPIC_PUBLISHERS | "sensi-summary-topic-publishers" | CsvPublisherProviderFunction | Sensitivity Summary
SP_QUALIFIER__VAR_TOPIC_PUBLISHERS | "var-topic-publishers" | CsvPublisherProviderFunction | VaR
SP_QUALIFIER__VAR_SUMMARY_TOPIC_PUBLISHERS | "var-summary-topic-publishers" | CsvPublisherProviderFunction | VaR Summary
SP_QUALIFIER__PNL_TOPIC_PUBLISHERS | "pnl-topic-publishers" | CsvPublisherProviderFunction | PnL
SP_QUALIFIER__PNL_SUMMARY_TOPIC_PUBLISHERS | "pnl-summary-topic-publishers" | CsvPublisherProviderFunction | PnL Summary
SP_QUALIFIER__ADJUSTMENTS_TOPIC_PUBLISHERS | "adjustments-topic-publishers" | CsvPublisherProviderFunction | Adjustments
SP_QUALIFIER__COMMON_TOPIC_PUBLISHERS | "common-topic-publishers" | CsvPublisherProviderFunction | Common
SP_QUALIFIER__SENSI_TOPIC_COLUMNS_PROVIDER | "sensi-topic-columns-provider" | CsvColumnsProvider | Sensitivity
SP_QUALIFIER__SENSI_SUMMARY_TOPIC_COLUMNS_PROVIDER | "sensi-summary-topic-columns-provider" | CsvColumnsProvider | Sensitivity Summary
SP_QUALIFIER__VAR_TOPIC_COLUMNS_PROVIDER | "var-topic-columns-provider" | CsvColumnsProvider | VaR
SP_QUALIFIER__VAR_SUMMARY_TOPIC_COLUMNS_PROVIDER | "var-summary-topic-columns-provider" | CsvColumnsProvider | VaR Summary
SP_QUALIFIER__PNL_TOPIC_COLUMNS_PROVIDER | "pnl-topic-columns-provider" | CsvColumnsProvider | PnL
SP_QUALIFIER__PNL_SUMMARY_TOPIC_COLUMNS_PROVIDER | "pnl-summary-topic-columns-provider" | CsvColumnsProvider | PnL Summary
SP_QUALIFIER__ADJUSTMENTS_TOPIC_COLUMNS_PROVIDER | "adjustments-topic-columns-provider" | CsvColumnsProvider | Adjustments
SP_QUALIFIER__COMMON_TOPIC_COLUMNS_PROVIDER | "common-topic-columns-provider" | CsvColumnsProvider | Common
SP_QUALIFIER__SENSI_TOPIC_PUBLISHERS_PROVIDER | "sensi-topic-publishers-provider" | CsvPublisherProvider | Sensitivity
SP_QUALIFIER__SENSI_SUMMARY_TOPIC_PUBLISHERS_PROVIDER | "sensi-summary-topic-publishers-provider" | CsvPublisherProvider | Sensitivity Summary
SP_QUALIFIER__VAR_TOPIC_PUBLISHERS_PROVIDER | "var-topic-publishers-provider" | CsvPublisherProvider | VaR
SP_QUALIFIER__VAR_SUMMARY_TOPIC_PUBLISHERS_PROVIDER | "var-summary-topic-publishers-provider" | CsvPublisherProvider | VaR Summary
SP_QUALIFIER__PNL_TOPIC_PUBLISHERS_PROVIDER | "pnl-topic-publishers-provider" | CsvPublisherProvider | PnL
SP_QUALIFIER__PNL_SUMMARY_TOPIC_PUBLISHERS_PROVIDER | "pnl-summary-topic-publishers-provider" | CsvPublisherProvider | PnL Summary
SP_QUALIFIER__ADJUSTMENTS_TOPIC_PUBLISHERS_PROVIDER | "adjustments-topic-publishers-provider" | CsvPublisherProvider | Adjustments
SP_QUALIFIER__COMMON_TOPIC_PUBLISHERS_PROVIDER | "common-topic-publishers-provider" | CsvPublisherProvider | Common

Example

For the purposes of this example, we will add a column to the file DynamicTenors.csv and load the data from this column into the DynamicTenors store. In this case, our new column’s header is ‘TestField’, and all data in this column is ‘Testdata’. Below are the intended contents of the input file.

TenorLabels | NumberOfDays | SensitivityName | TenorSet | TestField
N/A | 0 | | SET_A | Testdata
0.25Y | 90 | | SET_A | Testdata
0.5Y | 180 | | SET_A | Testdata
1Y | 360 | | SET_A | Testdata
2Y | 720 | | SET_A | Testdata
3Y | 1080 | | SET_A | Testdata
4Y | 1440 | | SET_A | Testdata
5Y | 1800 | | SET_A | Testdata
10Y | 3600 | | SET_A | Testdata
15Y | 5400 | | SET_A | Testdata
20Y | 7200 | | SET_A | Testdata
30Y | 10080 | | SET_A | Testdata

We will also attach a column calculator that stores twice the ‘NumberOfDays’ value in a new column, as well as a tuple publisher that sets a value in the ‘SensitivityName’ field.

Step 1 - Define customizations to datastore

Before we can load these new columns into our cube, we need to make sure that our datastore has fields that can accept them.

To add new fields to an existing store, we will need to create a DatastoreConfiguratorConsumer bean which appends the required fields to the DynamicTenors store:

    @Bean
    public DatastoreConfiguratorConsumer addFieldToStore() {
        return configurator -> {
            configurator.appendFields(SensiDatastoreDescriptionConfig.SCHEMA, StoreConstants.DYNAMIC_TENOR_STORE_NAME,
                    List.of(
                            new CustomField(DOUBLE_NUMBER_OF_DAYS, ILiteralType.DOUBLE),
                            new CustomField(TEST_FIELD, ILiteralType.STRING)
                    )
            );
        };
    }

See Customizing the Datastore with the Datastore Helper for more information.

Step 2 - Configuring the source

Now that we have modified our store, we need to make sure the ETL is correctly set up so that the field will be properly populated.

In most cases, the topic configuration will use the store fields as the expected file columns, which would require no further configuration.

For the DynamicTenors store used in this example, file columns are defined explicitly, and thus any new columns are not autoconfigured. The DynamicTenors store also has a previously defined column calculator creating indices for each of the tenors.

To add the new columns and the column calculator, we will create a bean with the signature

@Qualifier(SP_QUALIFIER__SENSI_TOPIC_COLUMNS)
@Order(10)
public <I> CsvColumnsProviderFunction<I> addFieldAndCalculator()

containing the required logic:

    @Bean
    @Qualifier(SP_QUALIFIER__SENSI_TOPIC_COLUMNS)
    @Order(10)
    public <I> CsvColumnsProviderFunction<I> addFieldAndCalculator() {
        return (source, previousConfig) -> {
            // We retrieve the previous columns of the topic for the Tenors store.
            CsvTopicColumns prevColumns = previousConfig.get(StoreConstants.DYNAMIC_TENOR_STORE_NAME);
            List<String> newColumns = new ArrayList<>(prevColumns.getColumns());
            newColumns.add(TEST_FIELD);

            // We retrieve the previous calculators defined for the store.
            // While in this instance the topic name and the store are the same, calculators can be defined for multiple stores within a topic.
            Map<String, List<IColumnCalculator<ILineReader>>> topicCalculators = new HashMap<>(prevColumns.getCalculators());
            List<IColumnCalculator<ILineReader>> storeCalculators = new ArrayList<>(topicCalculators.get(StoreConstants.DYNAMIC_TENOR_STORE_NAME));
            storeCalculators.add(new IColumnCalculator<>() {
                @Override public String getColumnName() {
                    return DOUBLE_NUMBER_OF_DAYS;
                }

                @Override public Object compute(IColumnCalculationContext<ILineReader> context) {
                    return ((Double) context.getValue(StoreFieldNames.TENOR_NUMBER_OF_DAYS)) * 2.0;
                }
            });
            topicCalculators.put(StoreConstants.DYNAMIC_TENOR_STORE_NAME, storeCalculators);

            // We create a new CsvTopicColumns object with the new list of columns and the updated column calculators.
            return List.of(new CsvTopicColumns(
                    StoreConstants.DYNAMIC_TENOR_STORE_NAME,
                    newColumns,
                    topicCalculators
            ));
        };
    }

For the tuple publisher, a second bean is created with the signature

@Qualifier(SP_QUALIFIER__SENSI_TOPIC_PUBLISHERS)
@Order(10)
public <I> CsvPublisherProviderFunction<I> addPublisher(IDatastore datastore)

defining the publishing logic:

    @Bean
    @Qualifier(SP_QUALIFIER__SENSI_TOPIC_PUBLISHERS)
    @Order(10)
    public <I> CsvPublisherProviderFunction<I> addPublisher(IDatastore datastore) {
        return (source, previousConfig) -> List.of(
                new CsvTopicPublisher<>(
                        StoreConstants.DYNAMIC_TENOR_STORE_NAME,
                        new ITuplePublisher<IFileInfo<Path>>() {
                            @Override
                            public void publish(IStoreMessage<? extends IFileInfo<Path>, ?> message, List<Object[]> tuples) {
                                // We use the expected index for the SensitivityName field here, while in a complete implementation the index would be an instance variable.
                                tuples.forEach(tuple -> tuple[1] = "TestValue");
                                datastore.getTransactionManager().addAll(StoreConstants.DYNAMIC_TENOR_STORE_NAME, tuples);
                            }

                            @Override
                            public Collection<String> getTargetStores() {
                                return Collections.singleton(StoreConstants.DYNAMIC_TENOR_STORE_NAME);
                            }
                        }
                )
        );
    }

Step 3 - Configuration classes

The Beans detailed above can be included in a single class within the MR application.

@Configuration
public class CustomizationsConfig {
    private static final String TEST_FIELD = "TestData";
    private static final String DOUBLE_NUMBER_OF_DAYS = "DoubleNumberOfDays";

    @Bean
    public DatastoreConfiguratorConsumer addFieldToStore() {
        return configurator -> {
            configurator.appendFields(SensiDatastoreDescriptionConfig.SCHEMA, StoreConstants.DYNAMIC_TENOR_STORE_NAME,
                    List.of(
                            new CustomField(DOUBLE_NUMBER_OF_DAYS, ILiteralType.DOUBLE),
                            new CustomField(TEST_FIELD, ILiteralType.STRING)
                    )
            );
        };
    }

    @Bean
    @Qualifier(SP_QUALIFIER__SENSI_TOPIC_COLUMNS)
    @Order(10)
    public <I> CsvColumnsProviderFunction<I> addFieldAndCalculator() {
        return (source, previousConfig) -> {
            // We retrieve the previous columns of the topic for the Tenors store.
            CsvTopicColumns prevColumns = previousConfig.get(StoreNames.TENOR_STORE_NAME);
            List<String> newColumns = new ArrayList<>(prevColumns.getColumns());
            newColumns.add(TEST_FIELD);

            // We retrieve the previous calculators defined for the store. The Tenors store has a column calculator setting indices for each tenor.
            // While in this instance the topic name and the store are the same, calculators can be defined for multiple stores within a topic.
            Map<String, List<IColumnCalculator<ILineReader>>> topicCalculators = new HashMap<>(prevColumns.getCalculators());
            List<IColumnCalculator<ILineReader>> storeCalculators = new ArrayList<>(topicCalculators.get(StoreNames.TENOR_STORE_NAME));
            storeCalculators.add(new IColumnCalculator<>() {
                @Override public String getColumnName() {
                    return DOUBLE_NUMBER_OF_DAYS;
                }

                @Override public Object compute(IColumnCalculationContext<ILineReader> context) {
                    return ((Double) context.getValue(StoreFieldNames.TENOR_NUMBER_OF_DAYS)) * 2.0;
                }
            });
            topicCalculators.put(StoreNames.TENOR_STORE_NAME, storeCalculators);

            // We create a new CsvTopicColumns object with the new list of columns and the updated column calculators.
            return List.of(new CsvTopicColumns(
                    StoreNames.TENOR_STORE_NAME,
                    newColumns,
                    topicCalculators
            ));
        };
    }

    @Bean
    @Qualifier(SP_QUALIFIER__SENSI_TOPIC_PUBLISHERS)
    @Order(10)
    public <I> CsvPublisherProviderFunction<I> addPublisher(IDatastore datastore) {
        return (source, previousConfig) -> List.of(
                new CsvTopicPublisher<>(
                        StoreNames.TENOR_STORE_NAME,
                        new ITuplePublisher<IFileInfo<Path>>() {
                            @Override
                            public void publish(IStoreMessage<? extends IFileInfo<Path>, ?> message, List<Object[]> tuples) {
                                // We use the expected index for the SensitivityName field here, while in a complete implementation the index would be an instance variable.
                                tuples.forEach(tuple -> tuple[1] = "TestValue");
                                datastore.getTransactionManager().addAll(StoreNames.TENOR_STORE_NAME, tuples);
                            }

                            @Override
                            public Collection<String> getTargetStores() {
                                return Collections.singleton(StoreNames.TENOR_STORE_NAME);
                            }
                        }
                )
        );
    }
}

This configuration class should then be included in the @Import annotation of the MarketRiskConfig class:

@Import(value = {
        ...
        CustomizationsConfig.class
})
public class MarketRiskConfig {
    ...
}

Importing this customization class and modifying the input file will result in the following data being loaded into the store:

TenorLabels SensitivityName NumberOfDays TenorIndices DoubleNumberOfDays TestData
N/A TestValue 0.0 0 0.0 Testdata
0.25Y TestValue 90.0 1 180.0 Testdata
0.5Y TestValue 180.0 2 360.0 Testdata
1Y TestValue 360.0 3 720.0 Testdata
2Y TestValue 720.0 4 1440.0 Testdata
3Y TestValue 1080.0 5 2160.0 Testdata
4Y TestValue 1440.0 6 2880.0 Testdata
5Y TestValue 1800.0 7 3600.0 Testdata
10Y TestValue 3600.0 8 7200.0 Testdata
15Y TestValue 5400.0 9 10800.0 Testdata
20Y TestValue 7200.0 10 14400.0 Testdata
30Y TestValue 10080.0 11 20160.0 Testdata
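The derived values in this table can be reproduced with a small plain-Java sketch. It only mimics the two customizations on a raw tuple: DynamicTenorsRowSketch is a hypothetical class, and the tuple positions are assumed from the column order shown above rather than read from the store.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class DynamicTenorsRowSketch {
    // Tuple positions assumed from the column order of the result table.
    static final int SENSITIVITY_NAME = 1;
    static final int NUMBER_OF_DAYS = 2;
    static final int DOUBLE_NUMBER_OF_DAYS = 4;

    /** Mimics the column calculator: twice the NumberOfDays value. */
    static void applyCalculator(Object[] tuple) {
        tuple[DOUBLE_NUMBER_OF_DAYS] = ((Double) tuple[NUMBER_OF_DAYS]) * 2.0;
    }

    /** Mimics the tuple publisher: sets SensitivityName before the tuples are stored. */
    static void applyPublisher(List<Object[]> tuples) {
        tuples.forEach(tuple -> tuple[SENSITIVITY_NAME] = "TestValue");
    }

    /** Builds one tuple from parsed file values and applies both customizations. */
    static Object[] load(String tenorLabel, double numberOfDays, int tenorIndex, String testData) {
        Object[] tuple = {tenorLabel, null, numberOfDays, tenorIndex, null, testData};
        applyCalculator(tuple);
        applyPublisher(Collections.singletonList(tuple));
        return tuple;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(load("0.25Y", 90.0, 1, "Testdata")));
    }
}
```

For the 0.25Y row, main prints [0.25Y, TestValue, 90.0, 1, 180.0, Testdata], which matches the second row of the table.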

Helper methods

To simplify the customization process for less complex use cases, a set of helper methods is included in the Provider classes.

All methods are static and return an object that can be used directly in the customization Bean. The generated List will contain a single parameter object containing the defined customizations.

Method | Details | Provider
addColumnsFunction(String topic, List<String> columns) | Adds a list of columns to a topic. Preserves the previously defined calculators. | CsvColumnsProvider
addCalculatorsFunction(String topic, List<IColumnCalculator<ILineReader>> calculators) | Adds a list of calculators to a topic, using the topic name as the store name. Preserves the previously defined columns. | CsvColumnsProvider
addCalculatorsFunction(String topic, Map<String, List<IColumnCalculator<ILineReader>>> calculators) | Adds a map of calculators per store name to a topic. Preserves the previously defined columns. | CsvColumnsProvider
addColumnsAndCalcsFunction(String topic, List<String> columns, List<IColumnCalculator<ILineReader>> calculators) | Adds a list of columns and a list of calculators to a topic, using the topic name as the store name. | CsvColumnsProvider
addColumnsAndCalcsFunction(String topic, List<String> columns, Map<String, List<IColumnCalculator<ILineReader>>> calculators) | Adds a list of columns and a map of calculators per store name to a topic. | CsvColumnsProvider
addPublisher(String topic, ITuplePublisher<IFileInfo> publisher) | Adds a tuple publisher to a topic. | CsvPublisherProvider
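To illustrate the behavior described for addColumnsFunction, here is a rough plain-Java equivalent. It is a sketch under assumptions, not the library code: HelperSketch and its nested TopicColumns class are hypothetical stand-ins, calculators are reduced to their names, the "TenorIndices" calculator name is illustrative, and the topic is assumed to have been defined by an earlier function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class HelperSketch {
    /** Simplified stand-in for CsvTopicColumns; calculators are reduced to their names. */
    static final class TopicColumns {
        final String topic;
        final List<String> columns;
        final Map<String, List<String>> calculators;

        TopicColumns(String topic, List<String> columns, Map<String, List<String>> calculators) {
            this.topic = topic;
            this.columns = columns;
            this.calculators = calculators;
        }
    }

    /** Hypothetical equivalent of addColumnsFunction(topic, columns). */
    static Function<Map<String, TopicColumns>, List<TopicColumns>> addColumnsFunction(
            String topic, List<String> columns) {
        return previous -> {
            // Assumption: the topic was already defined by an earlier function.
            TopicColumns prev = previous.get(topic);
            List<String> merged = new ArrayList<>(prev.columns);
            merged.addAll(columns);
            // A single parameter object is returned, with the calculators carried over unchanged.
            return List.of(new TopicColumns(topic, merged, prev.calculators));
        };
    }

    /** Adds "TestField" to a topic that already has two columns and one calculator. */
    static TopicColumns demo() {
        Map<String, TopicColumns> previous = new HashMap<>();
        previous.put("DynamicTenors", new TopicColumns(
                "DynamicTenors",
                List.of("TenorLabels", "NumberOfDays"),
                Map.of("DynamicTenors", List.of("TenorIndices"))));
        return addColumnsFunction("DynamicTenors", List.of("TestField")).apply(previous).get(0);
    }

    public static void main(String[] args) {
        TopicColumns result = demo();
        System.out.println(result.columns + " " + result.calculators);
    }
}
```

In a customization Bean, the function returned by the real helper would be returned directly from the @Bean method, in place of a hand-written lambda.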

The ATableFormatTuplePublisher

For use cases where the destination table doesn’t match the input file format, we provide the ATableFormatTuplePublisher.

Any extension of this publisher can be constructed with the file tuple containing a subset of the table fields, or with an explicit mapping between table fields and file columns.

Example

For the backwards-compatible Market Data API sources, the stores should be loaded from the deprecated file format. For the SpotMarketData store, when loading from the MarketData.csv file, the following applies:

File Field | Table Field | Behavior
AsOfDate | AsOfDate | Loaded as-is
MarketDataSet | MarketDataSet | Loaded as-is
RiskFactorId | InstrumentId | Mapped by the publisher
Quote | Quote | Loaded as-is
TenorLabels | - | Must be empty
MaturityLabels | - | Must be empty
MoneynessLabels | - | Must be empty
TenorDates | - | Must be empty
MaturityDates | - | Must be empty
Nominal | - | Must be empty
The configuration for this behavior is:

new ATableFormatTuplePublisher<>(datastore, Map.of(INSTRUMENT_ID, RISK_FACTOR)) {
    @Override
    public void publish(IStoreMessage<? extends IFileInfo<Path>, ?> message, List<Object[]> tuples) {
        Queue<Object[]> storeTuples = new ConcurrentLinkedQueue<>();
        tuples.forEach(
                tuple -> {
                    if (checkNull(tmmProps, message, tuple, TENOR_LABELS, TENOR_DATES, MATURITY_LABELS, MATURITY_DATES, MONEYNESS_LABELS,
                            NOMINAL)) {
                        Queue<Object[]> extractedScalarTuples = new ConcurrentLinkedQueue<>();
                        devectorizer.extractScalarTuples(message, tuple, extractedScalarTuples);
                        extractedScalarTuples.forEach(
                                scalarTuple -> storeTuples.add(createTupleForTable(message, SPOT_MARKET_DATA_STORE, scalarTuple))
                        );
                    }
                }
        );
        datastore.getTransactionManager().addAll(SPOT_MARKET_DATA_STORE, storeTuples);
    }

    @Override
    public Collection<String> getTargetStores() {
        return Set.of(SPOT_MARKET_DATA_STORE);
    }
}

Within the createTupleForTable method, the publisher will iterate over the table fields, attempting to retrieve a value in the input tuple by the mapped field (or directly if no mapping is available).

If no value is available in the input tuple (or no matching field is found), the value will be empty. If a tuple field’s value isn’t requested by the publisher, it will be ignored.
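The lookup rules above can be sketched as follows. This is an illustration of the described behavior only, not the actual ATableFormatTuplePublisher code: TableFormatSketch, its method signature, the use of null for an empty value, and the sample values are all assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class TableFormatSketch {
    /**
     * Builds a table tuple from a file tuple.
     * tableFieldToFileColumn maps a table field to the file column it is read from.
     */
    static Object[] createTupleForTable(List<String> tableFields,
                                        Map<String, String> tableFieldToFileColumn,
                                        List<String> fileColumns,
                                        Object[] fileTuple) {
        Object[] tableTuple = new Object[tableFields.size()];
        for (int i = 0; i < tableTuple.length; i++) {
            String field = tableFields.get(i);
            // Use the mapped file column if there is one, otherwise the field name itself.
            String column = tableFieldToFileColumn.getOrDefault(field, field);
            int index = fileColumns.indexOf(column);
            // No matching column: the table value stays empty (null in this sketch).
            tableTuple[i] = index < 0 ? null : fileTuple[index];
        }
        // File columns that no table field requests are never read, so they are ignored.
        return tableTuple;
    }

    /** Maps a MarketData.csv style tuple onto four table fields. */
    static Object[] demo() {
        List<String> tableFields = List.of("AsOfDate", "MarketDataSet", "InstrumentId", "Quote");
        List<String> fileColumns =
                List.of("AsOfDate", "MarketDataSet", "RiskFactorId", "Quote", "TenorLabels");
        Object[] fileTuple = {"2024-01-02", "Official", "EUR/USD", 1.1, null};
        return createTupleForTable(
                tableFields, Map.of("InstrumentId", "RiskFactorId"), fileColumns, fileTuple);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

Here InstrumentId is filled from the RiskFactorId column through the mapping, the other three fields are matched by name, and the TenorLabels column is ignored because no table field requests it.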

Suggested Further Reading