Creating Custom Extraction Procedures

An enhancement has been made so that some processing on large objects can be executed at extraction time rather than by the cube at query time. This is particularly important for the processing of vectors.

The interface IDeeTaskExtractionProcedure defines the contract:

public interface IDeeTaskExtractionProcedure extends IExtendedPluginValue {

	/**
	 * Returns the properties of this extraction procedure.
	 *
	 * @return the procedure properties
	 */
	Properties getProperties();

	/**
	 * Initializes the procedure before it is asked to process any result.
	 *
	 * @param hierarchies the hierarchies made available to the procedure
	 * @throws ActiveViamException if the procedure cannot be initialized
	 */
	void init(List<? extends IHierarchy> hierarchies) throws ActiveViamException;

	/**
	 * Processes the results of an MDX query.
	 *
	 * @param aggregatesMap the aggregated values of the query, keyed by location and then by name
	 * @param context the context handed over to the procedure
	 * @return the processed results, as a list of maps with the same layout as the input
	 */
	List<Map<ILocation, Map<String, Object>>> process(Map<ILocation, Map<String, Object>> aggregatesMap, IActivePivotContext context);

	/**
	 * Processes the results of a Get aggregates query.
	 *
	 * @param aggregates the aggregates returned by the query
	 * @param context the context handed over to the procedure
	 * @return the processed aggregates
	 */
	List<AggregateDTO> process(List<AggregateDTO> aggregates, IActivePivotContext context);

}

The definition introduces the tags:

  1. extractionProcedure, which specifies the pre-processing to apply to the data retrieved from the MDX query before generating the result
  2. extractionProcedureValue, tag to access data generated by the extraction procedure

JSON format

{
  "name": "ExtractionProducedureTest",
  "priority": "HIGH",
  "queries": [
    {
      "@type": "MdxQuery",
      "mdx": "SELECT NON EMPTY { [Measures].[BaseResult] } ON 0, NON EMPTY CrossJoin (filter(Hierarchize( [Counterparty].[CounterpartyId].[CounterpartyId].[{{CounterpartyId}}]), NOT(IsEmpty([contributors.COUNT]))), Hierarchize( [CalculationValuationDate].[CalculationValuationDate].[CalculationValuationDate].members )) ON 1 FROM [BackTestingCube] WHERE [CalculationId].[CalculationId].[CalculationId].[{{CalculationId}}]",
      "context": {
        "queriesTimeLimit": "600"
      }
    }
  ],
  "output": {
    "@type": "fileOutput",
    "locale": "en",
    "keepOutputFileIfFailure": false,
    "filenameRoot": "ExtractionProducedureResult",
    "extractionProcedure": {
      "pluginKey": "PercentileEP",
      "properties": {
        "percentileList": "0,1,2.5,5,10,15,20,25,30,35,40,45,50,55,60,65,70,75,80,85,90,95,97.5,99,100"
      }
    },
    "columns": [
      ...
      {
        "@type": "extractProcValueOutputColumn",
        "header": "PTILE_1",
        "name": "Percentile_1",
        "format": "###.######"
      },
      {
        "@type": "extractProcValueOutputColumn",
        "header": "PTILE_2_5",
        "name": "Percentile_2.5",
        "format": "###.######"
      },
      ...
    ]
  }
}

Example of an extraction procedure that computes a large number of percentiles for a measure returning an IVector.

/**
 * Extraction procedure that replaces every vector measure value of an MDX query result by a set of
 * percentiles computed from that vector.
 * <p>
 * The percentiles to compute are read from the {@value #PERCENTILE_LIST_KEY} property as a
 * comma-separated list of values expressed in percent (for instance {@code 2.5} for the 2.5th
 * percentile). Each computed value is published under the name {@value #PERCENTILE_RESULT_PREFIX}
 * followed by the percentile exactly as written in the list (for instance {@code Percentile_2.5}).
 */
@QuartetExtendedPluginValue(intf = IDeeTaskExtractionProcedure.class, key = PercentileExtractionProcedure.PLUGIN_KEY)
public class PercentileExtractionProcedure extends ADeeTaskExtractionProcedure {
	private static final long serialVersionUID = -6960840774398324497L;
	/** Key under which this procedure is registered as an extended plugin value. */
	public static final String PLUGIN_KEY = "PercentileEP";

