Cloud Source

Introduction

Atoti's CSV and Parquet Sources can be interfaced with a Cloud Source, which allows these sources to extract their data directly from a cloud storage provider.

Atoti Cloud Source schema

The Cloud Source provides a thin abstraction layer for common cloud providers that is described in this page.

Supported cloud providers for the Cloud Source are Amazon S3, Azure Blob Storage and Google Cloud Storage.

These storages share interesting characteristics:

  • Very good performance
  • Unlimited storage capacity
  • Affordable (~$0.3 per GB/year)
  • Free upload

You can deploy an Atoti application in the cloud that will download a huge amount of data in a few minutes (see Azure's blog post).

Deploy your code and your data in the same region; otherwise you will incur extra fees when the CSV Source loads data.

Cloud Source Abstractions

All cloud providers organize their data in a structure that can be likened to directories and entities (an entity being the equivalent of a file in a traditional local file system).

The abstractions used by the cloud source refer to these two concepts for all cloud sources. Specific information on usage and on what exactly these terms refer to can be found in the appropriate cloud provider page.

For each of the following interfaces, additional methods are usually available when using the cloud-provider-specific implementations instead. Refer to the appropriate cloud provider page for more detail.

ICloudDirectory

This interface represents a directory in the cloud storage. The directory structure is emulated by using the / character.

It exposes methods for listing all contained subdirectories and entities. Assuming the following folder structure:

📦someDirectory
┣ 📂subdirectory1
┃ ┗ 📜entity2.csv
┣ 📂subdirectory2
┃ ┣ 📜entity3.csv
┃ ┗ 📂subdirectory3
┗ 📜entity1.csv

The listing methods will return the following objects:

final ICloudDirectory<EntityType> directory = someDirectory;
final List<ICloudDirectory<EntityType>> subdirectories = directory.listDirectories();
final List<ICloudEntityPath<EntityType>> entities = directory.listEntities(false);
final List<ICloudEntityPath<EntityType>> allEntities = directory.listEntities(true);

  • subdirectories will contain subdirectory1 and subdirectory2
  • entities will contain entity1.csv
  • allEntities will contain entity1.csv, entity2.csv and entity3.csv

An entity or subdirectory can also be retrieved directly from a parent directory.

final ICloudDirectory<EntityType> directory = someDirectory;
final ICloudDirectory<EntityType> subdirectory1 =
    directory.getSubDirectory("subdirectory1");
final ICloudDirectory<EntityType> subdirectory3 =
    directory.getSubDirectory("subdirectory2/subdirectory3");
final ICloudEntityPath<EntityType> entity1 = directory.getEntity("entity1.csv");
final ICloudEntityPath<EntityType> entity2 =
    directory.getEntity("subdirectory1/entity2.csv");
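
Object stores generally keep a flat namespace of keys, so the directory view described above has to be derived from the / separators. The following is a minimal, self-contained sketch of that idea, not the Atoti implementation: the class FlatKeyListing and its methods are hypothetical, and it assumes an empty directory is represented by a marker key ending in /.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper, not part of the Atoti API: emulates directories over flat keys. */
final class FlatKeyListing {

  /** Names of the immediate subdirectories of {@code prefix} (for example "someDirectory/"). */
  static List<String> listDirectories(List<String> keys, String prefix) {
    final Set<String> names = new TreeSet<>();
    for (final String key : keys) {
      if (!key.startsWith(prefix)) {
        continue;
      }
      final String rest = key.substring(prefix.length());
      final int slash = rest.indexOf('/');
      if (slash > 0) {
        // Anything before the first '/' is a direct subdirectory name.
        names.add(rest.substring(0, slash));
      }
    }
    return new ArrayList<>(names);
  }

  /** Entity paths relative to {@code prefix}; direct children only unless {@code recursive}. */
  static List<String> listEntities(List<String> keys, String prefix, boolean recursive) {
    final List<String> result = new ArrayList<>();
    for (final String key : keys) {
      if (!key.startsWith(prefix)) {
        continue;
      }
      final String rest = key.substring(prefix.length());
      if (rest.isEmpty() || rest.endsWith("/")) {
        continue; // Assumed convention: keys ending in '/' are directory markers.
      }
      if (recursive || rest.indexOf('/') < 0) {
        result.add(rest);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    final List<String> keys = List.of(
        "someDirectory/entity1.csv",
        "someDirectory/subdirectory1/entity2.csv",
        "someDirectory/subdirectory2/entity3.csv",
        "someDirectory/subdirectory2/subdirectory3/");
    System.out.println(listDirectories(keys, "someDirectory/"));
    // prints [subdirectory1, subdirectory2]
    System.out.println(listEntities(keys, "someDirectory/", false));
    // prints [entity1.csv]
    System.out.println(listEntities(keys, "someDirectory/", true));
    // prints [entity1.csv, subdirectory1/entity2.csv, subdirectory2/entity3.csv]
  }
}
```

The sketch returns relative paths as strings, whereas the interfaces above return ICloudDirectory and ICloudEntityPath objects.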

ICloudEntity

This interface is associated with a unit of data on the cloud storage, much like a file in a traditional local file system. Instances can be created without performing remote calls.

It serves as a facade to an entity on a cloud storage. It provides various methods to retrieve an entity's metadata and content:

final EntityType entity = someEntity;
final String entityName = entity.getName();
final String parentBucket = entity.getBucket();
final long size = entity.getLength();
final Date lastModified = entity.getLastModified();
final byte[] contents = entity.download();
// ...
entity.delete();

ICloudEntityPath

This interface represents a path to a potential ICloudEntity object. It allows the creation of the referred-to ICloudEntity and its retrieval once it has been created.

Much like ICloudEntity, it provides various methods to retrieve the entity's metadata.

final ICloudEntityPath<EntityType> entityPath = someDirectory.getEntity("example.csv");
if (entityPath.exists()) {
    final String name = entityPath.getName();
    final String parentBucket = entityPath.getBucket();
    final Date lastModified = entityPath.getLastModifiedTime();
}
final EntityType entity = entityPath.toEntity();
// ...
entityPath.delete();

It additionally exposes methods to upload content to the referenced entity, which is not required to already exist. In this case, an actual entity is created on the cloud storage using the information held by the ICloudEntityPath.

entityPath.upload("data".getBytes());
entityPath.upload(Path.of("path", "to", "file"));
entityPath.upload(csvInputStream(), contentLength);

Warning: if the entity already exists at the referenced path, its contents will be overwritten.
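
Because an upload replaces existing content, callers that want to avoid overwriting can check for existence first. The sketch below is self-contained and illustrative only: UploadTarget is a hypothetical stand-in exposing just exists() and upload(byte[]), the two calls shown above, and an in-memory map takes the place of real cloud storage.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: guards an upload with an existence check. */
final class SafeUploadSketch {

  /** Hypothetical minimal view of an entity path, mirroring exists() and upload(byte[]). */
  interface UploadTarget {
    boolean exists();

    void upload(byte[] content);
  }

  /** Uploads only when nothing exists at the target; returns whether the upload happened. */
  static boolean uploadIfAbsent(UploadTarget target, byte[] content) {
    if (target.exists()) {
      return false; // Keep the existing content untouched.
    }
    target.upload(content);
    return true;
  }

  /** In-memory stand-in for one key of a bucket, used here instead of real cloud storage. */
  static UploadTarget inMemory(Map<String, byte[]> bucket, String key) {
    return new UploadTarget() {
      @Override
      public boolean exists() {
        return bucket.containsKey(key);
      }

      @Override
      public void upload(byte[] content) {
        bucket.put(key, content.clone());
      }
    };
  }

  public static void main(String[] args) {
    final Map<String, byte[]> bucket = new HashMap<>();
    final UploadTarget target = inMemory(bucket, "example.csv");
    System.out.println(uploadIfAbsent(target, "first".getBytes()));  // prints true
    System.out.println(uploadIfAbsent(target, "second".getBytes())); // prints false
  }
}
```

The check and the upload are two separate steps, so another writer could still create the entity in between. This pattern reduces accidental overwrites but does not make them impossible.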

Usage with Data Sources

Concurrently Fetching Channel

To download data from the cloud, Atoti Server uses AConcurrentlyFetchingChannel, a custom high-throughput implementation of SeekableByteChannel designed for downloading data from the cloud. The class is specialized for each supported cloud provider.

This channel implementation downloads the file in multiple parts concurrently, and does not block, which means data from downloaded parts can be read while the others finish downloading.
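
The general technique can be sketched as follows. This is not the AConcurrentlyFetchingChannel implementation: the class PartFetchSketch is hypothetical, a byte array stands in for the remote object, and fetchPart stands in for a ranged download. It only illustrates how parts can be downloaded by several threads while the reader consumes them in order, with a bounded number of parts scheduled ahead.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustrative only: concurrent part download with in-order consumption. */
final class PartFetchSketch {

  /** Stand-in for a ranged download; a real client would request this byte range remotely. */
  static byte[] fetchPart(byte[] remote, int offset, int partLength) {
    final int end = (int) Math.min(remote.length, (long) offset + partLength);
    return Arrays.copyOfRange(remote, offset, end);
  }

  static byte[] download(byte[] remote, int threadCount, int prefetchedPartsLimit, int partLength) {
    if (threadCount < 1 || prefetchedPartsLimit < 1 || partLength < 1) {
      throw new IllegalArgumentException("all parameters must be >= 1");
    }
    final ExecutorService pool = Executors.newFixedThreadPool(threadCount);
    final Deque<Future<byte[]>> inFlight = new ArrayDeque<>();
    final ByteArrayOutputStream out = new ByteArrayOutputStream(remote.length);
    try {
      long nextOffset = 0;
      while (nextOffset < remote.length || !inFlight.isEmpty()) {
        // Keep at most prefetchedPartsLimit parts scheduled ahead of the reader.
        while (nextOffset < remote.length && inFlight.size() < prefetchedPartsLimit) {
          final int offset = (int) nextOffset;
          inFlight.addLast(pool.submit(() -> fetchPart(remote, offset, partLength)));
          nextOffset += partLength;
        }
        // Wait only for the oldest part; later parts keep downloading meanwhile.
        final byte[] part = inFlight.removeFirst().get();
        out.write(part, 0, part.length);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("download interrupted", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("part download failed", e.getCause());
    } finally {
      pool.shutdownNow();
    }
    return out.toByteArray();
  }

  public static void main(String[] args) {
    final byte[] remote = new byte[1_000];
    for (int i = 0; i < remote.length; i++) {
      remote[i] = (byte) i;
    }
    final byte[] local = download(remote, 4, 3, 64);
    System.out.println(Arrays.equals(remote, local)); // prints true
  }
}
```

The three arguments of download deliberately mirror threadCount, prefetchedPartsLimit and partLength, but their exact semantics in Atoti may differ from this sketch.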

Atoti Data Sources internally use a concurrently fetching channel to retrieve data from the cloud. The channel can be configured with the following parameters:

  • threadCount: the maximum number of downloader threads for one single file. Our experiments have shown that the optimal number of threads downloading files in parallel is around 1.5 times the number of the server's logical cores.
  • prefetchedPartsLimit: the number of parts that can be downloaded in advance. This is used to compensate for unstable network speed.
  • partLength: the size in bytes of an individual downloaded part. Our experiments have shown that the optimal value is around 8MB.

These parameters are grouped into a CloudFetchingConfig object:

final CloudFetchingConfig cloudConfig =
    new CloudFetchingConfig(threadCount, prefetchedPartsLimit, partLength);
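
As a starting point, the values can be derived from the guidance above. The helper below is a hypothetical sketch, not part of Atoti: it applies the 1.5 × logical cores rule of thumb, takes 8 MB to mean 8 × 1024 × 1024 bytes (an assumption), and computes how many parts a file of a given size would be split into.

```java
/** Hypothetical helper: derives starting values from the rules of thumb above. */
final class FetchingParameters {

  /** Assumption: "8MB" is read as 8 * 1024 * 1024 bytes. */
  static final int PART_LENGTH_BYTES = 8 * 1024 * 1024;

  /** Around 1.5 times the number of logical cores, and never less than one thread. */
  static int suggestedThreadCount(int logicalCores) {
    return (int) Math.max(1L, Math.round(1.5 * logicalCores));
  }

  /** Number of parts needed to cover the file, rounding the last partial part up. */
  static long partCount(long fileLength, long partLength) {
    return (fileLength + partLength - 1) / partLength;
  }

  public static void main(String[] args) {
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("threadCount = " + suggestedThreadCount(cores));
    System.out.println("partLength  = " + PART_LENGTH_BYTES);
    // A 100 MiB file split into 8 MiB parts needs 13 parts (12 full, 1 partial).
    System.out.println("parts       = " + partCount(100L * 1024 * 1024, PART_LENGTH_BYTES));
  }
}
```

These are only initial values; the best settings depend on the instance type and network, so they are worth measuring on the target deployment.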

Usage with the CSV Source

Cloud-based implementations of CSV topics are available through CloudEntityCsvTopic<EntityType> and CloudDirectoryCsvTopic<EntityType>.

They are used similarly to regular CSV topics, except they require one additional argument of type ICloudCsvDataProviderFactory. It is a factory that provides the implementation to use for downloading. The appropriate implementation should be selected based on your cloud provider.

One default implementation is made available for each cloud provider. These implementations accept a CloudFetchingConfig that they can use in the underlying concurrently fetching channel.

A CSV topic can be created either for a single entity:

final CsvParserConfiguration parserConfig = new CsvParserConfiguration();
final CloudFetchingConfig cloudConfig =
    new CloudFetchingConfig(threadCount, prefetchedPartsLimit, partLength);
final ICloudCsvDataProviderFactory<S3Entity> factory =
    new AwsCsvDataProviderFactory(cloudConfig);
final ICloudEntityPath<S3Entity> entityPath = someEntity;
final ICsvTopic<ICloudEntityPath<S3Entity>> entityTopic =
    new CloudEntityCsvTopic<>("entityTopic", parserConfig, factory, entityPath);

or for a whole directory, in which case files to consider can be filtered using a regular expression:

final ICloudDirectory<S3Entity> directory = someDirectory;
final ICsvTopic<ICloudEntityPath<S3Entity>> directoryTopic =
    new CloudDirectoryCsvTopic<>(
        "directoryTopic", parserConfig, factory, directory, "(R|r)egex");

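To check which entities a pattern would select before creating a directory topic, the regular expression can be tried locally with java.util.regex. The sketch below is an illustration under assumptions: the class EntityNameFilter is hypothetical, and it assumes the whole name must match the pattern. Whether Atoti applies the pattern to the entity name or to its full path, and whether it requires a full match, is not stated here and should be verified.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper: previews which names a regular expression would select. */
final class EntityNameFilter {

  /** Keeps the names that match the pattern in full (assumed matching mode). */
  static List<String> matching(List<String> names, String regex) {
    final Pattern pattern = Pattern.compile(regex);
    return names.stream()
        .filter(name -> pattern.matcher(name).matches())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    final List<String> names =
        List.of("trades.csv", "trades.csv.bak", "README.md", "risk/2024.csv");
    System.out.println(matching(names, ".*\\.csv"));
    // prints [trades.csv, risk/2024.csv]
  }
}
```
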
Usage with the Parquet Source

The IParquetParser interface exposes special overloads of the IParquetParser.parse() method to retrieve and parse Parquet data from the cloud.

Compared to the regular parsing methods, they require one additional Function argument, which acts as a factory for AzureBlobChannel, S3ObjectChannel or GoogleBlobChannel, depending on the relevant cloud provider.

Parquet data can either be fetched from a single entity:

final CloudFetchingConfig cloudConfig =
    new CloudFetchingConfig(threadCount, prefetchedPartsLimit, partLength);
final ICloudEntityPath<EntityType> entityPath = someEntity;
datastore.edit(
    transaction ->
        parser.parse(
            entityPath,
            // Placeholder: use AzureBlobChannel, S3ObjectChannel or GoogleBlobChannel here.
            path -> new ConcurrentlyFetchingChannelImplementation(path, cloudConfig),
            parsers,
            mapping));

or from a whole directory:

final ICloudDirectory<EntityType> directory = someDirectory;
datastore.edit(
    transaction ->
        parser.parse(
            directory,
            path -> new ConcurrentlyFetchingChannelImplementation(path, cloudConfig),
            parsers,
            mapping));

What impacts the download speed?

  • The power of the cores. HTTPS connections and client-side encryption consume CPU resources.
  • The bandwidth of the network interface.
  • Other CPU-hungry tasks performed by your application while downloading the data.
  • Having a multitude of small files instead of big files, because AConcurrentlyFetchingChannel is tailored for high-performance multithreaded download of a single file.
  • The type of the storage: hot or cold (do not use cold storage!).
    • For Azure, the redundancy has an impact: locally redundant storage (LRS) is faster than zone-redundant storage (ZRS), which is faster than geo-redundant storage (GRS).
  • Whether the Atoti application and your data are in the same cloud region.
  • Whether Receive Side Scaling (RSS) is enabled in the network driver. It must be enabled, and this now seems to be the default on all cloud providers.