Configuring sources using Spring Beans
This page describes the extension points that allow you to customize the CSV sources through Spring Beans.
It also provides an example of adding a customization using these extension points.
Concept
A source topic is configured through two parameter objects.
| Parameter object | Fields | Details |
|---|---|---|
| `CsvTopicColumns` | `String topic`<br/>`List<String> columns`<br/>`Map<String, List<IColumnCalculator<ILineReader>>> calculators` | For a given topic, this object contains the list of columns expected in the file, and a list of column calculators for each store targeted by the source. |
| `CsvTopicPublisher<I>` | `String topic`<br/>`ITuplePublisher<IFileInfo<I>> publisher` | For a given topic, this object contains the publisher that will be used by the source. |
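For illustration, the sketch below builds one of each object using the constructors that appear in the example further down this page; the topic and column names are placeholders only.

```java
// Placeholder values; the constructors mirror the fields listed in the table above.
CsvTopicColumns tenorColumns = new CsvTopicColumns(
        "DynamicTenors",                        // topic
        List.of("TenorLabels", "NumberOfDays"), // columns expected in the file
        Map.of("DynamicTenors", List.of()));    // column calculators per target store

ITuplePublisher<IFileInfo<Path>> tuplePublisher = /* publishing logic, see the example below */ null;
CsvTopicPublisher<Path> tenorPublisher = new CsvTopicPublisher<>("DynamicTenors", tuplePublisher);
```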
Customization Beans
Customization Beans are defined as generator `Function` objects that accept the map of previously defined parameter objects and generate a list of new parameter objects. These function Beans are then used in the creation of provider objects, which respect the order in which the Beans are defined. The provider objects apply the `Function`s to a copy of the internal `Map` of previously defined parameters, to prevent direct operations on the `Map`.
The internal source configuration uses the same Bean mechanism, and we advise using the `@Order` annotation for custom Beans, with values higher than 10.
The `Provider` objects that the generator Beans are wired into are also exposed as Beans. While the default behavior should suffice for most customization requirements, you can replace a particular `Provider` object by creating a qualified bean of the same type and marking it as `@Primary`.
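As a minimal sketch, a custom columns generator Bean has the following shape. The qualifier is one of the constants listed below, the topic name used here is hypothetical, and the body simply shows where the new parameter objects are produced.

```java
// Sketch of a custom columns generator Bean.
// The function receives the Data Load Controller ISourceConfiguration and a copy of
// the previously defined CsvTopicColumns per topic, and returns new CsvTopicColumns objects.
@Bean
@Qualifier(SP_QUALIFIER__SENSI_TOPIC_COLUMNS) // source qualifiers are listed below
@Order(11) // values higher than 10, as advised above
public <I> CsvColumnsProviderFunction<I> myColumnsCustomization() {
    return (source, previousConfig) -> {
        CsvTopicColumns previous = previousConfig.get("MyTopic"); // hypothetical topic name
        // ... derive new columns and calculators from `previous` ...
        return List.of(); // return the new CsvTopicColumns objects here
    };
}
```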
Configuration Bean types
The following Bean types are available:
| Bean type | Details |
|---|---|
| `CsvColumnsProviderFunction` | Alias for a `BiFunction` accepting the Data Load Controller `ISourceConfiguration` and the previously defined `CsvTopicColumns` object per topic, and generating a `List` of new `CsvTopicColumns` objects. |
| `CsvPublisherProviderFunction` | Alias for a `BiFunction` accepting the Data Load Controller `ISourceConfiguration` and the previously defined `CsvTopicPublisher` object per topic, and generating a `List` of new `CsvTopicPublisher` objects. |
| `CsvColumnsProvider` | A provider of `CsvTopicColumns` objects, retrievable per topic or as a collection. |
| `CsvPublisherProvider<I>` | A provider of `CsvTopicPublisher` objects, retrievable per topic. |
Available Qualifiers
The customization Beans, and the `Provider` objects they are used in, are annotated with a `@Qualifier` that explicitly defines the source to which the customization applies. All `@Qualifier` constants are defined in the `SpringConstants` class.
| Constant | Qualifier | Type | Source |
|---|---|---|---|
| `SP_QUALIFIER__SENSI_TOPIC_COLUMNS` | “sensi-topic-columns” | `CsvColumnsProviderFunction` | Sensitivity |
| `SP_QUALIFIER__DQ_SENSI_TOPIC_COLUMNS` | “dq-sensi-topic-columns” | Columns generator `Function` | Sensi for DirectQuery |
| `SP_QUALIFIER__SENSI_SUMMARY_TOPIC_COLUMNS` | “sensi-summary-topic-columns” | `CsvColumnsProviderFunction` | Sensitivity Summary |
| `SP_QUALIFIER__VAR_TOPIC_COLUMNS` | “var-topic-columns” | `CsvColumnsProviderFunction` | VaR |
| `SP_QUALIFIER__VAR_SUMMARY_TOPIC_COLUMNS` | “var-summary-topic-columns” | `CsvColumnsProviderFunction` | VaR Summary |
| `SP_QUALIFIER__PNL_TOPIC_COLUMNS` | “pnl-topic-columns” | `CsvColumnsProviderFunction` | PnL |
| `SP_QUALIFIER__PNL_SUMMARY_TOPIC_COLUMNS` | “pnl-summary-topic-columns” | `CsvColumnsProviderFunction` | PnL Summary |
| `SP_QUALIFIER__ADJUSTMENTS_TOPIC_COLUMNS` | “adjustments-topic-columns” | `CsvColumnsProviderFunction` | Adjustments |
| `SP_QUALIFIER__COMMON_TOPIC_COLUMNS` | “common-topic-columns” | `CsvColumnsProviderFunction` | Common |
| `SP_QUALIFIER__SENSI_TOPIC_PUBLISHERS` | “sensi-topic-publishers” | `CsvPublisherProviderFunction` | Sensitivity |
| `SP_QUALIFIER__DQ_SENSI_TOPIC_PUBLISHERS` | “dq-sensi-topic-publishers” | Publisher generator `Function` | Sensi for DirectQuery |
| `SP_QUALIFIER__SENSI_SUMMARY_TOPIC_PUBLISHERS` | “sensi-summary-topic-publishers” | `CsvPublisherProviderFunction` | Sensitivity Summary |
| `SP_QUALIFIER__VAR_TOPIC_PUBLISHERS` | “var-topic-publishers” | `CsvPublisherProviderFunction` | VaR |
| `SP_QUALIFIER__VAR_SUMMARY_TOPIC_PUBLISHERS` | “var-summary-topic-publishers” | `CsvPublisherProviderFunction` | VaR Summary |
| `SP_QUALIFIER__PNL_TOPIC_PUBLISHERS` | “pnl-topic-publishers” | `CsvPublisherProviderFunction` | PnL |
| `SP_QUALIFIER__PNL_SUMMARY_TOPIC_PUBLISHERS` | “pnl-summary-topic-publishers” | `CsvPublisherProviderFunction` | PnL Summary |
| `SP_QUALIFIER__ADJUSTMENTS_TOPIC_PUBLISHERS` | “adjustments-topic-publishers” | `CsvPublisherProviderFunction` | Adjustments |
| `SP_QUALIFIER__COMMON_TOPIC_PUBLISHERS` | “common-topic-publishers” | `CsvPublisherProviderFunction` | Common |
| `SP_QUALIFIER__SENSI_TOPIC_COLUMNS_PROVIDER` | “sensi-topic-columns-provider” | `CsvColumnsProvider` | Sensitivity |
| `SP_QUALIFIER__SENSI_SUMMARY_TOPIC_COLUMNS_PROVIDER` | “sensi-summary-topic-columns-provider” | `CsvColumnsProvider` | Sensitivity Summary |
| `SP_QUALIFIER__VAR_TOPIC_COLUMNS_PROVIDER` | “var-topic-columns-provider” | `CsvColumnsProvider` | VaR |
| `SP_QUALIFIER__VAR_SUMMARY_TOPIC_COLUMNS_PROVIDER` | “var-summary-topic-columns-provider” | `CsvColumnsProvider` | VaR Summary |
| `SP_QUALIFIER__PNL_TOPIC_COLUMNS_PROVIDER` | “pnl-topic-columns-provider” | `CsvColumnsProvider` | PnL |
| `SP_QUALIFIER__PNL_SUMMARY_TOPIC_COLUMNS_PROVIDER` | “pnl-summary-topic-columns-provider” | `CsvColumnsProvider` | PnL Summary |
| `SP_QUALIFIER__ADJUSTMENTS_TOPIC_COLUMNS_PROVIDER` | “adjustments-topic-columns-provider” | `CsvColumnsProvider` | Adjustments |
| `SP_QUALIFIER__COMMON_TOPIC_COLUMNS_PROVIDER` | “common-topic-columns-provider” | `CsvColumnsProvider` | Common |
| `SP_QUALIFIER__SENSI_TOPIC_PUBLISHERS_PROVIDER` | “sensi-topic-publishers-provider” | `CsvPublisherProvider` | Sensitivity |
| `SP_QUALIFIER__SENSI_SUMMARY_TOPIC_PUBLISHERS_PROVIDER` | “sensi-summary-topic-publishers-provider” | `CsvPublisherProvider` | Sensitivity Summary |
| `SP_QUALIFIER__VAR_TOPIC_PUBLISHERS_PROVIDER` | “var-topic-publishers-provider” | `CsvPublisherProvider` | VaR |
| `SP_QUALIFIER__VAR_SUMMARY_TOPIC_PUBLISHERS_PROVIDER` | “var-summary-topic-publishers-provider” | `CsvPublisherProvider` | VaR Summary |
| `SP_QUALIFIER__PNL_TOPIC_PUBLISHERS_PROVIDER` | “pnl-topic-publishers-provider” | `CsvPublisherProvider` | PnL |
| `SP_QUALIFIER__PNL_SUMMARY_TOPIC_PUBLISHERS_PROVIDER` | “pnl-summary-topic-publishers-provider” | `CsvPublisherProvider` | PnL Summary |
| `SP_QUALIFIER__ADJUSTMENTS_TOPIC_PUBLISHERS_PROVIDER` | “adjustments-topic-publishers-provider” | `CsvPublisherProvider` | Adjustments |
| `SP_QUALIFIER__COMMON_TOPIC_PUBLISHERS_PROVIDER` | “common-topic-publishers-provider” | `CsvPublisherProvider` | Common |
Example
For the purposes of this example, we will add a column to the file `DynamicTenors.csv` and load the data from this column into the `DynamicTenors` store. In this case, our new column’s header is ‘TestField’, and all data in this column is ‘Testdata’. Below are the intended contents of the input file.
| TenorLabels | NumberOfDays | SensitivityName | TenorSet | TestField |
|---|---|---|---|---|
| N/A | 0 | | SET_A | Testdata |
| 0.25Y | 90 | | SET_A | Testdata |
| 0.5Y | 180 | | SET_A | Testdata |
| 1Y | 360 | | SET_A | Testdata |
| 2Y | 720 | | SET_A | Testdata |
| 3Y | 1080 | | SET_A | Testdata |
| 4Y | 1440 | | SET_A | Testdata |
| 5Y | 1800 | | SET_A | Testdata |
| 10Y | 3600 | | SET_A | Testdata |
| 15Y | 5400 | | SET_A | Testdata |
| 20Y | 7200 | | SET_A | Testdata |
| 30Y | 10080 | | SET_A | Testdata |
We will also attach a column calculator that writes the ‘NumberOfDays’ value multiplied by 2 into a new column, as well as a tuple publisher that adds a value to the ‘SensitivityName’ field.
Step 1 - Define customizations to the datastore
Before we can load these new columns into our cube, we need to make sure that our datastore has fields that can accept them. To add new fields to an existing store, we will need to create a `DatastoreConfiguratorConsumer` bean which appends the required fields to the `DynamicTenors` store:

```java
@Bean
public DatastoreConfiguratorConsumer addFieldToStore() {
    return configurator -> {
        configurator.appendFields(SensiDatastoreDescriptionConfig.SCHEMA, StoreConstants.DYNAMIC_TENOR_STORE_NAME,
                List.of(
                        new CustomField(DOUBLE_NUMBER_OF_DAYS, ILiteralType.DOUBLE),
                        new CustomField(TEST_FIELD, ILiteralType.STRING)));
    };
}
```
See Customizing the Datastore with the Datastore Helper for more information.
Step 2 - Configuring the source
Now that we have modified our store, we need to make sure the ETL is correctly set up so that the new fields are properly populated.
In most cases, the topic configuration uses the store fields as the expected file columns, in which case no further configuration is required.
For the `DynamicTenors` store used in this example, file columns are defined explicitly, and thus any new columns are not autoconfigured. The `DynamicTenors` store also has a previously defined column calculator creating indices for each of the tenors.
To add the new columns and the column calculator, we will create a bean with the signature

```java
@Qualifier(SP_QUALIFIER__SENSI_TOPIC_COLUMNS)
@Order(10)
public <I> CsvColumnsProviderFunction<I> addFieldAndCalculator()
```

containing the required logic:
```java
@Bean
@Qualifier(SP_QUALIFIER__SENSI_TOPIC_COLUMNS)
@Order(10)
public <I> CsvColumnsProviderFunction<I> addFieldAndCalculator() {
    return (source, previousConfig) -> {
        // We retrieve the previous columns of the topic for the Tenors store.
        CsvTopicColumns prevColumns = previousConfig.get(StoreConstants.DYNAMIC_TENOR_STORE_NAME);
        List<String> newColumns = new ArrayList<>(prevColumns.getColumns());
        newColumns.add(TEST_FIELD);

        // We retrieve the previous calculators defined for the store.
        // While in this instance the topic name and the store are the same, calculators can be defined for multiple stores within a topic.
        Map<String, List<IColumnCalculator<ILineReader>>> topicCalculators = new HashMap<>(prevColumns.getCalculators());
        List<IColumnCalculator<ILineReader>> storeCalculators = new ArrayList<>(topicCalculators.get(StoreConstants.DYNAMIC_TENOR_STORE_NAME));
        storeCalculators.add(new IColumnCalculator<>() {
            @Override
            public String getColumnName() {
                return DOUBLE_NUMBER_OF_DAYS;
            }

            @Override
            public Object compute(IColumnCalculationContext<ILineReader> context) {
                return ((Double) context.getValue(StoreFieldNames.TENOR_NUMBER_OF_DAYS)) * 2.0;
            }
        });
        topicCalculators.put(StoreConstants.DYNAMIC_TENOR_STORE_NAME, storeCalculators);

        // We create a new CsvTopicColumns object with the new list of columns and the previously defined column calculators.
        return List.of(new CsvTopicColumns(
                StoreConstants.DYNAMIC_TENOR_STORE_NAME,
                newColumns,
                topicCalculators));
    };
}
```
For the tuple publisher, a second bean is created with the signature

```java
@Qualifier(SP_QUALIFIER__SENSI_TOPIC_PUBLISHERS)
@Order(10)
public <I> CsvPublisherProviderFunction<I> addPublisher()
```

defining the publishing logic:
```java
@Bean
@Qualifier(SP_QUALIFIER__SENSI_TOPIC_PUBLISHERS)
@Order(10)
public <I> CsvPublisherProviderFunction<I> addPublisher(IDatastore datastore) {
    return (source, previousConfig) -> List.of(
            new CsvTopicPublisher<>(
                    StoreConstants.DYNAMIC_TENOR_STORE_NAME,
                    new ITuplePublisher<IFileInfo<Path>>() {
                        @Override
                        public void publish(IStoreMessage<? extends IFileInfo<Path>, ?> message, List<Object[]> tuples) {
                            // We use the expected index for the SensitivityName field here, while in a complete implementation the index would be an instance variable.
                            tuples.forEach(tuple -> tuple[1] = "TestValue");
                            datastore.getTransactionManager().addAll(StoreConstants.DYNAMIC_TENOR_STORE_NAME, tuples);
                        }

                        @Override
                        public Collection<String> getTargetStores() {
                            return Collections.singleton(StoreConstants.DYNAMIC_TENOR_STORE_NAME);
                        }
                    }));
}
```
Step 3 - Configuration classes
The Beans detailed above can be included in a single class within the MR application.
```java
@Configuration
public class CustomizationsConfig {

    private static final String TEST_FIELD = "TestData";
    private static final String DOUBLE_NUMBER_OF_DAYS = "DoubleNumberOfDays";

    @Bean
    public DatastoreConfiguratorConsumer addFieldToStore() {
        return configurator -> {
            configurator.appendFields(SensiDatastoreDescriptionConfig.SCHEMA, StoreConstants.DYNAMIC_TENOR_STORE_NAME,
                    List.of(
                            new CustomField(DOUBLE_NUMBER_OF_DAYS, ILiteralType.DOUBLE),
                            new CustomField(TEST_FIELD, ILiteralType.STRING)));
        };
    }

    @Bean
    @Qualifier(SP_QUALIFIER__SENSI_TOPIC_COLUMNS)
    @Order(10)
    public <I> CsvColumnsProviderFunction<I> addFieldAndCalculator() {
        return (source, previousConfig) -> {
            // We retrieve the previous columns of the topic for the Tenors store.
            CsvTopicColumns prevColumns = previousConfig.get(StoreNames.TENOR_STORE_NAME);
            List<String> newColumns = new ArrayList<>(prevColumns.getColumns());
            newColumns.add(TEST_FIELD);

            // We retrieve the previous calculators defined for the store. The Tenors store has a column calculator setting indices for each tenor.
            // While in this instance the topic name and the store are the same, calculators can be defined for multiple stores within a topic.
            Map<String, List<IColumnCalculator<ILineReader>>> topicCalculators = new HashMap<>(prevColumns.getCalculators());
            List<IColumnCalculator<ILineReader>> storeCalculators = new ArrayList<>(topicCalculators.get(StoreNames.TENOR_STORE_NAME));
            storeCalculators.add(new IColumnCalculator<>() {
                @Override
                public String getColumnName() {
                    return DOUBLE_NUMBER_OF_DAYS;
                }

                @Override
                public Object compute(IColumnCalculationContext<ILineReader> context) {
                    return ((Double) context.getValue(StoreFieldNames.TENOR_NUMBER_OF_DAYS)) * 2.0;
                }
            });
            topicCalculators.put(StoreNames.TENOR_STORE_NAME, storeCalculators);

            // We create a new CsvTopicColumns object with the new list of columns and the previously defined column calculators.
            return List.of(new CsvTopicColumns(
                    StoreNames.TENOR_STORE_NAME,
                    newColumns,
                    topicCalculators));
        };
    }

    @Bean
    @Qualifier(SP_QUALIFIER__SENSI_TOPIC_PUBLISHERS)
    @Order(10)
    public <I> CsvPublisherProviderFunction<I> addPublisher(IDatastore datastore) {
        return (source, previousConfig) -> List.of(
                new CsvTopicPublisher<>(
                        StoreNames.TENOR_STORE_NAME,
                        new ITuplePublisher<IFileInfo<Path>>() {
                            @Override
                            public void publish(IStoreMessage<? extends IFileInfo<Path>, ?> message, List<Object[]> tuples) {
                                // We use the expected index for the SensitivityName field here, while in a complete implementation the index would be an instance variable.
                                tuples.forEach(tuple -> tuple[1] = "TestValue");
                                datastore.getTransactionManager().addAll(StoreNames.TENOR_STORE_NAME, tuples);
                            }

                            @Override
                            public Collection<String> getTargetStores() {
                                return Collections.singleton(StoreNames.TENOR_STORE_NAME);
                            }
                        }));
    }
}
```
This configuration class should then be included in the `@Import` annotation of the `MarketRiskConfig` class:

```java
@Import(value = {
    ...
    CustomizationsConfig.class
})
public class MarketRiskConfig {
    ...
}
```
Importing this customization class and modifying the input file will result in the following data being loaded into the store:
| TenorLabels | SensitivityName | NumberOfDays | TenorIndices | DoubleNumberOfDays | TestData |
|---|---|---|---|---|---|
| N/A | TestValue | 0.0 | 0 | 0.0 | Testdata |
| 0.25Y | TestValue | 90.0 | 1 | 180.0 | Testdata |
| 0.5Y | TestValue | 180.0 | 2 | 360.0 | Testdata |
| 1Y | TestValue | 360.0 | 3 | 720.0 | Testdata |
| 2Y | TestValue | 720.0 | 4 | 1440.0 | Testdata |
| 3Y | TestValue | 1080.0 | 5 | 2160.0 | Testdata |
| 4Y | TestValue | 1440.0 | 6 | 2880.0 | Testdata |
| 5Y | TestValue | 1800.0 | 7 | 3600.0 | Testdata |
| 10Y | TestValue | 3600.0 | 8 | 7200.0 | Testdata |
| 15Y | TestValue | 5400.0 | 9 | 10800.0 | Testdata |
| 20Y | TestValue | 7200.0 | 10 | 14400.0 | Testdata |
| 30Y | TestValue | 10080.0 | 11 | 20160.0 | Testdata |
Helper methods
To simplify the customization process for less complex use cases, a set of helper methods is included in the `Provider` classes. All methods are static and return an object that can be used directly in the customization Bean. The generated `List` will contain a single parameter object containing the defined customizations.
| Method | Details | Provider |
|---|---|---|
| `addColumnsFunction(String topic, List<String> columns)` | Adds a list of columns to a topic. Preserves the previously defined calculators. | `CsvColumnsProvider` |
| `addCalculatorsFunction(String topic, List<IColumnCalculator<ILineReader>> calculators)` | Adds a list of calculators to a topic, using the topic name as the store name. Preserves the previously defined columns. | `CsvColumnsProvider` |
| `addCalculatorsFunction(String topic, Map<String, List<IColumnCalculator<ILineReader>>> calculators)` | Adds a map of calculators per store name to a topic. Preserves the previously defined columns. | `CsvColumnsProvider` |
| `addColumnsAndCalcsFunction(String topic, List<String> columns, List<IColumnCalculator<ILineReader>> calculators)` | Adds a list of columns and a list of calculators to a topic, using the topic name as the store name. | `CsvColumnsProvider` |
| `addColumnsAndCalcsFunction(String topic, List<String> columns, Map<String, List<IColumnCalculator<ILineReader>>> calculators)` | Adds a list of columns and a map of calculators per store name to a topic. | `CsvColumnsProvider` |
| `addPublisher(String topic, ITuplePublisher<IFileInfo> publisher)` | Adds a tuple publisher to a topic. | `CsvPublisherProvider` |
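For instance, the column added in the example above could also be declared through a helper rather than a hand-written generator function. The following is a sketch, assuming `addColumnsFunction` is the static method of `CsvColumnsProvider` listed above and that its result can be returned directly as the customization Bean:

```java
// Sketch: declare the extra file column via the addColumnsFunction helper.
@Bean
@Qualifier(SP_QUALIFIER__SENSI_TOPIC_COLUMNS)
@Order(11)
public <I> CsvColumnsProviderFunction<I> addTestFieldColumn() {
    return CsvColumnsProvider.addColumnsFunction(
            StoreConstants.DYNAMIC_TENOR_STORE_NAME, // topic
            List.of(TEST_FIELD));                    // columns to append; calculators are preserved
}
```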
The ATableFormatTuplePublisher
For use cases where the destination table doesn’t match the input file format, we provide the `ATableFormatTuplePublisher`. Any extension of this publisher can be constructed with the file tuple containing a subset of the table fields, or with an explicit mapping between table fields and file columns.
Example
For the backwards-compatible Market Data API sources, the stores should be loaded from the deprecated file format. In the case of the `SpotMarketData` store, when loading from the MarketData.csv file, the following applies:
| File Field | Table Field | Behavior |
|---|---|---|
| AsOfDate | AsOfDate | Loaded as-is |
| MarketDataSet | MarketDataSet | Loaded as-is |
| RiskFactorId | InstrumentId | Mapped by the publisher |
| Quote | Quote | Loaded as-is |
| TenorLabels | - | Must be empty |
| MaturityLabels | - | Must be empty |
| MoneynessLabels | - | Must be empty |
| TenorDates | - | Must be empty |
| MaturityDates | - | Must be empty |
| Nominal | - | Must be empty |
The configuration for this behavior is:
```java
new ATableFormatTuplePublisher<>(datastore, Map.of(INSTRUMENT_ID, RISK_FACTOR)) {
    @Override
    public void publish(IStoreMessage<? extends IFileInfo<Path>, ?> message, List<Object[]> tuples) {
        Queue<Object[]> storeTuples = new ConcurrentLinkedQueue<>();
        tuples.forEach(tuple -> {
            // tmmProps and devectorizer are provided by the surrounding implementation (not shown here).
            if (checkNull(tmmProps, message, tuple, TENOR_LABELS, TENOR_DATES, MATURITY_LABELS, MATURITY_DATES, MONEYNESS_LABELS,
                    NOMINAL)) {
                Queue<Object[]> extractedScalarTuples = new ConcurrentLinkedQueue<>();
                devectorizer.extractScalarTuples(message, tuple, extractedScalarTuples);
                extractedScalarTuples.forEach(
                        scalarTuple -> storeTuples.add(createTupleForTable(message, SPOT_MARKET_DATA_STORE, scalarTuple)));
            }
        });
        datastore.getTransactionManager().addAll(SPOT_MARKET_DATA_STORE, storeTuples);
    }

    @Override
    public Collection<String> getTargetStores() {
        return Set.of(SPOT_MARKET_DATA_STORE);
    }
}
```
Within the `createTupleForTable` method, the publisher will iterate over the table fields, attempting to retrieve a value from the input tuple by the mapped field (or directly if no mapping is available). If no value is available in the input tuple (or no matching field is found), the value will be empty. If a tuple field’s value isn’t requested by the publisher, it will be ignored.
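As with any other publisher, an extension of `ATableFormatTuplePublisher` is attached to its topic through the publisher customization Beans described earlier on this page. The following is a sketch only: `MarketDataFormatPublisher` is a hypothetical extension of the publisher shown above, and the topic name and qualifier used here are illustrative and depend on which source loads the store.

```java
// Sketch: wire a (hypothetical) ATableFormatTuplePublisher extension into a topic.
@Bean
@Qualifier(SP_QUALIFIER__COMMON_TOPIC_PUBLISHERS)
@Order(11)
public <I> CsvPublisherProviderFunction<I> addSpotMarketDataPublisher(IDatastore datastore) {
    return (source, previousConfig) -> List.of(
            new CsvTopicPublisher<>(
                    "MarketData", // hypothetical topic name for MarketData.csv
                    new MarketDataFormatPublisher(datastore, Map.of(INSTRUMENT_ID, RISK_FACTOR))));
}
```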
Suggested Further Reading
- Adding cube hierarchies
- Configuring measures using Spring Beans
- Configuring schema selections using Spring Beans
- Adding a new KPI
- Adding data loading or unloading topics