	protected static final String AGGREGATION_RESULT_LIST = "AggregationResultList";
	/** Name of the property giving the calculation id level, and its default value. */
	private static final String CALCULATION_ID_PROP = "calculationId";
	private static final String CALCULATION_ID_PROP_DEFAULT_VALUE = "CalculationId";
	/** Name of the property holding the comma-separated list of percentiles, and its default value. */
	public static final String PERCENTILE_LIST_KEY = "percentileList";
	public static final String PERCENTILE_LIST_DEFAULT = "0,1,2.5,5,10,15,20,25,30,35,40,45,50,55,60,65,70,75,80,85,90,95,97.5,99,100";
	/** Prefix of the names under which the computed percentiles are published. */
	public static final String PERCENTILE_RESULT_PREFIX = "Percentile_";
	private static final String DELIMITER = ",";

	private final Properties properties;
	/** Percentiles to compute, in percent, kept as text because they are also used to build the result names. */
	private final String[] percentileArray;
	/** Level resolved by {@link #init(List)} from the {@value #CALCULATION_ID_PROP} property. */
	private ILevelInfo calculationIdLevelInfo;

	/**
	 * Creates the procedure and reads the list of percentiles to compute.
	 *
	 * @param properties the procedure properties; {@value #PERCENTILE_LIST_KEY} is optional and
	 *            defaults to {@link #PERCENTILE_LIST_DEFAULT}
	 */
	public PercentileExtractionProcedure(final Properties properties) {
		this.properties = properties;
		percentileArray = properties.getProperty(PERCENTILE_LIST_KEY, PERCENTILE_LIST_DEFAULT).split(DELIMITER);
		// Tolerate blanks around the delimiter ("1, 2.5") so that they do not leak into the result names.
		for (int i = 0; i < percentileArray.length; i++) {
			percentileArray[i] = percentileArray[i].trim();
		}
	}

	/**
	 * Resolves the calculation id level among the given hierarchies.
	 *
	 * @param hierarchies the hierarchies in which the level is looked up
	 * @throws ActiveViamException if the initialization fails
	 */
	@Override
	public void init(final List<? extends IHierarchy> hierarchies) throws ActiveViamException {
		calculationIdLevelInfo = HierarchiesUtil
				.getLevelFromProperty(hierarchies, properties.getProperty(CALCULATION_ID_PROP, CALCULATION_ID_PROP_DEFAULT_VALUE)).getLevelInfo();
	}

	/**
	 * Computes the configured percentiles for every vector found in the query result.
	 * <p>
	 * The given map is only read. One single-entry map (location to percentile values) is added
	 * to the returned list for each non-null measure value. Locations without aggregates and null
	 * measure values are skipped, and an empty input yields an empty list.
	 *
	 * @param aggregatesMap the aggregated values of the MDX query, keyed by location and then by measure name
	 * @param context the query context (not used by this implementation)
	 * @return the percentile values, one map per location and measure
	 * @see com.hsbc.risk.swd.pivot.dee.task.exec.IDeeTaskExtractionProcedure#process(java.util.Map, com.quartetfs.biz.pivot.context.IActivePivotContext)
	 */
	@Override
	public List<Map<ILocation, Map<String, Object>>> process(final Map<ILocation, Map<String, Object>> aggregatesMap, final IActivePivotContext context) {
		final List<Map<ILocation, Map<String, Object>>> result = new ArrayList<>();

		for (final Map.Entry<ILocation, Map<String, Object>> aggregatesEntry : aggregatesMap.entrySet()) {
			final ILocation currentLocation = aggregatesEntry.getKey();
			final Map<String, Object> inputAggregates = aggregatesEntry.getValue();
			if (inputAggregates == null) {
				continue;
			}

			for (final Map.Entry<String, Object> input : inputAggregates.entrySet()) {
				final Object value = input.getValue();
				// Skip missing values before the type assertion: null is not an instance of anything.
				if (value == null) {
					continue;
				}
				// assuming whenever this extraction procedure is used, all measure values are vectors
				assert value instanceof IVector;
				final IVector vector = (IVector) value;

				// Declared with Object values so that it can be stored in the result as is;
				// the insertion order follows the configured percentile list.
				final Map<String, Object> percentileValues = new LinkedHashMap<>();
				for (final String percentile : percentileArray) {
					// The list is expressed in percent whereas the quantile is requested as a fraction.
					percentileValues.put(PERCENTILE_RESULT_PREFIX + percentile, vector.quantileDouble(Double.parseDouble(percentile) / 100d));
				}

				final Map<ILocation, Map<String, Object>> locationResult = new HashMap<>();
				locationResult.put(currentLocation, percentileValues);
				// Add aggregates to the result
				result.add(locationResult);
			}
		}
		return result;
	}

