DLC Real-Time Data Sources

CSV Source

CSV files can be loaded in real time by extending the ACsvScopedListenSource class and implementing its startListening() and stopListening() methods.

The currently supported listen sources are:

CSV File Source              Implementation Class          Data-Connectors Module
On Local Filesystem          LocalCsvScopedListenSource    data-connectors-csv
On Azure                     AzureCsvScopedListenSource    data-connectors-azure
On Google Cloud Platform     GcpCsvScopedListenSource      data-connectors-gcp
On AWS S3 Bucket             S3CsvScopedListenSource       data-connectors-aws

  • A CSV Listen topic cannot have the same name as a CSV Fetch topic; otherwise an ActiveViamRuntimeException is thrown.
  • Deleting a listed file does not remove that file's data from the cube. Removal on deletion is not supported.
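
The naming constraint can be checked before any topic is created. The following is a minimal sketch, not part of the DLC API: TopicNameRegistry and its Kind enum are hypothetical names, and IllegalStateException stands in for the runtime exception the DLC itself throws.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of the DLC API: tracks topic names across both topic kinds. */
final class TopicNameRegistry {

    enum Kind { FETCH, LISTEN }

    private final Map<String, Kind> kindsByName = new HashMap<>();

    /** Registers a topic name, rejecting a name already used by the other kind of topic. */
    void register(String name, Kind kind) {
        Kind existing = kindsByName.putIfAbsent(name, kind);
        if (existing != null && existing != kind) {
            // The DLC throws its own runtime exception; IllegalStateException is a stand-in.
            throw new IllegalStateException(
                    "Topic '" + name + "' is already registered as a " + existing + " topic");
        }
    }
}
```

Registering "Trades" as a Fetch topic and then again as a Listen topic fails immediately in this sketch, which surfaces the clash at configuration time rather than at load time.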

Additional properties:

The CSV listener polls the target files at the interval set by the PROP_POLLING_DELAY property of the FileScopedListenTopic. By default, CSV listeners poll every 100 ms. To override this, set PROP_POLLING_DELAY in the Properties passed to the FileScopedListenTopic.
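
As an illustration, the polling delay could be resolved from application properties with a fallback to the documented 100 ms default. This is a sketch under assumptions: PollingDelayExample is a hypothetical class, and the key "polling.delay" is assumed to be the value behind PROP_POLLING_DELAY because it is the key used in the cloud topic example in this document.

```java
import java.util.Properties;

/** Hypothetical helper showing one way to resolve the polling delay. */
final class PollingDelayExample {

    // Assumption: this key matches PROP_POLLING_DELAY.
    static final String POLLING_DELAY_KEY = "polling.delay";
    // Documented default: listeners poll every 100 ms.
    static final int DEFAULT_POLLING_DELAY_MS = 100;

    /** Returns the configured delay in milliseconds, or the default when unset or invalid. */
    static int resolvePollingDelay(Properties appProps) {
        String raw = appProps.getProperty(POLLING_DELAY_KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_POLLING_DELAY_MS;
        }
        try {
            int value = Integer.parseInt(raw.trim());
            return value > 0 ? value : DEFAULT_POLLING_DELAY_MS;
        } catch (NumberFormatException e) {
            return DEFAULT_POLLING_DELAY_MS;
        }
    }

    /** Builds the Properties object that would be passed to the listen topic. */
    static Properties topicProperties(Properties appProps) {
        Properties topicProps = new Properties();
        topicProps.put(POLLING_DELAY_KEY, resolvePollingDelay(appProps));
        return topicProps;
    }
}
```

Falling back to the default on a malformed value keeps the listener running; a stricter setup might prefer to fail at startup instead.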

Load real-time from local filesystem

First, make sure the correct dependency is imported. Your pom should have:

    <!-- Local CSV Source -->

To establish real-time data loading from the local filesystem, create a FileScopedListenTopic.

    public LocalCsvScopedListenSource(Properties props) {

        final Properties sourceProps = new Properties();
        sourceProps.setProperty(ACsvScopedListenSource.PROP__SOURCE_NAME, "LocalCSVScopedListenSource");
        Properties topicProperties = new Properties();
        topicProperties.put(CSVFileListener.PROP__SOURCE_NAME, new LocalCsvScopedListenSource(sourceProps));
        topicProperties.put(FileScopedListenTopic.PROP_POLLING_DELAY, pollingDelayValue); // Set via Application Properties, see section below
        List<String> scopeKeys = new ArrayList<>();

        final LocalCsvScopedListenSource<Path> localCsvScopedListenSource = new LocalCsvScopedListenSource(sourceProps);

        new FileScopedListenTopic(
                localCsvScopedListenSource.createParserConfiguration(CASHFLOW_FILE_COLUMNS, linesToSkip),
                new WatcherService(),
                pathMatcherSyntaxAndPattern == null ? null
                        : FileSystems.getDefault().getPathMatcher(pathMatcherSyntaxAndPattern),
                new FileScopedListenTopic.FetchScopeToFileScanParametersConverter(


Load real-time from cloud

First, make sure the correct dependency is imported. Your pom should have one or more of the following:

    <!-- For Aws CSV Source -->

    <!-- For Azure CSV Source -->

    <!-- For Google-Cloud-Platform CSV Source -->

To load from cloud storage, configure the CSV source and the cloud-specific CSV topic as follows:

    public S3CsvScopedListenSource s3CsvScopedListenSource() {
        final Properties sourceProps = new Properties();
        sourceProps.setProperty(ACsvScopedFetchSource.PROP__SOURCE_NAME, "S3CsvScopedListenSource");

        List<String> scopeKeys = new ArrayList<>();

        final S3CsvScopedListenSource s3CsvScopedListenSource = new S3CsvScopedListenSource(sourceProps);

                    source, CSV_TOPIC__TRADE, inputDataRootDirPath , DELIV_SUBDIR,
                    INPUT_FILE_PATH_MATCHER_TRADE, source.createParserConfiguration(generateTradeCsvFileds(), NB_HEADER_LINES_TO_SKIP)

        return s3CsvScopedListenSource;

    protected ICSVTopic<ICloudEntityPath<S3Object>> createListenDirectoryTopic(ICSVSource<ICloudEntityPath<S3Object>> source, String topic,
            String inputDataRootDirPath, String subdirectory, String pathMatcherSyntaxAndPattern, ICSVParserConfiguration createParserConfiguration) {
        ICloudDirectory<S3Object> cloudDirectory = rootDirectory();
        if (!subdirectory.isEmpty()) {
            // Remove a leading file separator from the subdirectory, if present.
            if (subdirectory.charAt(0) == '\\' || subdirectory.charAt(0) == '/') {
                subdirectory = subdirectory.substring(1);
            }
            cloudDirectory = cloudDirectory.getSubDirectory(subdirectory);
        }
        // Convert the glob-style path matcher into a regular expression.
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("**", "*");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("*", ".*");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("glob:", "");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("{", "(");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("}", ")");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace(",", "|");
        Properties topicProps = new Properties();
        topicProps.put("source-name", source);
        topicProps.put("polling.delay", 100);
		return new CsvCloudScopedListenTopic(
				new FilesScopedListenTopic.FetchScopeToFileScanParametersConverter(
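
To make the path-matcher handling above easier to follow, here is a standalone sketch of the same replace chain and of the leading-separator check. GlobToRegexExample and the sample patterns are hypothetical; only the sequence of replace calls is taken from the code above.

```java
import java.util.regex.Pattern;

/** Hypothetical standalone version of the glob-to-regex conversion shown above. */
final class GlobToRegexExample {

    /** Applies the same replacements, in the same order, as createListenDirectoryTopic. */
    static String toRegex(String pathMatcherSyntaxAndPattern) {
        String p = pathMatcherSyntaxAndPattern;
        p = p.replace("**", "*");    // collapse the recursive wildcard
        p = p.replace("*", ".*");    // glob wildcard -> regex wildcard
        p = p.replace("glob:", "");  // drop the syntax prefix
        p = p.replace("{", "(");     // glob alternatives -> regex group
        p = p.replace("}", ")");
        p = p.replace(",", "|");     // alternative separator
        return p;
    }

    /** Removes a single leading '/' or '\' from a subdirectory name. */
    static String stripLeadingSeparator(String subdirectory) {
        if (!subdirectory.isEmpty()
                && (subdirectory.charAt(0) == '\\' || subdirectory.charAt(0) == '/')) {
            return subdirectory.substring(1);
        }
        return subdirectory;
    }

    public static void main(String[] args) {
        String regex = toRegex("glob:**Trade*{.csv,.txt}");
        System.out.println(regex);
        System.out.println(Pattern.matches(regex, "Trade_2024.csv"));
        System.out.println(stripLeadingSeparator("/deliveries"));
    }
}
```

Note that the conversion does not escape the dot, so in the resulting expression "." matches any character, not only a literal dot. Patterns that must distinguish file extensions strictly may need extra escaping.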


AWS S3

Dependency to include:

    <!-- For Aws CSV Source -->

To load from S3 storage, create the cloud-specific topic CsvCloudScopedListenTopic as explained above. A few additional beans are also needed to establish connectivity with the remote storage, as in the example below. The connectivity details for the S3 bucket are provided via application properties.

    public ICloudDirectory<S3Object> rootDirectory() {
        return new S3CloudDirectory(client(), env.getRequiredProperty(ACsvSourceConfig.AWS_BUCKET), env.getRequiredProperty(ACsvSourceConfig.AWS_ROOT_DIRECTORY));
    }


    public AmazonS3 client() {
        return AmazonS3Client.builder()
                .withCredentials(new DefaultAWSCredentialsProviderChain())
                .withClientConfiguration(new ClientConfiguration().withMaxConnections(128))

    public ICloudCsvDataProviderFactory<S3Object> dataProviderFactory() {
        // The maximum number of threads that download parts of an entity in parallel
        String cloudFetchThread = env.getProperty(ACsvSourceConfig.ACTIVE_CLOUD_FETCH_THREAD, "10");
        return new AwsCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
    }

Azure Blob

Dependency to include:

    <!-- For Azure CSV Source -->

As with S3, create a CsvCloudScopedListenTopic. The methods below are needed to connect to Azure Blob storage.

    public ICloudDirectory<CloudBlob> rootDirectory() {
        return new AzureCloudDirectory(client(), env.getRequiredProperty(AZURE_CONTAINER_NAME),

    /*
     * Expects the Azure connection string in the VM parameter
     * -Dazure.connection.string
     */
    public CloudBlobClient client() {
        String storageConnectionString = System.getProperty(AZURE_STORGE_CONNECTION_STRING);
        CloudStorageAccount storageAccount;
        CloudBlobClient blobClient;
        CloudBlobContainer container = null;

        // Parse the connection string and create a blob client to interact with Blob storage
        try {
            storageAccount = CloudStorageAccount.parse(storageConnectionString);

            blobClient = storageAccount.createCloudBlobClient();
            container = blobClient.getContainerReference(env.getRequiredProperty(AZURE_CONTAINER_NAME));

        } catch (InvalidKeyException | URISyntaxException | StorageException e) {
            // Keep the original exception as the cause so the failure can be diagnosed
            throw new UnsupportedOperationException("Azure Cloud Blob Client Issues", e);
        }
        return blobClient;
    }

    public ICloudCsvDataProviderFactory<CloudBlob> dataProviderFactory() {
        // The maximum number of threads that download parts of an entity in parallel.
        // The default must be numeric: falling back to the property key itself would make
        // Integer.parseInt fail. "10" mirrors the default used in the S3 example.
        String cloudFetchThread = env.getProperty(ACTIVE_CLOUD_FETCH_THREAD, "10");
        return new AzureCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
    }
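
Because the Azure client reads its connection string from a VM parameter, it can help to validate that parameter before creating the client. The following is a minimal sketch: AzureConnectionStringExample is a hypothetical class, and the key "azure.connection.string" is assumed from the -Dazure.connection.string comment above.

```java
/** Hypothetical helper that fails fast when the Azure connection string is missing. */
final class AzureConnectionStringExample {

    // Assumption: the system property name follows the -Dazure.connection.string comment.
    static final String CONNECTION_STRING_KEY = "azure.connection.string";

    /** Returns the connection string, or throws with an actionable message if it is absent. */
    static String requireConnectionString() {
        String value = System.getProperty(CONNECTION_STRING_KEY);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException(
                    "Missing JVM parameter -D" + CONNECTION_STRING_KEY);
        }
        return value;
    }
}
```

Checking the parameter up front yields a clear configuration error instead of a parsing failure inside the storage client.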

Google Storage

Dependency to include:

    <!-- For Google-Cloud-Platform CSV Source -->

As with S3 and Azure, create a CsvCloudScopedListenTopic. The cloud directory and client are configured as follows:

    public ICloudDirectory<Blob> rootDirectory() {
        Map<String, String> temp = System.getenv();
        return new GoogleCloudDirectory(client(), env.getRequiredProperty(ACsvSourceConfig.GOOGLE_CONTAINER_NAME),

    public Storage client() {
        return StorageOptions.getDefaultInstance().getService();
    }

    public ICloudCsvDataProviderFactory<Blob> dataProviderFactory() {
        // The maximum number of threads that download parts of an entity in parallel.
        // The default must be numeric: falling back to the property key itself would make
        // Integer.parseInt fail. "10" mirrors the default used in the S3 example.
        String cloudFetchThread = env.getProperty(ACsvSourceConfig.ACTIVE_CLOUD_FETCH_THREAD, "10");

        return new GoogleCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
    }

Application properties

Below is an example of the application properties used to load from the cloud storage options described above. Fill in the values obtained from your own account.

# General Cloud Source Properties

# AWS Source Properties
aws.region =

# Azure Source Properties

# Google Cloud Platform Source Properties