AWS

AWS S3 CSV Source Configuration

The AWS S3 Source loads CSV files from an AWS S3 bucket. It can be configured as either a Fetching or a Listening source.

Application properties

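# General Cloud Source Properties
# Maximum number of threads that download parts of an entity in parallel (optional; the example below defaults it to 4)
cloud.fetch.thread=

# AWS Source Properties
# Name of the S3 bucket
aws.bucket=
# Root directory within the bucket
aws.root_directory=
# AWS region of the bucket, e.g. eu-west-1
aws.region=
# AWS credentials
aws.accessKeyId=
aws.secretKey=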
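Before the source is built, it can help to fail fast when a connection property is absent. The following is a minimal sketch of a hypothetical helper (`AwsSourcePropertiesCheck` and its methods are illustrative, not part of Data Connectors). It loads the properties with `java.util.Properties`, reports which of `aws.bucket`, `aws.root_directory` and `aws.region` are missing (the Fetching Source example injects these without defaults), and assumes a default of 4 fetch threads when `cloud.fetch.thread` is unset or blank. Credentials are not checked, because the example resolves them through `DefaultAWSCredentialsProviderChain`, which can also find them outside the application properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Data Connectors: validates the AWS source properties.
public class AwsSourcePropertiesCheck {

    // Keys the Fetching Source example injects without a default value.
    static final List<String> REQUIRED_KEYS = List.of("aws.bucket", "aws.root_directory", "aws.region");

    // Assumed default, taken from the ${cloud.fetch.thread:4} placeholder in the example.
    static final int DEFAULT_FETCH_THREADS = 4;

    /** Returns the required keys that are absent from the properties. */
    public static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Reads cloud.fetch.thread; an unset or blank value falls back to the assumed default. */
    public static int fetchThreads(Properties props) {
        String raw = props.getProperty("cloud.fetch.thread");
        return (raw == null || raw.isBlank()) ? DEFAULT_FETCH_THREADS : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(String.join("\n",
                "cloud.fetch.thread=",
                "aws.bucket=my-bucket",
                "aws.region = eu-west-1")));
        System.out.println("Missing: " + missingKeys(props)); // Missing: [aws.root_directory]
        System.out.println("Threads: " + fetchThreads(props)); // Threads: 4
        System.out.println("Region: " + props.getProperty("aws.region")); // Region: eu-west-1
    }
}
```

Note that `Properties.load` trims the whitespace around `=`, so `aws.region = eu-west-1` and `aws.region=eu-west-1` are read identically.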

If you use a Listening Source, the pollingDelay property is required. It defines how often, in milliseconds, the source checks whether a file has been modified.

pollingDelay=<milliseconds>
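The Listen Topic parameters list pollingDelay as an optional extraProperties entry with a default of 500 milliseconds. As a minimal sketch, assuming that default, this hypothetical `PollingDelay.resolve` helper (not a Data Connectors API) reads the value and turns it into a `java.time.Duration`. Rejecting non-positive values is this sketch's own choice, not documented library behavior.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper, not part of Data Connectors: resolves the polling interval of a Listening Source.
public class PollingDelay {

    // Default documented for the Listen Topic's extraProperties.
    static final long DEFAULT_POLLING_DELAY_MS = 500L;

    /** Reads "pollingDelay" (milliseconds), falling back to 500 ms when absent or blank. */
    public static Duration resolve(Properties extraProperties) {
        if (extraProperties == null) {
            return Duration.ofMillis(DEFAULT_POLLING_DELAY_MS);
        }
        String raw = extraProperties.getProperty("pollingDelay");
        if (raw == null || raw.isBlank()) {
            return Duration.ofMillis(DEFAULT_POLLING_DELAY_MS);
        }
        long millis = Long.parseLong(raw.trim());
        // Assumption of this sketch: a zero or negative delay is treated as a configuration error.
        if (millis <= 0) {
            throw new IllegalArgumentException("pollingDelay must be positive, got " + millis);
        }
        return Duration.ofMillis(millis);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("pollingDelay", "2000");
        System.out.println(resolve(props));            // PT2S
        System.out.println(resolve(new Properties())); // PT0.5S
    }
}
```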

Maven Dependency

First, make sure you import the correct dependency. Your pom.xml should contain:

<!-- For AWS CSV Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-aws</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>

Channel Configuration

For details on channel configuration, see the Channel Configuration section in the Local CSV topic.

Source Configuration

To load from AWS S3, construct an S3CsvScopedFetchSource or an S3CsvScopedListenSource. This source contains cloud-specific Topics of type CsvCloudScopedFetchTopic or CsvCloudScopedListenTopic. Create one Topic for each fileMatcher and target datastore. A few additional beans are also created to establish connectivity with the remote storage; the connectivity details for the S3 bucket are provided via application properties.

Below is an example of a Fetching Source:

// Spring and AWS SDK for Java (v1) imports. The Data Connectors types used below
// (ICSVSource, ICSVTopic, ICSVParserConfiguration, ICloudEntityPath, ICloudDirectory,
// S3CsvScopedFetchSource, CsvCloudScopedFetchTopic, S3CloudDirectory, AwsCsvDataProviderFactory,
// CloudFetchingConfig, ...) come from the data-connectors-aws artifact and must be imported too.
import java.util.Properties;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AwsSourceConfig {

    @Value("${aws.bucket}")
    protected String awsBucket;

    @Value("${aws.root_directory}")
    protected String awsRootDirectory;

    @Value("${aws.region}")
    protected String awsRegion;

    // Maximum number of threads downloading parts of an entity in parallel (defaults to 4)
    @Value("${cloud.fetch.thread:4}")
    protected int awsFetchThreads;

    @Autowired
    private IDatastoreConfig datastoreConfig;

    public ICSVSource<ICloudEntityPath<S3Object>> createAwsSource() {
        //
        // Configure AWS Source Props
        //
        final Properties sourceProps = new Properties();
        // add source properties ...
        sourceProps.put(ACsvScopedSource.PROP__SOURCE_NAME, "AWS-CSV-Source");
        ICSVSource<ICloudEntityPath<S3Object>> awsSource = new S3CsvScopedFetchSource<>(sourceProps);

        //
        // Add topics (one per file pattern and target store)
        //
        // generateTradeCsvFields() and NB_HEADER_LINES_TO_SKIP are project-specific and defined elsewhere.
        awsSource.addTopic(
                createDirectoryTopic(
                        "Trade_Topic",
                        "/data",
                        "glob:**/*trades*.csv",
                        awsSource.createParserConfiguration(generateTradeCsvFields(), NB_HEADER_LINES_TO_SKIP)
                )
        );

        return awsSource;
    }

    protected ICSVTopic<ICloudEntityPath<S3Object>> createDirectoryTopic(String topic, String subdirectory, String pathMatcherSyntaxAndPattern, ICSVParserConfiguration parserConfig) {
        // Default to the root S3 directory
        ICloudDirectory<S3Object> cloudDirectory = rootDirectory();

        // Strip a leading file separator, if any, then resolve the subdirectory.
        String relativeDirectory = subdirectory;
        if (relativeDirectory.startsWith("/") || relativeDirectory.startsWith("\\")) {
            relativeDirectory = relativeDirectory.substring(1);
        }
        if (!relativeDirectory.isEmpty()) {
            cloudDirectory = cloudDirectory.getSubDirectory(relativeDirectory);
        }

        // Convert the glob pattern into a regular expression.
        // Literal dots are escaped first so that ".csv" does not match any character followed by "csv".
        String regex = pathMatcherSyntaxAndPattern
                .replace("glob:", "")
                .replace(".", "\\.")
                .replace("**", "*")
                .replace("*", ".*")
                .replace("{", "(")
                .replace("}", ")")
                .replace(",", "|");

        // Files are named after the COB date, so the converter is keyed on the COB date scope key.
        FilesScopedFetchTopic.FetchScopeToFileScanParametersConverter converter =
                new FilesScopedFetchTopic.FetchScopeToFileScanParametersConverter(DataLoadControllerRestService.SCOPE_KEY__COB_DATE);

        return new CsvCloudScopedFetchTopic<>(
                topic,
                cloudDirectory,
                dataProviderFactory(),
                regex,
                converter,
                parserConfig,
                "",     // fileTaskPluginKey: empty, no file task plugin
                null);  // extraProperties: none
    }

    @Bean
    public ICloudDirectory<S3Object> rootDirectory() {
        // The @Value fields already hold the property values; pass them directly.
        return new S3CloudDirectory(client(), awsBucket, awsRootDirectory);
    }

    @Bean
    public AmazonS3 client() {
        return AmazonS3ClientBuilder.standard()
                .withCredentials(new DefaultAWSCredentialsProviderChain())
                .withRegion(Regions.fromName(awsRegion))
                .withClientConfiguration(new ClientConfiguration().withMaxConnections(128))
                .build();
    }

    @Bean
    public ICloudCsvDataProviderFactory<S3Object> dataProviderFactory() {
        // The maximum number of threads that will download parts of an entity in parallel
        return new AwsCsvDataProviderFactory(new CloudFetchingConfig(awsFetchThreads));
    }
}
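The example converts the glob into a regex with a chain of `String.replace` calls. As an alternative, here is a minimal sketch of a single-pass converter that also escapes the other regex metacharacters a glob treats literally. `GlobToRegex` is a hypothetical helper, not a Data Connectors API. Like the replace chain, it lets `*` match across `/`, which differs from `java.nio.file` glob semantics, where only `**` crosses directory boundaries.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Data Connectors: converts a "glob:" pattern to a regex
// in one pass, escaping regex metacharacters that the glob treats literally.
public class GlobToRegex {

    public static String globToRegex(String syntaxAndPattern) {
        String glob = syntaxAndPattern.startsWith("glob:")
                ? syntaxAndPattern.substring("glob:".length())
                : syntaxAndPattern;
        StringBuilder regex = new StringBuilder();
        boolean inGroup = false;
        for (int i = 0; i < glob.length(); i++) {
            char c = glob.charAt(i);
            switch (c) {
                case '*':
                    // "*" and "**" both become ".*" (crossing "/"), as in the replace() chain of the example
                    if (i + 1 < glob.length() && glob.charAt(i + 1) == '*') {
                        i++;
                    }
                    regex.append(".*");
                    break;
                case '?':
                    regex.append('.'); // exactly one character
                    break;
                case '{':
                    regex.append('(');
                    inGroup = true;
                    break;
                case '}':
                    regex.append(')');
                    inGroup = false;
                    break;
                case ',':
                    regex.append(inGroup ? "|" : ","); // commas separate alternatives only inside {...}
                    break;
                default:
                    if ("\\.[]()+^$|".indexOf(c) >= 0) {
                        regex.append('\\'); // escape characters that are special in a regex
                    }
                    regex.append(c);
            }
        }
        return regex.toString();
    }

    public static void main(String[] args) {
        String regex = globToRegex("glob:**/*trades*.csv");
        System.out.println(regex); // .*/.*trades.*\.csv
        System.out.println(Pattern.matches(regex, "data/2020-01-01/eod_trades_1.csv")); // true
        System.out.println(Pattern.matches(regex, "data/2020-01-01/trades_csv")); // false
        // Without escaping, "." matches any character, so the same path is accepted:
        System.out.println(Pattern.matches(".*/.*trades.*.csv", "data/2020-01-01/trades_csv")); // true
        System.out.println(globToRegex("glob:*{trades,risk}*.csv")); // .*(trades|risk).*\.csv
    }
}
```

Because `**/` becomes `.*/`, the resulting regex only matches paths that contain at least one `/`; whether that holds depends on the path string the topic matches against.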

Topic Configuration

An AWS Source consists of one or more Topics of type CsvCloudScopedFetchTopic (for a Fetch source) or CsvCloudScopedListenTopic (for a Listen source). Fetch Topics cannot be added to a Listen source, and Listen Topics cannot be added to a Fetch source. Each Topic name must be globally unique across all Topics. The CSV cloud Topics take the following parameters:

CsvCloudScopedFetchTopic parameters:

Parameter                                 Description
name                                      Name of this Topic. Must be globally unique among all Topics.
rootDirectoryPath                         Base directory within the AWS S3 bucket to which the regex is applied.
providerFactory                           An ICloudCsvDataProviderFactory, for example an instance of AwsCsvDataProviderFactory.
regex                                     Regular expression used to match files, similar to a PathMatcher.
fetchScopeToFileScanParametersConverter   Function that converts scope keys into an IFileScanParameters used to scan for files to load. The provided CsvFilesScopedFetchTopic.ScopeToFileScanParametersConverter can be used.
csvParserConfiguration                    ICSVParserConfiguration defining the CSV column names, the number of lines to skip, etc. Can be created with S3CsvScopedFetchSource.createParserConfiguration(...).
fileTaskPluginKey                         Optional plugin key of an IFileTask that handles processing of the CSV file.
extraProperties                           Additional properties. Can be null; no additional properties are required.

CsvCloudScopedListenTopic parameters:

Parameter                                 Description
topic                                     Name of this Topic. Must be globally unique among all Topics.
pathMatcher                               Regular expression (as a String) used to match file paths for this Topic.
providerFactory                           An ICloudCsvDataProviderFactory, for example an instance of AwsCsvDataProviderFactory.
rootDirectoryPath                         Base directory within the AWS S3 bucket to which pathMatcher is applied.
scopeToFileScanParametersConverter        Function that converts scope keys into an IFileScanParameters used to scan for files to load. The provided CsvFilesScopedListenTopic.ScopeToFileScanParametersConverter can be used.
csvParserConfiguration                    ICSVParserConfiguration defining the CSV column names, the number of lines to skip, etc. Can be created with S3CsvScopedListenSource.createParserConfiguration(...).
extraProperties                           Can contain the optional property "pollingDelay": the pause, in milliseconds, between checks for CSV file changes. Default is 500.
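The two Topic rules stated above (Fetch and Listen Topics are not mixed across source types, and Topic names are globally unique) can be modeled in plain Java. This is a minimal sketch only: `TopicRules`, its `Mode` enum and `register` method are hypothetical and do not describe how Data Connectors itself enforces these rules, where or when it rejects a violation, or what error it reports.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model, not the Data Connectors implementation: checks the two Topic rules
// (no mixing of Fetch/Listen, globally unique names) before Topics are added to sources.
public class TopicRules {

    public enum Mode { FETCH, LISTEN }

    // Names of every Topic registered so far, across all sources.
    private final Set<String> topicNames = new HashSet<>();

    /** Registers a Topic on a source, rejecting mode mismatches and duplicate names. */
    public void register(Mode sourceMode, String topicName, Mode topicMode) {
        // Check the mode first, so a rejected Topic does not reserve its name.
        if (sourceMode != topicMode) {
            throw new IllegalArgumentException(
                    topicMode + " topic '" + topicName + "' cannot be added to a " + sourceMode + " source");
        }
        // Set.add returns false if the name is already present.
        if (!topicNames.add(topicName)) {
            throw new IllegalArgumentException("Topic name '" + topicName + "' is already in use");
        }
    }

    public static void main(String[] args) {
        TopicRules rules = new TopicRules();
        rules.register(Mode.FETCH, "Trade_Topic", Mode.FETCH);
        try {
            rules.register(Mode.LISTEN, "Trade_Topic", Mode.LISTEN); // duplicate name
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        try {
            rules.register(Mode.FETCH, "Risk_Topic", Mode.LISTEN); // Listen Topic on a Fetch source
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```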

Java Topic constructors:

public CsvCloudScopedFetchTopic(
    String name,
    ICloudDirectory<E> rootDirectoryPath,
    ICloudCsvDataProviderFactory<E> providerFactory,
    String regex,
    IFilesScopedFetchTopic.IScopeToFileScanParametersConverter fetchScopeToFileScanParametersConverter,
    ICSVParserConfiguration csvParserConfiguration,
    String fileTaskPluginKey,
    Properties extraProperties)

public CsvCloudScopedListenTopic(
    String topic,
    String pathMatcher,
    ICloudCsvDataProviderFactory<E> providerFactory,
    ICloudDirectory<E> rootDirectoryPath,
    IFilesScopedTopic.IScopeToFileScanParametersConverter scopeToFileScanParametersConverter,
    ICSVParserConfiguration csvParserConfiguration,
    Properties extraProperties)