	/**
	 * Not supported: this procedure only handles MDX query results.
	 *
	 * @throws UnsupportedOperationException always
	 */
	@Override
	public List<AggregateDTO> process(final List<AggregateDTO> aggregates, final IActivePivotContext context) {
		throw new UnsupportedOperationException(this.getClass().getSimpleName() + " does not support GetAggregatesQuery.");
	}

	/**
	 * @return the properties given at construction time
	 */
	@Override
	public Properties getProperties() {
		return properties;
	}

	/**
	 * @return {@link #PLUGIN_KEY}
	 */
	@Override
	public String getType() {
		return PLUGIN_KEY;
	}

}

In the previous example we were still generating a file, which is the default behaviour. It is possible to change this behaviour and keep the data in memory for further processing. This can be useful if we are writing some custom code and want to do something else with the data. The API uses the following interfaces:

  • IMemoryOutputHandler, interface for the output handler keeping data in memory. A default SimpleMemoryOutputHandler is provided to keep the content of the row data.
  • IMemoryDeeTaskResultInfo, interface for a bean holding DEE memory result information. A default SimpleMemoryDeeTaskResultInfo gives access to the data object.

The memory output handler can only be created by code, using the InMemoryDeeOutput class. In this case our goal is just to fill a Map with the results, without caring about the locations.

/**
 * Replaces every vector of the query result by its percentiles and exposes the aggregates of a
 * single location under the "AggregationResultList" name, without keeping track of locations.
 * <p>
 * The given map is modified in place: each non-null measure value is replaced by a map of
 * percentile name to percentile value. All locations are processed, but only the aggregates of
 * the first location returned by the iteration end up in the result (an empty map when there is
 * none).
 *
 * @param aggregatesMap the aggregated values of the MDX query, keyed by location and then by
 *            measure name; its measure values must be replaceable
 * @param context the query context (not used by this implementation)
 * @return a single-element list holding one map whose only entry has a {@code null} location key
 */
@Override
public List<Map<ILocation, Map<String, Object>>> process(final Map<ILocation, Map<String, Object>> aggregatesMap, final IActivePivotContext context) {
	final List<Map<String, Object>> aggregationResults = new ArrayList<>();
	for (final Map.Entry<ILocation, Map<String, Object>> aggregatesEntry : aggregatesMap.entrySet()) {
		final Map<String, Object> inputAggregates = aggregatesEntry.getValue();
		if (inputAggregates == null) {
			continue;
		}
		for (final Map.Entry<String, Object> input : inputAggregates.entrySet()) {
			final Object value = input.getValue();
			// Skip missing values before the type assertion: null is not an instance of anything.
			if (value == null) {
				continue;
			}
			// assuming whenever this extraction procedure is used, all measure values are vectors
			assert value instanceof IVector;
			final IVector vector = (IVector) value;

			// One double per percentile; the insertion order follows the configured percentile list.
			final Map<String, Double> percentileValues = new LinkedHashMap<>();
			for (final String percentile : percentileArray) {
				// The list is expressed in percent whereas the quantile is requested as a fraction.
				percentileValues.put(PERCENTILE_RESULT_PREFIX + percentile, vector.quantileDouble(Double.parseDouble(percentile) / 100d));
			}
			// Overwrites the vector in the caller's map with its percentiles.
			input.setValue(percentileValues);
		}

		aggregationResults.add(inputAggregates);
	}
	// Amendment only allows single aggregation results
	final Map<String, Object> firstResult = aggregationResults.isEmpty() ? Collections.<String, Object>emptyMap() : aggregationResults.get(0);
	final Map<String, Object> namedResult = Collections.singletonMap("AggregationResultList", firstResult);
	// Locations are irrelevant here, hence the null key.
	final Map<ILocation, Map<String, Object>> resultWithoutLocation = Collections.singletonMap(null, namedResult);
	return Collections.singletonList(resultWithoutLocation);
}