DLC Data Sources

CSV Source

CSVs can be loaded by a class implementing ACsvScopedFetchSource, which supports fetching data for a specific scope (for example, a CobDate or sub-folder) specified at runtime for each fetch iteration.

DLC supports loading CSVs from either the local filesystem or cloud storage. Among the cloud storage options, AWS S3, Azure Blob and Google Storage are currently supported.
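
For illustration, a fetch scope is simply a set of key/value pairs resolved for each fetch request. For the topics configured below it would typically carry the CobDate; the value shown here is a placeholder:

    // Illustrative only: the scope passed for one fetch iteration.
    Map<String, Object> scope = new HashMap<>();
    scope.put(DataLoadControllerRestService.SCOPE_KEY__COB_DATE, "2018-09-26");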

Load from local filesystem

We first must ensure we are importing the correct dependency. Your pom should have:

    <!-- Local CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-csv</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

To load from the local filesystem, a CsvFilesScopedFetchTopic needs to be created.
Example:

  
    @Bean
    public LocalCsvScopedFetchSource localCsvScopedFetchSource() {
        final Properties sourceProps = new Properties();
        sourceProps.setProperty(ACsvScopedFetchSource.PROP__SOURCE_NAME, "LocalCSVScopedSource");

        List<String> scopeKeys = new ArrayList<>();
        scopeKeys.add(DataLoadControllerRestService.SCOPE_KEY__COB_DATE);

        final LocalCsvScopedFetchSource<Path> localCsvScopedFetchSource = new LocalCsvScopedFetchSource<>(sourceProps);

        localCsvScopedFetchSource.addTopic(                
            new CsvFilesScopedFetchTopic(CSV_TOPIC__CASHFLOW,
                    csvDataRootDirPath,
                    CASHFLOW_FILE_PATTERN == null ? null : FileSystems.getDefault().getPathMatcher(CASHFLOW_FILE_PATTERN),
                    new CsvFilesScopedFetchTopic.FetchScopeToFileScanParametersConverter(
                            scopeKeys.toArray(new String[scopeKeys.size()])),
                    localCsvScopedFetchSource.createParserConfiguration(CASHFLOW_FILE_COLUMNS, linesToSkip)
            )
        );

        return localCsvScopedFetchSource;
    }
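
The example above references a few project-specific constants. Purely illustrative values (assumptions, not part of DLC) might look like:

    // Illustrative values only; names, types and values come from your own project.
    private static final String CSV_TOPIC__CASHFLOW = "Cashflow";                 // DLC topic name
    private static final String CASHFLOW_FILE_PATTERN = "glob:**Cashflow*.csv";   // java.nio glob syntax
    private static final List<String> CASHFLOW_FILE_COLUMNS = Arrays.asList(
            "TradeId", "CobDate", "Currency", "Amount");                          // columns in file order
    private static final int linesToSkip = 1;                                     // header lines to skip
    private static final String csvDataRootDirPath = "/data/csv";                 // directory scanned for files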

Load from a cloud source

We first must ensure we are importing the correct dependency. Your pom should have one or more of the following:

    <!-- For Aws CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-aws</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

    <!-- For Azure CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-azure</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

    <!-- For Google-Cloud-Platform CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-gcp</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

To load from cloud storage, the CSV source and a cloud-specific CSV topic can be configured as shown below:

    @Bean
    public S3CsvScopedFetchSource s3CsvScopedFetchSource() {
        final Properties sourceProps = new Properties();
        sourceProps.setProperty(ACsvScopedFetchSource.PROP__SOURCE_NAME, "S3CSVScopedSource");

        final S3CsvScopedFetchSource<Path> s3CsvScopedFetchSource = new S3CsvScopedFetchSource<>(sourceProps);

        s3CsvScopedFetchSource.addTopic(
            createDirectoryTopic(
                    s3CsvScopedFetchSource, CSV_TOPIC__TRADE, inputDataRootDirPath, DELIV_SUBDIR,
                    INPUT_FILE_PATH_MATCHER_TRADE,
                    s3CsvScopedFetchSource.createParserConfiguration(generateTradeCsvFields(), NB_HEADER_LINES_TO_SKIP)
            )
        );

        return s3CsvScopedFetchSource;
    }

    @Override
    protected ICSVTopic<ICloudEntityPath<S3Object>> createDirectoryTopic(ICSVSource source, String topic, String path, String subdirectory,
            String pathMatcherSyntaxAndPattern, ICSVParserConfiguration parserConfig) {

        ICloudDirectory<S3Object> cloudDirectory = rootDirectory();
        if (!subdirectory.isEmpty()) {
            // If the subdirectory starts with a file separator, strip it.
            if (subdirectory.charAt(0) == '\\' || subdirectory.charAt(0) == '/') {
                subdirectory = subdirectory.substring(1);
            }
            cloudDirectory = cloudDirectory.getSubDirectory(subdirectory);
        }
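        // Convert the glob-style pattern (as used by the local filesystem topics) into the
        // regular expression syntax expected by CsvCloudScopedFetchTopic.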
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("**", "*");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("*", ".*");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("glob:", "");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("{", "(");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("}", ")");
        pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace(",", "|");
        return new CsvCloudScopedFetchTopic(topic, cloudDirectory, dataProviderFactory(), pathMatcherSyntaxAndPattern,
                new FilesScopedFetchTopic.FetchScopeToFileScanParametersConverter(
                        DataLoadControllerRestService.SCOPE_KEY__COB_DATE),
                parserConfig, "", null);
    }
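
The string manipulation above converts a java.nio glob pattern (the syntax used by the local topics) into a regular expression for the cloud topic. A minimal, self-contained sketch of that conversion, using an illustrative pattern that is not taken from the project:

    public class GlobToRegexDemo {
        public static void main(String[] args) {
            // Illustrative glob pattern; the real one comes from your configuration.
            String pattern = "glob:**Trade*_{2018,2019}.csv";
            pattern = pattern.replace("**", "*");    // glob:*Trade*_{2018,2019}.csv
            pattern = pattern.replace("*", ".*");    // glob:.*Trade.*_{2018,2019}.csv
            pattern = pattern.replace("glob:", "");  // .*Trade.*_{2018,2019}.csv
            pattern = pattern.replace("{", "(");
            pattern = pattern.replace("}", ")");
            pattern = pattern.replace(",", "|");     // .*Trade.*_(2018|2019).csv

            // Note that "." is not escaped, so it matches any character; in practice this is harmless.
            System.out.println("Trade_2018.csv".matches(pattern)); // prints: true
        }
    }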

AWS S3

Dependency to include:

    <!-- For Aws CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-aws</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

To load from S3 storage, a cloud-specific topic, CsvCloudScopedFetchTopic, needs to be created as explained above. A few additional beans are also created to establish connectivity with the remote storage. Below is an example; the connectivity details for the S3 bucket are provided via application properties.

    @Bean
    public ICloudDirectory<S3Object> rootDirectory() {
        return new S3CloudDirectory(client(), env.getRequiredProperty(ACsvSourceConfig.AWS_BUCKET), env.getRequiredProperty(ACsvSourceConfig.AWS_ROOT_DIRECTORY));

    }

    @Bean
    public AmazonS3 client() {
        return AmazonS3Client.builder()
                .withCredentials(new DefaultAWSCredentialsProviderChain())
                .withRegion(Regions.fromName(env.getRequiredProperty(ACsvSourceConfig.AWS_REGION)))
                .withClientConfiguration(new ClientConfiguration().withMaxConnections(128))
                .build();
    }

    @Bean
    public ICloudCsvDataProviderFactory<S3Object> dataProviderFactory() {
        // The maximum number of threads that will download parts of an entity in parallel.
        String cloudFetchThread = env.getProperty(ACsvSourceConfig.ACTIVE_CLOUD_FETCH_THREAD, "10");
        return new AwsCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
    }
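
The ACsvSourceConfig constants used above are assumed to resolve to the property names listed in the Application properties section below. An illustrative mapping (an assumption, not the actual DLC source):

    // Illustrative property keys only; the actual constants live in your ACsvSourceConfig.
    public static final String AWS_BUCKET = "aws.bucket";
    public static final String AWS_ROOT_DIRECTORY = "aws.root_directory";
    public static final String AWS_REGION = "aws.region";
    public static final String ACTIVE_CLOUD_FETCH_THREAD = "cloud.fetch.thread";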

Azure Blob

Dependency to include:

    <!-- For Azure CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-azure</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

Similar to S3, we create a CsvCloudScopedFetchTopic. The cloud directory, client and data provider factory are configured as below:

  
    @Bean
    public ICloudDirectory<CloudBlob> rootDirectory() {
        return new AzureCloudDirectory(client(), env.getRequiredProperty(AZURE_CONTAINER_NAME),
                env.getRequiredProperty(AZURE_ROOT_DIRECTORY));
    }

    /*
     * Expects the Azure connection string in the JVM parameter
     * -Dazure.connection.string
     */
    @Bean
    public CloudBlobClient client() {
        String storageConnectionString = System.getProperty(AZURE_STORAGE_CONNECTION_STRING);
        CloudStorageAccount storageAccount;
        CloudBlobClient blobClient;
        // Parse the connection string and create a blob client to interact with Blob storage.
        try {
            storageAccount = CloudStorageAccount.parse(storageConnectionString);
            blobClient = storageAccount.createCloudBlobClient();
            // Resolve the container reference to validate connectivity to the container.
            blobClient.getContainerReference(env.getRequiredProperty(AZURE_CONTAINER_NAME));
        } catch (InvalidKeyException | URISyntaxException | StorageException e) {
            throw new UnsupportedOperationException("Azure Cloud Blob Client Issues", e);
        }
        return blobClient;
    }

    @Bean
    public ICloudCsvDataProviderFactory<CloudBlob> dataProviderFactory() {
        // The maximum number of threads that will download parts of an entity in parallel.
        String cloudFetchThread = env.getProperty(ACTIVE_CLOUD_FETCH_THREAD, "10");
        return new AzureCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
    }
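
The client() bean reads the connection string from a JVM system property. The property name matches the azure.connection.string entry in the Application properties section below; the account values here are placeholders:

    // Usually supplied on the command line, e.g.
    //   -Dazure.connection.string="DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>"
    // For a quick local test it could also be set programmatically before the Spring context starts:
    System.setProperty("azure.connection.string",
            "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net");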

Google Storage

Dependency to include:

    <!-- For Google-Cloud-Platform CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-gcp</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

Similar to S3 and Azure, we create a CsvCloudScopedFetchTopic. The cloud directory and client are configured as shown below:

    @Bean
    public ICloudDirectory<Blob> rootDirectory() {
        return new GoogleCloudDirectory(client(), env.getRequiredProperty(ACsvSourceConfig.GOOGLE_CONTAINER_NAME),
                env.getRequiredProperty(ACsvSourceConfig.GOOGLE_ROOT_DIRECTORY));
    }

    @Bean
    public Storage client() {
        return StorageOptions.getDefaultInstance().getService();
    }

    @Bean
    public ICloudCsvDataProviderFactory<Blob> dataProviderFactory() {
        // The maximum number of threads that will download parts of an entity in parallel.
        String cloudFetchThread = env.getProperty(ACsvSourceConfig.ACTIVE_CLOUD_FETCH_THREAD, "10");
        return new GoogleCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
    }
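
StorageOptions.getDefaultInstance() relies on Google Application Default Credentials (for example, the GOOGLE_APPLICATION_CREDENTIALS environment variable pointing at a service-account key). If you prefer to reference a key file explicitly, the client could be built along these lines (a sketch, not part of the DLC configuration; the key path is a placeholder):

    @Bean
    public Storage client() throws IOException {
        // Load explicit service-account credentials instead of relying on
        // Application Default Credentials.
        GoogleCredentials credentials = GoogleCredentials
                .fromStream(new FileInputStream("/path/to/service-account.json"));
        return StorageOptions.newBuilder()
                .setCredentials(credentials)
                .build()
                .getService();
    }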

Application properties

Below is an example of the application properties configured to load from cloud storage options mentioned above. Please provide the values obtained from your account.

    # General Cloud Source Properties
    cloud.fetch.thread=

    # AWS Source Properties
    aws.bucket=
    aws.root_directory=
    aws.region=
    aws.accessKeyId=
    aws.secretKey=

    # Azure Source Properties
    azure.container=
    azure.root_directory=
    azure.connection.string=

    # Google Cloud Platform Source Properties
    google.container=
    google.root_directory=

    # Local CSV Source Properties
    local.root_directory=

JDBC Source

We first must ensure we are importing the correct dependency. Your pom should have:

    <!-- For JDBC Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-jdbc</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

We build an instance of JdbcScopedFetchSource that collects all the JDBC topics. It loads data from a database into ActivePivot based on SQL statements (defined via source topics) and on fetch business scopes, which are defined at fetch time and converted into the SQL input parameters injected into those statements.
Example:

    @Bean(destroyMethod = "close")
    public IJdbcScopedFetchSource jdbcSource() {

        final IJdbcScopedFetchSource source = new JdbcScopedFetchSource(dbConnectionConfig.dbConnectionFactory(), null);

        final String productTopicSql = String.join(" ",
                "SELECT ProductId AS ",
                DatastoreNames.PRODUCT_ID,
                ", Name AS ",
                DatastoreNames.PRODUCT_NAME,
                ", Color AS ",
                DatastoreNames.PRODUCT_COLOR,
                ", Price AS ",
                DatastoreNames.PRODUCT_PRICE,
                ", CobDate AS ",
                DatastoreNames.PRODUCT_COB_DATE,
                " FROM ", PRODUCT_TABLE_NAME
                ," WHERE  CobDate = ?"
                );

        source.addTopic(
                new JdbcStatementTopic(
                        JDBC_TOPIC__PRODUCT,
                        productTopicSql,
                        new CobDateScopeToSqlParametersConverter(),
                        null,
                        JdbcPreparedStatementTask.PLUGIN_KEY,
                        null));

        return source;
    }

    private static class CobDateScopeToSqlParametersConverter extends JdbcStatementTopic.ScopeToSqlParametersConverter {

        public CobDateScopeToSqlParametersConverter() {
            super(DataLoadControllerRestService.SCOPE_KEY__COB_DATE);
        }

        @Override
        protected Object formatValue(
                final String key,
                final Object value) {

            if (DataLoadControllerRestService.SCOPE_KEY__COB_DATE.equals(key)) {
                return java.sql.Date.valueOf(LocalDate.parse((String) value));
            }
            return super.formatValue(key, value);
        }    
    }
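
At fetch time, the scope value supplied for the CobDate key (a String) is converted by the converter above into the java.sql.Date that is bound to the statement's "?" placeholder. For example, with an illustrative scope value:

    // Scope entry CobDate=2018-09-26 (a String) becomes the SQL parameter bound to "?":
    java.sql.Date cobDateParam = java.sql.Date.valueOf(LocalDate.parse("2018-09-26"));
    // ... FROM <PRODUCT_TABLE_NAME> WHERE CobDate = ?   <-- "?" receives cobDateParam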

Messaging Source

We first must ensure we are importing the correct dependency. Your pom should have one of the following:

    <!-- For Kafka Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-kafka</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

    <!-- For RabbitMq Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-rabbitmq</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

MessagingSource currently supports loading from Apache Kafka and RabbitMQ out of the box. To load data from a different messaging system, an implementation of IMessagingConsumer needs to be provided.
The following sections show how to configure the source and topic for such a system via Spring config.

Remote broker details

The connectivity details of a remote broker can be specified via Spring config.

Depending on the security configuration of your project, you may need to add or remove certain properties.

The connection properties can be wrapped in a Properties object:

    @Bean
    public Properties kafkaConnectionProperties() {

        Properties kafkaConnectionProps = new Properties();		
        kafkaConnectionProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 	                
        kafkaConnectionProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);        
        kafkaConnectionProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        kafkaConnectionProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        kafkaConnectionProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        kafkaConnectionProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        kafkaConnectionProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
        kafkaConnectionProps.put("message.max.bytes", messageMaxBytes);
        kafkaConnectionProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
        kafkaConnectionProps.put(PROP_POLL_TIMEOUT, pollTimeout);

        kafkaConnectionProps.put(PROP_SCHEMA_REGISTRY_URL, schemaRegistryUrl);

        return kafkaConnectionProps;
    }  
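
The fields referenced above (bootstrapServers, autoOffsetReset, maxPollRecords, and so on) are assumed to be injected from application properties. An illustrative sketch; the property names and defaults are placeholders, not prescribed by DLC:

    // Illustrative property bindings; adjust names and defaults to your project.
    @Value("${kafka.bootstrap.servers:localhost:9092}")
    private String bootstrapServers;

    @Value("${kafka.auto.offset.reset:latest}")
    private String autoOffsetReset;

    @Value("${kafka.max.poll.records:1000}")
    private String maxPollRecords;

    // ... and similarly for the remaining consumer settings.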

Messaging Source and Topic

Next, one needs to construct the messaging source and add topics. The ActivePivot topic name, the name of the topic on the remote broker, and various other properties are passed to the MessagingTopic constructor as shown below:

    @Bean
    public MessagingSource kafkaMessagingSource() {

        Properties kafkaConnectionProperties = kafkaConnectionConfig.kafkaConnectionProperties();
        MessagingSource messagingSource = new MessagingSource("KafkaMessagingSource");

        Properties tradeProps = (Properties) kafkaConnectionProperties.clone();

        // All the consumers for a topic should have the same group.id

        tradeProps.put(ConsumerConfig.GROUP_ID_CONFIG,
                tradeTopicGroupId.equals("UNIQUE") ? serviceUsername +"."+ UUID.randomUUID().toString() :
                        tradeTopicGroupId);

        tradeProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        tradeProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class.getName());

        messagingSource.addTopic(
                new MessagingTopic(
                        TOPIC__KAFKA_MESSAGING_TRADE,
                        tradeKafkaBrokerTopic,
                        TRADE_STORE_FIELDS,
                        tradeProps,
                        tradeKafkaConsumers,
                        datastoreConfig.datastore(),
                        new String[] {DatastoreNames.TRADE_STORE_NAME},
                        messageConsumerPluginKey));

        return messagingSource;
    }

    private static final List<String> TRADE_STORE_FIELDS = Arrays.asList(
            DatastoreNames.TRADE_ID,
            DatastoreNames.TRADE_PRODUCT_ID,
            DatastoreNames.TRADE_PRODUCT_QUANTITY,
            DatastoreNames.TRADE_COB_DATE
        );
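
The example references a few project-specific values. Purely illustrative definitions (property names, defaults and the plugin key are placeholders):

    // Illustrative values only; adjust to your project.
    static final String TOPIC__KAFKA_MESSAGING_TRADE = "TradeMessagingTopic"; // DLC topic name

    @Value("${kafka.topic.trade:trades}")
    private String tradeKafkaBrokerTopic;      // topic name on the Kafka broker

    @Value("${kafka.topic.trade.consumers:2}")
    private int tradeKafkaConsumers;           // number of consumers created for this topic

    @Value("${messaging.consumer.plugin.key}")
    private String messageConsumerPluginKey;   // plugin key of the IMessagingConsumer implementation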

Messaging Consumer

Messaging consumers are responsible for connecting to the remote messaging system, listening for new data, and consuming it as it becomes available.

The consumers are created when a request to start “listening” to a remote topic is submitted at runtime. The underlying Kafka / RabbitMQ client consumers are created in the constructor of the messaging consumer implementations, KafkaMessageConsumer and RabbitMqMessageConsumer.

The remote topics are polled for records once the consumer thread starts, i.e. inside the Java Callable's call() method.
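
As an illustration of that pattern (a generic Kafka poll loop, not the actual DLC KafkaMessageConsumer; the running flag, connectionProperties, brokerTopicName and publish() hand-off are hypothetical), the call() method of such a consumer typically looks like this:

    public Void call() {
        // Minimal sketch of the poll loop described above.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(connectionProperties)) {
            consumer.subscribe(Collections.singletonList(brokerTopicName));
            while (running) {
                // Block for up to the poll timeout, then hand off any records for publication
                // into the datastore.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    publish(record.value()); // hypothetical hand-off
                }
            }
        }
        return null;
    }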
