Configuring the DLC
The Data Connectors source dependencies must be included in your project's pom, with one dependency for each source you wish to add. You can check Data Connectors Requirements to ensure your project meets the minimum ActivePivot version requirements. All supported sources are listed in Data Connectors Module Structure.
Here is an example of importing Data Connectors for the CSV and JDBC sources:
<!-- Data Connectors CSV Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-csv</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>
<!-- Data Connectors JDBC Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-jdbc</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>
Please contact your project team if you do not have the Data Connectors jar file. It must be imported into your Maven repository.
Next, once the required Spring configurations for your data source type (CSV, JDBC or Kafka/RabbitMQ) are in place (examples further below), they need to be registered with the DataLoadController.
The snippets in the following sections show you how to do that.
Required APM Configuration
The DLC relies on APM’s IHealthEventDispatcher to dispatch HealthEvents and to trace them using a TraceContext. The following configurations are required in your project to ensure that APM and its Tracing components are set up and function correctly.
- APMTracingConfig must be included in your application configuration. Please see the code sample below:
@Import({
    // Required Configuration for Tracing
    APMTracingConfig.class
})
public class ApplicationConfig {
}
- APM must be defined with high precedence in ActivePivot’s Registry. Some of its classes are Extended Plugins or Types that override the core code, so it is important that the Registry picks them first. Make sure the package com.activeviam.apm is listed first. The snippet below can be added to your application's configuration.
/** Before anything else we statically initialize the ActiveViam Registry. */
static {
    Registry.setContributionProvider(
        new ClasspathContributionProvider(
            "com.activeviam.apm", // First has highest priority
            "com.qfs",
            "com.quartetfs",
            "com.activeviam"
        )
    );
}
- If you execute any DLC tasks via Spring Beans during initialization of your application, you must ensure that the APM_TRACING Bean is created before any data loading. This Bean is created inside APMTracingConfig. If it has not been created before data loading, then the Tracing provided by APM will not work, which can result in null Tracing values. You can ensure the APM_TRACING Bean is created first either with the @DependsOn annotation or by autowiring the APM_TRACING Bean. Please see the code sample below as an example:
@Bean
@DependsOn(value = {APM_TRACING})
public Void initialLoad() {
    // Load data here...
    return null; // a method declared to return Void must still return a value
}
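The precedence rule for the Registry described above can be pictured with a small, self-contained sketch. This is only a conceptual model of "the first listed package wins", not the Registry's actual implementation; the class PackagePrioritySketch, its methods, and the candidate class names in the usage note are all hypothetical.

```java
import java.util.List;

/** Conceptual model only: ranks candidate classes by the order of package prefixes. */
public class PackagePrioritySketch {

    /** Returns the index of the first listed package that contains the class. */
    static int priority(List<String> orderedPackages, String className) {
        for (int i = 0; i < orderedPackages.size(); i++) {
            if (className.startsWith(orderedPackages.get(i) + ".")) {
                return i;
            }
        }
        return Integer.MAX_VALUE; // not covered by any listed package
    }

    /** Picks the candidate whose package appears earliest in the ordered list. */
    public static String pick(List<String> orderedPackages, List<String> candidates) {
        String best = null;
        int bestRank = Integer.MAX_VALUE;
        for (String candidate : candidates) {
            int rank = priority(orderedPackages, candidate);
            if (best == null || rank < bestRank) {
                best = candidate;
                bestRank = rank;
            }
        }
        return best;
    }
}
```

With the package order used in the static initializer, a hypothetical com.activeviam.apm.SomePlugin would be preferred over a hypothetical com.qfs.SomePlugin. Because com.activeviam is also a prefix of com.activeviam.apm, the more specific package only gets its own rank when it is listed ahead of the general one.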
Create DataLoadController Bean
@Bean(destroyMethod = "close")
public IDataLoadController dataLoadController() {
    final IDataLoadController controller = new DataLoadController(datastoreConfig.datastore(), null);
    registerJdbcSourceAndChannels(controller);
    return controller;
}
Register data source and channels
private void registerJdbcSourceAndChannels(final IDataLoadController controller) {
    final TupleMessageChannelFactory channelFactory = jdbcSourceConfig.jdbcMessageChannelFactory();
    controller.registerSource(jdbcSourceConfig.jdbcSource());
    controller.registerChannel(
        channelFactory.createChannel(
            JdbcSourceConfig.JDBC_TOPIC__PRODUCT,
            DatastoreNames.PRODUCT_STORE_NAME),
        Arrays.asList(DatastoreNames.PRODUCT_STORE_NAME),
        new ProductScopeToRemoveWhereConditionConverter());
}
Remove-where condition for unload
public class ProductScopeToRemoveWhereConditionConverter
        implements IDataLoadController.IScopeToRemoveWhereConditionConverter {
    @Override
    public ICondition apply(
            final String store,
            final Map<String, Object> scope) {
        final LocalDate cobDate = LocalDate
            .parse((String) scope.get(DataLoadControllerRestService.SCOPE_KEY__COB_DATE));
        ICondition cobCond = BaseConditions.Equal(COB_DATE, cobDate);
        ICondition productCond = BaseConditions.Not(BaseConditions.Equal(BASE_STORE_PRODUCT_NAME, "N/A"));
        return BaseConditions.And(cobCond, productCond);
    }
}
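The converter above casts and parses the COB date directly, so a missing or non-String scope entry surfaces as a NullPointerException or ClassCastException. A minimal sketch of a more defensive parse, using only the JDK, is shown here. The class ScopeParsingSketch and the key value "CobDate" are hypothetical placeholders; the real key is whatever DataLoadControllerRestService.SCOPE_KEY__COB_DATE holds.

```java
import java.time.LocalDate;
import java.util.Map;

/** Illustrative helper only; not part of the DLC. */
public class ScopeParsingSketch {

    /** Hypothetical key name, standing in for SCOPE_KEY__COB_DATE. */
    public static final String COB_DATE_KEY = "CobDate";

    /** Reads the COB date from the scope, failing with a clear message if it is absent. */
    public static LocalDate parseCobDate(Map<String, Object> scope) {
        Object raw = scope.get(COB_DATE_KEY);
        if (!(raw instanceof String)) {
            throw new IllegalArgumentException(
                "Scope must contain a String value for key " + COB_DATE_KEY);
        }
        // LocalDate.parse expects ISO-8601 (yyyy-MM-dd) and throws
        // DateTimeParseException for any other format.
        return LocalDate.parse((String) raw);
    }
}
```

A converter could call such a helper before building its conditions, so that a malformed unload request is rejected with an explicit error.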
Register topic alias
You may group several topics together under an alias, so that the client / data orchestrator can request to load a single topic alias instead of providing each and every topic name to load in the request. This is useful when submitting a single request to load all the topics.
For example, you may configure an alias “ALL” that groups all the topics together.
protected void registerTopicAliases(IDataLoadController dataLoadController) {
    dataLoadController.registerTopicAlias(
        TOPIC__ALL,
        Arrays.asList(
            TOPIC__FX_DATA,
            TOPIC__ORGANISATION_DATA,
            TOPIC__TRADE_BOOKING_DATA,
            TOPIC__TRADES));
}
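To make the effect of an alias concrete, here is a minimal, self-contained sketch of how a request naming an alias could be expanded into its topics. It is not the DLC's implementation: TopicAliasSketch and its resolve method are hypothetical, and the topic strings in the usage note are placeholders.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for alias handling; not the DLC's actual classes. */
public class TopicAliasSketch {

    private final Map<String, List<String>> aliases = new LinkedHashMap<>();

    /** Records the topics that an alias stands for. */
    public void registerTopicAlias(String alias, List<String> topics) {
        aliases.put(alias, new ArrayList<>(topics));
    }

    /** Expands aliases into topics; names that are not aliases pass through unchanged. */
    public List<String> resolve(List<String> requested) {
        List<String> resolved = new ArrayList<>();
        for (String name : requested) {
            List<String> topics = aliases.getOrDefault(name, Collections.singletonList(name));
            for (String topic : topics) {
                if (!resolved.contains(topic)) { // avoid loading a topic twice
                    resolved.add(topic);
                }
            }
        }
        return resolved;
    }
}
```

In this model, registering "ALL" for the placeholder topics "FxData" and "Trades" means a request for "ALL" resolves to both, while a request for "ALL" plus "Trades" still yields each topic only once.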