Azure Blob Source Configuration
The Azure Source lets you load CSV files from an Azure Blob Container. It can be configured as either a Fetching or a Listening source.
Application properties
# General Cloud Source Properties
cloud.fetch.thread=
# Azure Source Properties
azure.container=
azure.root_directory=
azure.connection.string=
If using a Listening Source, the pollingDelay property is required. This property defines how frequently, in milliseconds, the source checks whether a file has been modified.
pollingDelay=<milliseconds>
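As a minimal sketch of how these properties could be read and checked outside of Spring, the snippet below uses only `java.util.Properties`. The class name `AzureSourceSettings` is hypothetical and not part of the connector. The fallback of 4 fetch threads mirrors the `${cloud.fetch.thread:4}` placeholder used in the source configuration on this page, and treating all three `azure.*` values as non-blank is an assumption of this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; it is not part of the connector API.
final class AzureSourceSettings {
    // Assumed fallback, mirroring the ${cloud.fetch.thread:4} placeholder.
    static final int DEFAULT_FETCH_THREADS = 4;

    final String container;
    final String rootDirectory;
    final String connectionString;
    final int fetchThreads;

    private AzureSourceSettings(String container, String rootDirectory,
                                String connectionString, int fetchThreads) {
        this.container = container;
        this.rootDirectory = rootDirectory;
        this.connectionString = connectionString;
        this.fetchThreads = fetchThreads;
    }

    // Parses properties-file text, such as the contents of application.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Builds the settings, failing fast when an azure.* value is missing or blank.
    static AzureSourceSettings from(Properties props) {
        String threads = props.getProperty("cloud.fetch.thread", "").trim();
        int fetchThreads = threads.isEmpty() ? DEFAULT_FETCH_THREADS : Integer.parseInt(threads);
        return new AzureSourceSettings(
                required(props, "azure.container"),
                required(props, "azure.root_directory"),
                required(props, "azure.connection.string"),
                fetchThreads);
    }

    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value.trim();
    }
}
```

Calling `AzureSourceSettings.from(AzureSourceSettings.parse(text))` with an empty `cloud.fetch.thread=` line falls back to the assumed default, while a missing `azure.container` raises an `IllegalArgumentException` before any connection is attempted.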
Maven Dependency
First, make sure the correct dependency is imported. Your pom should have:
<!-- Azure Blob Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-azure</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
Channel Configuration
Please read this section on Channel configuration.
Source Configuration
To load from Azure Blob storage, an AzureCsvScopedFetchSource or AzureCsvScopedListenSource needs to be constructed. This source contains cloud-specific topics of type CsvCloudScopedFetchTopic or CsvCloudScopedListenTopic respectively. One Topic needs to be created for each fileMatcher and target datastore. A few additional beans are also created to establish connectivity with the remote storage. The connectivity details for the Azure Container are provided via application properties.
Below is an example for a Fetching Source:
@Value("${azure.connection.string}")
protected String azureConnectionString;
@Value("${azure.container}")
protected String azureContainer;
@Value("${azure.root_directory}")
protected String azureRootDirectory;
@Value("${cloud.fetch.thread:4}")
protected String azureFetchThreads;
public ICSVSource<ICloudEntityPath<BlobClientBase>> createAzureSource() {
    //
    // Configure Azure Source Props
    //
    final var sourceName = "Azure CSV Source";
    var csvSourceConfig = new DlcCSVSourceConfiguration.DlcCSVSourceConfigurationBuilder()
            .sourceName(sourceName)
            .build();
    ICSVSource<ICloudEntityPath<BlobClientBase>> azureSource = new AzureCsvScopedFetchSource<>(csvSourceConfig);

    //
    // Add topics
    //
    azureSource.addTopic(
            createDirectoryTopic(
                    "Trade_Topic",
                    "/data",
                    "glob:**/*trades*.csv",
                    azureSource.createParserConfiguration(generateTradeCsvFields(), NB_HEADER_LINES_TO_SKIP)
            )
    );
    return azureSource;
}
protected ICSVTopic<ICloudEntityPath<BlobClientBase>> createDirectoryTopic(String topic, String subdirectory, String pathMatcherSyntaxAndPattern, ICSVParserConfiguration parserConfig) {
    // Default directory to the root Azure directory
    ICloudDirectory<BlobClientBase> cloudDirectory = rootDirectory();

    // Strip a leading file separator from the subdirectory, if present.
    if (!subdirectory.isEmpty()) {
        if ((subdirectory.charAt(0) == '\\') || (subdirectory.charAt(0) == '/')) {
            subdirectory = subdirectory.substring(1);
        }
        cloudDirectory = cloudDirectory.getSubDirectory(subdirectory);
    }

    // Convert the glob-style path matcher into a regular expression.
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("**", "*");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("*", ".*");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("glob:", "");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("{", "(");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("}", ")");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace(",", "|");

    // File is named after the CobDate.
    FilesScopedFetchTopic.FetchScopeToFileScanParametersConverter converter = new FilesScopedFetchTopic.FetchScopeToFileScanParametersConverter(DataLoadControllerRestService.SCOPE_KEY__COB_DATE);

    return new CsvCloudScopedFetchTopic<>(
            topic,
            cloudDirectory,
            dataProviderFactory(),
            pathMatcherSyntaxAndPattern,
            converter,
            parserConfig,
            "",
            null);
}
@Bean
public ICloudDirectory<BlobClientBase> rootDirectory() {
    // azureContainer and azureRootDirectory already hold the resolved property values
    return new AzureCloudDirectory(client(), azureContainer, azureRootDirectory);
}

@Bean
public BlobServiceClient client() {
    // azureConnectionString already holds the resolved connection string
    return new BlobServiceClientBuilder()
            .connectionString(azureConnectionString)
            .buildClient();
}

@Bean
public ICloudCsvDataProviderFactory<BlobClientBase> dataProviderFactory() {
    // The maximum number of threads that download parts of an entity in parallel
    return new AzureCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(azureFetchThreads)));
}
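To see what the replacement chain in `createDirectoryTopic` produces, the standalone sketch below repeats the same `String.replace` calls and checks the result with `java.util.regex.Pattern`. The class name `GlobToRegexSketch` and the sample blob paths are made up for illustration. Whether the connector applies the regex to the full path with whole-string matching is an assumption here.

```java
import java.util.regex.Pattern;

// Standalone illustration of the glob-to-regex replacement chain; not connector code.
final class GlobToRegexSketch {

    // Applies the same replacements, in the same order, as createDirectoryTopic.
    static String toRegex(String pathMatcherSyntaxAndPattern) {
        return pathMatcherSyntaxAndPattern
                .replace("**", "*")
                .replace("*", ".*")
                .replace("glob:", "")
                .replace("{", "(")
                .replace("}", ")")
                .replace(",", "|");
    }

    public static void main(String[] args) {
        String regex = toRegex("glob:**/*trades*.csv");
        System.out.println(regex); // prints .*/.*trades.*.csv

        // Sample paths are hypothetical; Pattern.matches requires the whole string to match.
        System.out.println(Pattern.matches(regex, "data/2024-01-31/eod_trades_1.csv")); // true
        System.out.println(Pattern.matches(regex, "trades.csv"));                       // false, no '/'
        System.out.println(Pattern.matches(regex, "data/tradesXcsv"));                  // true, '.' is unescaped

        // Brace groups become regex alternations.
        System.out.println(toRegex("glob:*.{csv,txt}")); // prints .*.(csv|txt)
    }
}
```

Because the `.` before `csv` is not escaped, it matches any single character, so the resulting regex is slightly more permissive than the original glob. If that matters for your file layout, write the pattern with this in mind.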
Topic Configuration
The Azure Source consists of one or more Topics of type CsvCloudScopedFetchTopic or CsvCloudScopedListenTopic for Fetch or Listen Sources respectively. A Fetch Topic can only be added to a Fetch Source, and a Listen Topic only to a Listen Source. The Topic name must be globally unique across all Topics. The CSV Cloud Topics take the following parameters:
CsvCloudScopedFetchTopic parameters:
Topic Parameter | Description |
---|---|
name | Name of this Topic. Must be globally unique among all Topics. |
rootDirectoryPath | Base directory within the Azure Blob Container to which the file pattern is applied. |
providerFactory | An instance of AzureCsvDataProviderFactory. |
regex | Regex pattern used to match files, similar to a PathMatcher pattern. |
fetchScopeToFileScanParametersConverter | Function that converts Scope keys into the IFileScanParameters used to scan for files to load. Can use the provided CsvFilesScopedFetchTopic.ScopeToFileScanParametersConverter. |
csvParserConfiguration | ICSVParserConfiguration describing the CSV column names, number of lines to skip, etc. Can be created with AzureCsvScopedFetchSource.createParserConfiguration(...). |
fileTaskPluginKey | Optional Plugin Key of the IFileTask that handles processing of the CSV file. |
extraProperties | Additional Properties. None are required, so this can be null. |
CsvCloudScopedListenTopic parameters:
Topic Parameter | Description |
---|---|
topic | Name of this Topic. Must be globally unique among all Topics. |
pathMatcher | String regex used to match file paths for this Topic. |
providerFactory | An instance of AzureCsvDataProviderFactory. |
rootDirectoryPath | Base directory within the Azure Blob Container to which pathMatcher is applied. |
scopeToFileScanParametersConverter | Function that converts Scope keys into the IFileScanParameters used to scan for files to load. Can use the provided CsvFilesScopedListenTopic.ScopeToFileScanParametersConverter. |
csvParserConfiguration | ICSVParserConfiguration describing the CSV column names, number of lines to skip, etc. Can be created with AzureCsvScopedListenSource.createParserConfiguration(...). |
extraProperties | Can contain the optional property "pollingDelay", the pause in milliseconds between checks for CSV file changes. Default is 500. |
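The `extraProperties` argument of a Listen Topic is a plain `java.util.Properties` object, so the `pollingDelay` entry can be prepared as in the sketch below. The class `ListenTopicProperties` and its methods are hypothetical helpers, not connector API. Storing the value as a string and reading it back with a fallback of 500 are assumptions about how the property is consumed.

```java
import java.util.Properties;

// Hypothetical helper for building a Listen Topic's extraProperties.
final class ListenTopicProperties {
    static final String POLLING_DELAY = "pollingDelay";
    // Default taken from the parameter table above.
    static final long DEFAULT_POLLING_DELAY_MS = 500L;

    // Builds a Properties object carrying the polling delay in milliseconds.
    static Properties withPollingDelay(long millis) {
        if (millis <= 0) {
            throw new IllegalArgumentException("pollingDelay must be positive, got " + millis);
        }
        Properties extra = new Properties();
        extra.setProperty(POLLING_DELAY, Long.toString(millis));
        return extra;
    }

    // Reads the delay back, falling back to the default when it is absent.
    static long pollingDelayOf(Properties extra) {
        if (extra == null) {
            return DEFAULT_POLLING_DELAY_MS;
        }
        String raw = extra.getProperty(POLLING_DELAY);
        return raw == null ? DEFAULT_POLLING_DELAY_MS : Long.parseLong(raw.trim());
    }
}
```

A call such as `ListenTopicProperties.withPollingDelay(2000)` would yield a `Properties` instance that can be passed as the last constructor argument of `CsvCloudScopedListenTopic`.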
Java Topic constructors:
public CsvCloudScopedFetchTopic(
        String name,
        ICloudDirectory<E> rootDirectoryPath,
        ICloudCsvDataProviderFactory<E> providerFactory,
        String regex,
        IFilesScopedFetchTopic.IScopeToFileScanParametersConverter fetchScopeToFileScanParametersConverter,
        ICSVParserConfiguration csvParserConfiguration,
        String fileTaskPluginKey,
        Properties extraProperties)

public CsvCloudScopedListenTopic(
        String topic,
        String pathMatcher,
        ICloudCsvDataProviderFactory<E> providerFactory,
        ICloudDirectory<E> rootDirectoryPath,
        IFilesScopedTopic.IScopeToFileScanParametersConverter scopeToFileScanParametersConverter,
        ICSVParserConfiguration csvParserConfiguration,
        Properties extraProperties)