DLC Real-Time Data Sources

CSV Source

CSV files can be loaded in real time by extending the ACsvScopedListenSource class and implementing its startListening() and stopListening() methods.

The currently supported listening sources are as follows:

CSV File Source              Implementation Class            Data-Connectors Module
On Local Filesystem          LocalCsvScopedListenSource      data-connectors-csv
On Azure                     AzureCsvScopedListenSource      data-connectors-azure
On Google Cloud Platform     GcpCsvScopedListenSource        data-connectors-gcp
On AWS S3 Bucket             S3CsvScopedListenSource         data-connectors-aws

  • A CSV Listen topic CANNOT have the same name as a CSV Fetch topic; otherwise an ActiveViamRuntimeException is thrown.
  • Deleting a listened file does not remove that file’s data from the cube; this is not supported.

Additional properties:

The CSV listeners poll the target file at an interval set by the PROP_POLLING_DELAY property of the FileScopedListenTopic. By default they poll every 100ms; to override this, set PROP_POLLING_DELAY in the Properties passed to the FileScopedListenTopic.
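For example, a minimal sketch of overriding the default polling delay (the 500ms value is illustrative):

    Properties topicProperties = new Properties();
    // Poll every 500ms instead of the default 100ms
    topicProperties.put(FileScopedListenTopic.PROP_POLLING_DELAY, "500");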

Load real-time from local filesystem

We first must ensure the correct dependency is imported. Your pom should have:

    <!-- Local CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-csv</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

To establish real-time data loading from the local filesystem, a FileScopedListenTopic needs to be created.
Example:

  
    @Bean
    public LocalCsvScopedListenSource localCsvScopedListenSource() {

        final Properties sourceProps = new Properties();
        sourceProps.setProperty(ACsvScopedListenSource.PROP__SOURCE_NAME, "LocalCSVScopedListenSource");

        final LocalCsvScopedListenSource localCsvScopedListenSource = new LocalCsvScopedListenSource(sourceProps);

        final Properties topicProperties = new Properties();
        topicProperties.put(CSVFileListener.PROP__SOURCE_NAME, localCsvScopedListenSource);
        topicProperties.put(FileScopedListenTopic.PROP_POLLING_DELAY, pollingDelayValue); // Set via application properties, see section below

        localCsvScopedListenSource.addTopic(
                new FileScopedListenTopic(
                        CSV_TOPIC__CASHFLOW,
                        localCsvScopedListenSource.createParserConfiguration(CASHFLOW_FILE_COLUMNS, linesToSkip),
                        csvDataRootDirPath,
                        new WatcherService(),
                        pathMatcherSyntaxAndPattern == null ? null
                                : FileSystems.getDefault().getPathMatcher(pathMatcherSyntaxAndPattern),
                        new FileScopedListenTopic.FetchScopeToFileScanParametersConverter(
                                DataLoadControllerRestService.SCOPE_KEY__COB_DATE),
                        topicProperties));

        return localCsvScopedListenSource;
    }
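The pollingDelayValue referenced above can, for instance, be injected from application properties. A minimal sketch, with an illustrative property name:

    // Hypothetical property binding for the polling delay used above
    @Value("${csv.polling.delay:100}")
    private String pollingDelayValue;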

Load real-time from cloud

We first must ensure the correct dependency is imported. Your pom should have one or more of the following:

    <!-- For Aws CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-aws</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

    <!-- For Azure CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-azure</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

    <!-- For Google-Cloud-Platform CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-gcp</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

To load from cloud storage, the CSV source and a cloud-specific CSV topic can be configured as below:

    @Bean
    public S3CsvScopedListenSource s3CsvScopedListenSource() {
        final Properties sourceProps = new Properties();
        sourceProps.setProperty(ACsvScopedListenSource.PROP__SOURCE_NAME, "S3CsvScopedListenSource");

        final S3CsvScopedListenSource s3CsvScopedListenSource = new S3CsvScopedListenSource(sourceProps);

        s3CsvScopedListenSource.addTopic(
            createListenDirectoryTopic(
                    s3CsvScopedListenSource, CSV_TOPIC__TRADE, inputDataRootDirPath, DELIV_SUBDIR,
                    INPUT_FILE_PATH_MATCHER_TRADE,
                    s3CsvScopedListenSource.createParserConfiguration(generateTradeCsvFields(), NB_HEADER_LINES_TO_SKIP)
            )
        );

        return s3CsvScopedListenSource;
    }

    @Override
    protected ICSVTopic<ICloudEntityPath<S3Object>> createListenDirectoryTopic(ICSVSource<ICloudEntityPath<S3Object>> source,
            String topic, String inputDataRootDirPath, String subdirectory, String pathMatcherSyntaxAndPattern,
            ICSVParserConfiguration parserConfiguration) {
        ICloudDirectory<S3Object> cloudDirectory = rootDirectory();
        if (!subdirectory.isEmpty()) {
            // If the subdirectory starts with a file separator, remove it.
            if (subdirectory.charAt(0) == '\\' || subdirectory.charAt(0) == '/') {
                subdirectory = subdirectory.substring(1);
            }
            cloudDirectory = cloudDirectory.getSubDirectory(subdirectory);
        }

        // Convert the glob-style matcher into the regex form expected by the cloud topic.
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("**", "*");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("*", ".*");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("glob:", "");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("{", "(");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("}", ")");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace(",", "|");

        Properties topicProps = new Properties();
        topicProps.put("source-name", source);
        topicProps.put("polling.delay", "100");

        return new CsvCloudScopedListenTopic(
                topic,
                parserConfiguration,
                pathMatcherSyntaxAndPattern,
                dataProviderFactory(),
                cloudDirectory,
                new FileScopedListenTopic.FetchScopeToFileScanParametersConverter(
                        DataLoadControllerRestService.SCOPE_KEY__COB_DATE),
                topicProps);
    }
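For example, the replacements above turn glob:**.csv into the regex .*.csv, and glob:{trades,cashflows}*.csv into (trades|cashflows).*.csv.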

AWS S3

Dependency to include:

    <!-- For Aws CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-aws</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

To load from S3 storage, a cloud-specific topic (CsvCloudScopedListenTopic) needs to be created, as explained above. A few additional beans are also required to establish connectivity with the remote storage; an example follows. The connectivity details for the S3 bucket are provided via application properties.

    @Bean
    public ICloudDirectory<S3Object> rootDirectory() {
        return new S3CloudDirectory(client(), env.getRequiredProperty(ACsvSourceConfig.AWS_BUCKET),
                env.getRequiredProperty(ACsvSourceConfig.AWS_ROOT_DIRECTORY));
    }

    @Bean
    public AmazonS3 client() {
        return AmazonS3ClientBuilder.standard()
                .withCredentials(new DefaultAWSCredentialsProviderChain())
                .withRegion(Regions.fromName(env.getRequiredProperty(ACsvSourceConfig.AWS_REGION)))
                .withClientConfiguration(new ClientConfiguration().withMaxConnections(128))
                .build();
    }
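Note that DefaultAWSCredentialsProviderChain resolves credentials from, among other places, the aws.accessKeyId and aws.secretKey system properties shown in the Application properties section below, environment variables, or an instance profile.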

    @Bean
    public ICloudCsvDataProviderFactory<S3Object> dataProviderFactory() {
        // the maximum number of threads that download parts of an entity in parallel
        String cloudFetchThread = env.getProperty(ACsvSourceConfig.ACTIVE_CLOUD_FETCH_THREAD, "10");
        return new AwsCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
    }
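The ACsvSourceConfig constants used above map to the application properties listed at the end of this page. A minimal sketch of what those constants might look like; the exact names are assumptions:

    // Hypothetical property-name constants, aligned with the
    // "Application properties" section at the end of this page
    public static final String AWS_BUCKET = "aws.bucket";
    public static final String AWS_ROOT_DIRECTORY = "aws.root_directory";
    public static final String AWS_REGION = "aws.region";
    public static final String ACTIVE_CLOUD_FETCH_THREAD = "cloud.fetch.thread";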

Azure Blob

Dependency to include:

    <!-- For Azure CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-azure</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

Here we create a CsvCloudScopedListenTopic, as for S3. Below are the methods necessary to connect to an Azure Blob.

  
    @Bean
    public ICloudDirectory<CloudBlob> rootDirectory() {
        return new AzureCloudDirectory(client(), env.getRequiredProperty(AZURE_CONTAINER_NAME),
                env.getRequiredProperty(AZURE_ROOT_DIRECTORY));
    }

    /* The Azure connection string is expected in the VM parameter
     * -Dazure.connection.string
     */
    @Bean
    public CloudBlobClient client() {
        String storageConnectionString = System.getProperty(AZURE_STORAGE_CONNECTION_STRING);
        CloudStorageAccount storageAccount;
        CloudBlobClient blobClient;

        // Parse the connection string and create a blob client to interact with Blob storage
        try {
            storageAccount = CloudStorageAccount.parse(storageConnectionString);
            blobClient = storageAccount.createCloudBlobClient();
            // Check that a container reference can be built from the configured name
            blobClient.getContainerReference(env.getRequiredProperty(AZURE_CONTAINER_NAME));
        } catch (InvalidKeyException | URISyntaxException | StorageException e) {
            throw new UnsupportedOperationException("Azure Cloud Blob Client Issues", e);
        }
        return blobClient;
    }

    @Bean
    public ICloudCsvDataProviderFactory<CloudBlob> dataProviderFactory() {
        // the maximum number of threads that download parts of an entity in parallel
        String cloudFetchThread = env.getProperty(ACTIVE_CLOUD_FETCH_THREAD, "10");
        return new AzureCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
    }
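At runtime, the connection string itself is supplied as a JVM argument, e.g. -Dazure.connection.string=<your connection string>, matching the System.getProperty lookup in the client() bean above.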

Google Storage

Dependency to include:

    <!-- For Google-Cloud-Platform CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-gcp</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

Similar to S3 and Azure, we create a CsvCloudScopedListenTopic. The cloud directory and client are configured as below:

    @Bean
    public ICloudDirectory<Blob> rootDirectory() {
        return new GoogleCloudDirectory(client(), env.getRequiredProperty(ACsvSourceConfig.GOOGLE_CONTAINER_NAME),
                env.getRequiredProperty(ACsvSourceConfig.GOOGLE_ROOT_DIRECTORY));
    }

    @Bean
    public Storage client() {
        return StorageOptions.getDefaultInstance().getService();
    }
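StorageOptions.getDefaultInstance() authenticates using Google Application Default Credentials, typically resolved from the GOOGLE_APPLICATION_CREDENTIALS environment variable or the ambient service account.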

    @Bean
    public ICloudCsvDataProviderFactory<Blob> dataProviderFactory() {
        // the maximum number of threads that download parts of an entity in parallel
        String cloudFetchThread = env.getProperty(ACsvSourceConfig.ACTIVE_CLOUD_FETCH_THREAD, "10");
        return new GoogleCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
    }

Application properties

Below is an example of the application properties configured to load from the cloud storage options mentioned above. Please provide the values obtained from your own account.

    # General Cloud Source Properties
    cloud.fetch.thread=

    # AWS Source Properties
    aws.bucket=
    aws.root_directory=
    aws.region=
    aws.accessKeyId=
    aws.secretKey=

    # Azure Source Properties
    azure.container=
    azure.root_directory=
    azure.connection.string=

    # Google Cloud Platform Source Properties
    google.container=
    google.root_directory=

    # Local CSV Source Properties
    local.root_directory=