Cloud Source
Introduction
Atoti's CSV and Parquet Sources can be interfaced with a Cloud Source, which allows these sources to extract their data directly from a cloud storage provider.
The Cloud Source provides a thin abstraction layer over common cloud providers, which is described on this page.
Supported cloud providers for the Cloud Source are:
- Amazon S3
- Azure Blob Storage
- Google Cloud Storage
These storage services share interesting characteristics:
- Very good performance
- Unlimited storage capacity
- Affordable (~$0.3 per GB/year)
- Free upload
You can deploy an Atoti application in the cloud that will download a huge amount of data in a few minutes (see Azure's blog post).
Be careful to deploy your code and your data in the same region; otherwise, you will incur extra fees when the CSV Source loads data.
Cloud Source Abstractions
All cloud providers organize their data in a structure that can be likened to directories and entities (an entity being the equivalent of a file in a traditional local file system).
The abstractions used by the Cloud Source rely on these two concepts for all cloud providers. Specific information on usage, and on what exactly these terms refer to, can be found in the appropriate cloud provider page.
For each of the following interfaces, additional methods are usually available when using cloud-provider-specific implementors instead. Refer to the appropriate cloud provider page for more detail.
ICloudDirectory
This interface represents a directory in the cloud storage. The directory structure is emulated by using the / character.
It exposes methods for listing all contained subdirectories and entities. Assuming the following folder structure:
📦someDirectory
┣ 📂subdirectory1
┃ ┗ 📜entity2.csv
┣ 📂subdirectory2
┃ ┣ 📜entity3.csv
┃ ┗ 📂subdirectory3
┗ 📜entity1.csv
The listing methods will return the following objects:
final ICloudDirectory<EntityType> directory = someDirectory;
final List<ICloudDirectory<EntityType>> subdirectories = directory.listDirectories();
final List<ICloudEntityPath<EntityType>> entities = directory.listEntities(false);
final List<ICloudEntityPath<EntityType>> allEntities = directory.listEntities(true);
- subdirectories will contain subdirectory1 and subdirectory2
- entities will contain entity1.csv
- allEntities will contain entity1.csv, entity2.csv and entity3.csv
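To illustrate how a flat key space can emulate directories with the / character, here is a minimal, self-contained sketch. It is not the Atoti implementation: the class FlatKeyListing, its methods and the trailing-slash placeholder key for the empty subdirectory3 are assumptions made for this example only.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper, not part of Atoti: lists a flat key space as if it were a directory tree. */
final class FlatKeyListing {

  /** Returns the names of the immediate subdirectories found under the given prefix. */
  static Set<String> listDirectories(List<String> keys, String prefix) {
    final Set<String> result = new TreeSet<>();
    for (final String key : keys) {
      if (!key.startsWith(prefix)) {
        continue;
      }
      final String relative = key.substring(prefix.length());
      final int slash = relative.indexOf('/');
      if (slash > 0) {
        // Everything before the first '/' is the name of a direct child directory.
        result.add(relative.substring(0, slash));
      }
    }
    return result;
  }

  /** Returns the entities under the prefix, optionally descending into subdirectories. */
  static Set<String> listEntities(List<String> keys, String prefix, boolean recursive) {
    final Set<String> result = new TreeSet<>();
    for (final String key : keys) {
      if (!key.startsWith(prefix) || key.endsWith("/")) {
        // Assumption: keys ending with '/' are directory placeholders, not entities.
        continue;
      }
      final String relative = key.substring(prefix.length());
      if (recursive || relative.indexOf('/') < 0) {
        result.add(relative);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    final List<String> keys =
        List.of(
            "someDirectory/entity1.csv",
            "someDirectory/subdirectory1/entity2.csv",
            "someDirectory/subdirectory2/entity3.csv",
            "someDirectory/subdirectory2/subdirectory3/");
    System.out.println(listDirectories(keys, "someDirectory/"));
    System.out.println(listEntities(keys, "someDirectory/", false));
    System.out.println(listEntities(keys, "someDirectory/", true));
  }
}
```

In this sketch the recursive listing returns keys relative to the prefix, so nested entities keep their subdirectory in their name.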
An entity or subdirectory can also be retrieved directly from a parent directory.
final ICloudDirectory<EntityType> directory = someDirectory;
final ICloudDirectory<EntityType> subdirectory1 =
    someDirectory.getSubDirectory("subdirectory1");
final ICloudDirectory<EntityType> subdirectory3 =
    someDirectory.getSubDirectory("subdirectory2/subdirectory3");
final ICloudEntityPath<EntityType> entity1 = someDirectory.getEntity("entity1.csv");
final ICloudEntityPath<EntityType> entity2 =
    someDirectory.getEntity("subdirectory1/entity2.csv");
ICloudEntity
This interface is associated with a unit of data on the cloud storage, much like a file in a traditional local file system. Instances can be created without performing remote calls.
It serves as a facade to an entity on a cloud storage. It provides various methods to retrieve an entity's metadata and content:
final EntityType entity = someEntity;
final String entityName = entity.getName();
final String parentBucket = entity.getBucket();
final long size = entity.getLength();
final Date lastModified = entity.getLastModified();
final byte[] contents = entity.download();
// ...
entity.delete();
ICloudEntityPath
This interface represents a path to a potential ICloudEntity object. It allows the creation of the referred-to ICloudEntity, and/or its retrieval once it is created.
Much like ICloudEntity, it provides various methods to retrieve the entity's metadata.
final ICloudEntityPath<EntityType> entityPath = someDirectory.getEntity("example.csv");
if (entityPath.exists()) {
    final String name = entityPath.getName();
    final String parentBucket = entityPath.getBucket();
    final Date lastModified = entityPath.getLastModifiedTime();
}
final EntityType entity = entityPath.toEntity();
// ...
It additionally exposes methods to upload content to the referenced entity, which is not required to already exist. In this case, an actual entity is created on the cloud storage using the information held by the ICloudEntityPath.
entityPath.upload("data".getBytes());
entityPath.upload(Path.of("path", "to", "file"));
entityPath.upload(csvInputStream(), contentLength);
Warning: if the entity already exists at the referenced path, its contents will be overwritten.
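The relationship between a path and the entity it may point to is loosely comparable to java.nio.file.Path and the file behind it. The sketch below is a local file-system analogy only and uses no Atoti API; the class and method names are hypothetical. It shows a path that refers to nothing yet, a first write that creates the target, and a second write that overwrites it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Local file-system analogy only; none of this is Atoti API. */
final class PathVersusEntityAnalogy {

  /** Writes twice to the same path and returns what the file finally contains. */
  static String writeTwice(String first, String second) {
    try {
      final Path directory = Files.createTempDirectory("cloud-analogy");
      // A Path can be built before anything exists at that location.
      final Path path = directory.resolve("example.csv");
      if (Files.exists(path)) {
        throw new IllegalStateException("The temporary file should not exist yet");
      }
      // The first write creates the file.
      Files.write(path, first.getBytes(StandardCharsets.UTF_8));
      // The second write silently replaces the previous contents.
      Files.write(path, second.getBytes(StandardCharsets.UTF_8));
      final String contents = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
      // Clean up the temporary file and directory.
      Files.delete(path);
      Files.delete(directory);
      return contents;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(writeTwice("old data", "new data"));
  }
}
```

If overwriting is not acceptable, checking for existence before writing is one option, keeping in mind that such a check is not atomic with the write that follows it.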
Usage with Data Sources
Concurrently Fetching Channel
In order to download data from the cloud, Atoti Server uses AConcurrentlyFetchingChannel, a custom high-throughput implementation of SeekableByteChannel specialized for downloading data from the cloud. The class is specialized for each supported cloud provider.
This channel implementation downloads the file in multiple parts concurrently, and does not block, which means data from downloaded parts can be read while the others finish downloading.
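The idea of fetching one file in several parts at once, while reading the first parts as soon as they arrive, can be sketched as follows. This is not the code of AConcurrentlyFetchingChannel: the class ConcurrentPartFetchSketch is hypothetical, and the remote storage is simulated by an in-memory byte array standing in for ranged reads.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch, not the Atoti implementation: fetches byte ranges of one file in parallel. */
final class ConcurrentPartFetchSketch {

  /** Stand-in for a ranged remote read; here the "remote" file is just a byte array. */
  static byte[] fetchRange(byte[] remote, int offset, int length) {
    return Arrays.copyOfRange(remote, offset, offset + length);
  }

  /** Downloads all parts concurrently and consumes them in order as they become available. */
  static byte[] download(byte[] remote, int partLength, int threadCount) {
    if (partLength <= 0) {
      throw new IllegalArgumentException("partLength must be positive");
    }
    final ExecutorService pool = Executors.newFixedThreadPool(threadCount);
    try {
      final List<CompletableFuture<byte[]>> parts = new ArrayList<>();
      for (int offset = 0; offset < remote.length; offset += partLength) {
        final int start = offset;
        final int length = Math.min(partLength, remote.length - start);
        parts.add(CompletableFuture.supplyAsync(() -> fetchRange(remote, start, length), pool));
      }
      final ByteArrayOutputStream out = new ByteArrayOutputStream();
      for (final CompletableFuture<byte[]> part : parts) {
        // Waits only for this part; later parts keep downloading in the background.
        final byte[] bytes = part.join();
        out.write(bytes, 0, bytes.length);
      }
      return out.toByteArray();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    final byte[] remote = new byte[1000];
    for (int i = 0; i < remote.length; i++) {
      remote[i] = (byte) i;
    }
    final byte[] local = download(remote, 64, 4);
    System.out.println(Arrays.equals(remote, local));
  }
}
```

Unlike a real channel, this sketch submits every part up front and buffers the whole result in memory; it only illustrates the ordering of concurrent downloads and sequential reads.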
Atoti Data Sources internally use a concurrently fetching channel to retrieve data from the cloud. The channel can be configured with a number of parameters:
- threadCount: the maximum number of downloader threads for a single file. Our experiments have shown that the optimal number of threads downloading files in parallel is around 1.5 times the number of logical cores on the server.
- prefetchedPartsLimit: the number of parts that can be downloaded in advance. This is used to compensate for network speed instability.
- partLength: the size in bytes of an individual downloaded part. Our experiments have shown that the optimal value is around 8MB.
These parameters are grouped into a CloudFetchingConfig object:
final CloudFetchingConfig cloudConfig =
    CloudFetchingConfig.builder()
        .downloadThreadCount(threadCount)
        .prefetchedPartsLimit(prefetchedPartsLimit)
        .partLength(partLength)
        .build();
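As a rough illustration of the rules of thumb given for threadCount and partLength, the following sketch derives starting values from the number of logical cores. The class FetchingParameterSketch is hypothetical and not part of Atoti, and reading "8MB" as 8 * 1024 * 1024 bytes is an assumption; the resulting numbers are only a starting point to be validated by measurement.

```java
/** Hypothetical helper, not part of Atoti: turns the documented rules of thumb into numbers. */
final class FetchingParameterSketch {

  /** Assumption: "8MB" is read here as 8 * 1024 * 1024 bytes. */
  static final int PART_LENGTH = 8 * 1024 * 1024;

  /** Roughly 1.5 times the number of logical cores, rounded up. */
  static int suggestedThreadCount(int logicalCores) {
    return (int) Math.ceil(1.5 * logicalCores);
  }

  /** Number of parts needed to cover a file of the given length. */
  static long partCount(long fileLength, int partLength) {
    return (fileLength + partLength - 1) / partLength;
  }

  public static void main(String[] args) {
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("threadCount = " + suggestedThreadCount(cores));
    System.out.println("parts for a 1 GiB file = " + partCount(1L << 30, PART_LENGTH));
  }
}
```

The part count gives a sense of how many parts a single large file is split into, and therefore how many of them the downloader threads can work on at once.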
Usage with the CSV Source
Cloud-based implementations of CSV topics are available through CloudEntityCsvTopic<EntityType> and CloudDirectoryCsvTopic<EntityType>.
They are used similarly to regular CSV topics, except they require one additional argument of type ICloudCsvDataProviderFactory. It is a factory that provides the implementation to use for downloading. The appropriate implementation should be selected based on your cloud provider.
One default implementation is made available for each cloud provider. These implementations accept a CloudFetchingConfig that they can use in the underlying concurrently fetching channel.
A CSV topic can be created either for a single entity:
final CsvParserConfiguration parserConfig =
    CsvParserConfiguration.builder().withColumnCount(COLUMN_COUNT).build();
final CloudFetchingConfig cloudConfig =
    CloudFetchingConfig.builder()
        .downloadThreadCount(threadCount)
        .prefetchedPartsLimit(prefetchedPartsLimit)
        .partLength(partLength)
        .build();
final ICloudCsvDataProviderFactory<S3Entity> factory =
    new AwsCsvDataProviderFactory(cloudConfig);
final ICloudEntityPath<S3Entity> entityPath = someEntity;
final ICsvTopic<ICloudEntityPath<S3Entity>> entityTopic =
    new CloudEntityCsvTopic<>("entityTopic", parserConfig, factory, entityPath);
or for multiple entities (for a dataset split across several files). In this case, the files can either be listed explicitly:
final Set<ICloudEntityPath<S3Entity>> cloudPaths = Set.of(someEntity1, someEntity2);
final ICsvTopic<ICloudEntityPath<S3Entity>> multipleEntitiesTopic =
    new CloudMultipleEntitiesCsvTopic<>("entityTopic", parserConfig, factory, cloudPaths);
or can be defined by specifying a whole directory, in which case files to consider can be filtered using a regular expression:
final ICloudDirectory<S3Entity> directory = someDirectory;
final ICsvTopic<ICloudEntityPath<S3Entity>> directoryTopic =
    new CloudDirectoryCsvTopic<>(
        "directoryTopic", parserConfig, factory, directory, "(R|r)egex");
Usage with the Parquet Source
The IParquetParser interface exposes special overloads of the IParquetParser.parse() method to retrieve and parse Parquet data from the cloud.
Compared to the regular parsing methods, they require one additional Function argument, which should act as a factory for AzureBlobChannel, S3ObjectChannel or GoogleBlobChannel, depending on the relevant cloud provider.
Parquet data can either be fetched from a single entity:
final CloudFetchingConfig cloudConfig =
    CloudFetchingConfig.builder()
        .downloadThreadCount(threadCount)
        .prefetchedPartsLimit(prefetchedPartsLimit)
        .partLength(partLength)
        .build();
final ICloudEntityPath<EntityType> entityPath = someEntity;
datastore.edit(
    transaction ->
        parser.parse(
            entityPath,
            path -> new ConcurrentlyFetchingChannelImplementation(path, cloudConfig),
            parsers,
            mapping));
or from a whole directory:
final ICloudDirectory<EntityType> directory = someDirectory;
datastore.edit(
    transaction ->
        parser.parse(
            directory,
            path -> new ConcurrentlyFetchingChannelImplementation(path, cloudConfig),
            parsers,
            mapping));
What impacts the download speed?
- The power of the cores. HTTPS connections and client-side encryption consume CPU resources.
- The bandwidth of the network interface.
- Other CPU-hungry tasks performed by your application while downloading the data.
- Having a multitude of small files instead of big files (because the AConcurrentlyFetchingChannel is tailored for a high-performance multithreaded download of a single file).
- The type of the storage: Hot or Cold (do not use cold storage!).
- On Azure, the redundancy level has an impact: locally redundant storage (LRS) is faster than zone-redundant storage (ZRS), which is faster than geo-redundant storage (GRS).
- Whether the Atoti application and your data are in the same cloud region.
- Receive Side Scaling (RSS) must be enabled in the network driver. (This now appears to be enabled by default on all cloud providers.)