Creating Custom Extraction Procedures
An enhancement has been made so that some processing on large objects can be executed at extraction time, rather than by the cube at query time. This is particularly important when post-processing vector measures.
The interface IDeeTaskExtractionProcedure defines the contract:
public interface IDeeTaskExtractionProcedure extends IExtendedPluginValue {

    /**
     * Get properties
     */
    public Properties getProperties();

    /**
     * Init the procedure
     */
    public void init(List<? extends IHierarchy> hierarchies) throws ActiveViamException;

    /**
     * Process MDX query results
     */
    public List<Map<ILocation, Map<String, Object>>> process(Map<ILocation, Map<String, Object>> aggregatesMap, IActivePivotContext context);

    /**
     * Process GetAggregates query results
     */
    public List<AggregateDTO> process(List<AggregateDTO> aggregates, IActivePivotContext context);
}
The definition introduces the following tags:
- extractionProcedure, which defines the pre-processing to apply to the data retrieved from the MDX query before generating the result
- extractionProcedureValue, the tag used to access data generated by the extraction procedure
JSON format
{
  "name": "ExtractionProcedureTest",
  "priority": "HIGH",
  "queries": [
    {
      "@type": "MdxQuery",
      "mdx": "SELECT NON EMPTY { [Measures].[BaseResult] } ON 0, NON EMPTY CrossJoin (filter(Hierarchize( [Counterparty].[CounterpartyId].[CounterpartyId].[{{CounterpartyId}}]), NOT(IsEmpty([contributors.COUNT]))), Hierarchize( [CalculationValuationDate].[CalculationValuationDate].[CalculationValuationDate].members )) ON 1 FROM [BackTestingCube] WHERE [CalculationId].[CalculationId].[CalculationId].[{{CalculationId}}]",
      "context": {
        "queriesTimeLimit": "600"
      }
    }
  ],
  "output": {
    "@type": "fileOutput",
    "locale": "en",
    "keepOutputFileIfFailure": false,
    "filenameRoot": "ExtractionProcedureResult",
    "extractionProcedure": {
      "pluginKey": "PercentileEP",
      "properties": {
        "percentileList": "0,1,2.5,5,10,15,20,25,30,35,40,45,50,55,60,65,70,75,80,85,90,95,97.5,99,100"
      }
    },
    "columns": [
      ...
      {
        "@type": "extractProcValueOutputColumn",
        "header": "PTILE_1",
        "name": "Percentile_1",
        "format": "###.######"
      },
      {
        "@type": "extractProcValueOutputColumn",
        "header": "PTILE_2_5",
        "name": "Percentile_2.5",
        "format": "###.######"
      },
      ...
    ]
  }
}
Below is an example of an extraction procedure that computes a large number of percentiles for a measure returning an IVector.
@QuartetExtendedPluginValue(intf = IDeeTaskExtractionProcedure.class, key = PercentileExtractionProcedure.PLUGIN_KEY)
public class PercentileExtractionProcedure extends ADeeTaskExtractionProcedure {

    private static final long serialVersionUID = -6960840774398324497L;

    public static final String PLUGIN_KEY = "PercentileEP";

    protected static final String AGGREGATION_RESULT_LIST = "AggregationResultList";

    private static final String CALCULATION_ID_PROP = "calculationId";
    private static final String CALCULATION_ID_PROP_DEFAULT_VALUE = "CalculationId";

    public static final String PERCENTILE_LIST_KEY = "percentileList";
    public static final String PERCENTILE_LIST_DEFAULT = "0,1,2.5,5,10,15,20,25,30,35,40,45,50,55,60,65,70,75,80,85,90,95,97.5,99,100";
    public static final String PERCENTILE_RESULT_PREFIX = "Percentile_";

    private static final String DELIMITER = ",";

    private final Properties properties;
    private final String[] percentileArray;
    private ILevelInfo calculationIdLevelInfo;

    public PercentileExtractionProcedure(final Properties properties) {
        this.properties = properties;
        percentileArray = properties.getProperty(PERCENTILE_LIST_KEY, PERCENTILE_LIST_DEFAULT).split(DELIMITER);
    }

    @Override
    public void init(final List<? extends IHierarchy> hierarchies) throws ActiveViamException {
        calculationIdLevelInfo = HierarchiesUtil
                .getLevelFromProperty(hierarchies, properties.getProperty(CALCULATION_ID_PROP, CALCULATION_ID_PROP_DEFAULT_VALUE)).getLevelInfo();
    }

    /**
     * aggregatesMap will be modified in place.
     *
     * @see com.hsbc.risk.swd.pivot.dee.task.exec.IDeeTaskExtractionProcedure#process(java.util.Map, com.quartetfs.biz.pivot.context.IActivePivotContext)
     */
    @Override
    public List<Map<ILocation, Map<String, Object>>> process(final Map<ILocation, Map<String, Object>> aggregatesMap, final IActivePivotContext context) {
        final List<Map<ILocation, Map<String, Object>>> result = new ArrayList<>();

        // Retrieve the calculationId/Timesteps linked to the location
        final String calculationId = (String) LocationUtil.getCoordinate(aggregatesMap.keySet().iterator().next(), calculationIdLevelInfo);

        for (final Map.Entry<ILocation, Map<String, Object>> aggregatesEntry : aggregatesMap.entrySet()) {
            final ILocation currentLocation = aggregatesEntry.getKey();
            final Map<String, Object> inputAggregates = aggregatesEntry.getValue();
            if (inputAggregates == null) {
                continue;
            }
            for (final Map.Entry<String, Object> input : inputAggregates.entrySet()) {
                // assuming whenever this extraction procedure is used, all measure values are vectors
                assert input.getValue() instanceof IVector;
                final IVector vector = (IVector) input.getValue();
                if (vector == null) {
                    continue;
                }
                final Map<String, Object> percentileValues = new LinkedHashMap<>();
                for (final String percentile : percentileArray) {
                    percentileValues.put(PERCENTILE_RESULT_PREFIX + percentile, vector.quantileDouble(Double.parseDouble(percentile) / 100d));
                }
                final Map<ILocation, Map<String, Object>> locationResult = new HashMap<>();
                locationResult.put(currentLocation, percentileValues);
                // Add aggregates to the result
                result.add(locationResult);
            }
        }
        return result;
    }

    @Override
    public List<AggregateDTO> process(final List<AggregateDTO> aggregates, final IActivePivotContext context) {
        throw new UnsupportedOperationException(this.getClass().getSimpleName() + " does not support GetAggregatesQuery.");
    }

    @Override
    public Properties getProperties() {
        return properties;
    }

    @Override
    public String getType() {
        return PLUGIN_KEY;
    }
}
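To make the link between the JSON configuration and the procedure output concrete, the standalone sketch below (plain JDK, no ActivePivot dependency) shows how the percentileList property is turned into the Percentile_<p> keys that the extractProcValueOutputColumn entries reference via their "name" field. The naive nearest-rank quantile used here is only a stand-in for IVector.quantileDouble; everything else in the snippet is illustrative and not part of the DEE API.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class PercentileKeyDemo {

    /** Naive nearest-rank quantile, a stand-in for IVector.quantileDouble. */
    static double quantile(final double[] sortedValues, final double r) {
        final int index = (int) Math.ceil(r * sortedValues.length) - 1;
        return sortedValues[Math.max(0, Math.min(index, sortedValues.length - 1))];
    }

    public static void main(final String[] args) {
        // Same format as the "percentileList" property of the extraction procedure
        final String percentileList = "0,1,2.5,5,50,97.5,100";
        final double[] values = {4.0, 8.0, 15.0, 16.0, 23.0, 42.0};
        Arrays.sort(values);

        // Keys are "Percentile_" + the raw token, e.g. "Percentile_2.5",
        // which is exactly what the extractProcValueOutputColumn "name" fields refer to
        final Map<String, Double> percentileValues = new LinkedHashMap<>();
        for (final String percentile : percentileList.split(",")) {
            percentileValues.put("Percentile_" + percentile, quantile(values, Double.parseDouble(percentile) / 100d));
        }
        percentileValues.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}

Note that the percentile token is kept verbatim in the key, so a column named "Percentile_2.5" only matches if the percentileList property contains the token "2.5" with the same spelling.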
In the previous example we were still generating a file, which is the default behaviour. It is possible to change this behaviour and keep the data in memory for further processing instead, which is useful when custom code needs to do something else with the data. The API uses the following interfaces:
- IMemoryOutputHandler, the interface for an output handler that keeps data in memory. A default SimpleMemoryOutputHandler is provided that keeps the content of the row data.
- IMemoryDeeTaskResultInfo, the interface for a bean holding DEE in-memory result information. A default SimpleMemoryDeeTaskResultInfo gives access to the data object.
The memory output handler can only be created programmatically, using the InMemoryDeeOutput class. In this case the goal is simply to fill a Map with the results, without caring about the locations.
@Override
public List<Map<ILocation, Map<String, Object>>> process(final Map<ILocation, Map<String, Object>> aggregatesMap, final IActivePivotContext context) {
    final List<Map<String, Object>> aggregationResults = new ArrayList<>();
    for (final Map.Entry<ILocation, Map<String, Object>> aggregatesEntry : aggregatesMap.entrySet()) {
        final Map<String, Object> inputAggregates = aggregatesEntry.getValue();
        if (inputAggregates == null) {
            continue;
        }
        for (final Map.Entry<String, Object> input : inputAggregates.entrySet()) {
            // assuming whenever this extraction procedure is used, all measure values are vectors
            assert input.getValue() instanceof IVector;
            final IVector vector = (IVector) input.getValue();
            if (vector == null) {
                continue;
            }
            final Map<String, Double> percentileValues = new LinkedHashMap<>();
            for (final String percentile : percentileArray) {
                percentileValues.put(PERCENTILE_RESULT_PREFIX + percentile, vector.quantileDouble(Double.parseDouble(percentile) / 100d));
            }
            input.setValue(percentileValues);
        }
        aggregationResults.add(inputAggregates);
    }
    // Amendment only allows single aggregation results
    return Collections.singletonList(Collections.singletonMap(null,
            Collections.singletonMap("AggregationResultList", aggregationResults.isEmpty() ? Collections.emptyMap() : aggregationResults.get(0))));
}
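For completeness, the sketch below shows how such an in-memory result might then be consumed. It is only illustrative: the getDataObject() accessor on IMemoryDeeTaskResultInfo is an assumption based on the description above ("gives access to the data object"), not a confirmed signature, and obtaining the result info from the executed task is left out.

// Hypothetical consumer-side sketch. The getDataObject() accessor is an assumption;
// adapt it to the actual SimpleMemoryDeeTaskResultInfo API.
@SuppressWarnings("unchecked")
public static Object readAggregationResult(final IMemoryDeeTaskResultInfo resultInfo) {
    // The data object is the map built by the process(...) override above,
    // keyed by "AggregationResultList" and ignoring locations.
    final Map<String, Object> data = (Map<String, Object>) resultInfo.getDataObject();
    return data.get("AggregationResultList");
}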