Cloud Source
Introduction
ActivePivot's CSV and Parquet Sources can be interfaced with a Cloud Source, which allows these sources to extract their data directly from a cloud storage provider.
The Cloud Source provides a thin abstraction layer for common cloud providers that is described in this page.
Supported cloud providers for the Cloud Source are:
These storages share interesting characteristics:
- Very good performance
- Unlimited storage capacity
- Affordable (~$0.3 per GB/year)
- Free upload
You can deploy an ActivePivot instance in the cloud that will download a huge amount of data in a few minutes (see Azure's blog post).
Be careful to deploy your code and your data in the same region otherwise you will have extra fees when the CSV Source will load data.
Cloud Source Abstractions
All cloud providers organize their data in a structure that can be assimilated with directories and entities (which can be associated with what a file is in a traditional local file system).
The abstractions used by the cloud source refer to these two concepts for all cloud sources. Specific information on usage and on what exactly these terms refer to can be found in the appropriate cloud provider page.
For each of the following interfaces, additional methods are usually available when using cloud-provider-specific implementors instead. Refer to the appropriate cloud provider page for more detail.
ICloudDirectory
This interface represents a directory in the cloud storage. The directory
structure is emulated by using the /
character.
It exposes methods for listing all contained subdirectories and entities. Assuming the following folder structure:
📦someDirectory
┣ 📂subdirectory1
┃ ┗ 📜entity2.csv
┣ 📂subdirectory2
┃ ┃ 📜entity3.csv
┃ ┗ 📂subdirectory3
┗ 📜entity1.csv
The listing methods will return the following objects:
ICloudDirectory<EntityType> directory = someDirectory;
List<ICloudDirectory<EntityType>> subdirectories = directory.listDirectories();
List<ICloudEntityPath<EntityType>> entities = directory.listEntities(false);
List<ICloudEntityPath<EntityType>> allEntities = directory.listEntities(true);
subdirectories
will contain subdirectory1 and subdirectory2entities
will contain entity1.txtallEntities
will contain entity1.txt, entity2.txt and entity3.txt
An entity or subdirectory can also be retrieved directly from a parent directory.
ICloudDirectory<EntityType> directory = someDirectory;
ICloudDirectory<EntityType> subdirectory1 =
someDirectory.getSubDirectory("subdirectory1");
ICloudDirectory<EntityType> subdirectory3 =
someDirectory.getSubDirectory("subdirectory2/subdirectory3");
ICloudEntityPath<EntityType> entity1 =
someDirectory.getEntity("entity1.csv");
ICloudEntityPath<EntityType> entity2 =
someDirectory.getEntity("subdirectory1/entity2.csv");
ICloudEntity
This interface is associated with a unit of data on the cloud storage, much like a file in a traditional local file system. Instances can be created without performing remote calls.
It serves as a facade to an entity on a cloud storage. It provides various methods to retrieve an entity's metadata and content:
ICloudEntity<EntityType> entity = someEntity;
String entityName = entity.getName();
String parentBucket = entity.getBucket();
long size = entity.getLength();
Date lastModified = entity.getLastModified();
byte[] contents = entity.download();
// ...
entity.delete();
The underlying cloud-provider-specific entity object can also be retrieved for more complex operations. The actual type of the returned object depends on the cloud provider.
EntityType underlyingEntity = entity.getUnderlying();
ICloudEntityPath
This interface holds a reference to an ICloudEntity
object.
Much like ICloudEntity
it provides various methods to retrieve the entity's
metadata.
ICloudEntityPath<EntityType> entityPath = someDirectory.getEntity("example.csv");
if (entityPath.exists()) {
String name = entityPath.getName();
String parentBucket = entityPath.getBucket();
Date lastModified = entityPath.getLastModifiedTime();
}
ICloudEntity<EntityType> entity = entityPath.toEntity();
// ...
entityPath.delete();
It additionally exposes methods to upload content to the referenced entity,
which is not required to already exist. In this case, an actual entity is
created on the cloud storage using the information held by the
ICloudEntityPath
.
entityPath.upload("data".getBytes());
entityPath.upload(Path.of("path", "to", "file"));
entityPath.upload(csvInputStream(), contentLength);
Warning: if the entity already exists at the referenced path, its contents will be overwritten.
Usage with Data Sources
Concurrently Fetching Channel
In order to download data from the cloud, ActivePivot uses
AConcurrentlyFetchingChannel
, a custom high-throughput implementation
SeekableByteChannel
specialized for downloading data from the cloud. The class is specialized for
each supported cloud provider.
This channel implementation downloads the file in multiple parts concurrently, and does not block which means data from downloaded parts can be read while the others finish downloading.
ActivePivot Data Sources internally use a concurrently fetching channel to retrieve data from the cloud. The channel can be configured with a number of parameters:
threadCount
: the maximum number of downloader threads for one single file. Our experiments have shown that the optimal number of threads downloading files in parallel is around 1.5 times the number of the server's logical cores.prefetchedPartsLimit
: the amount of chunks that can be downloaded in advance. This is used to compensate network speed instability.partLength
: the size in bytes of an individual downloaded part. Our experiments have shown that the optimal value is around 8MB.
These parameters are regrouped into a CloudFetchingConfig
object:
CloudFetchingConfig cloudConfig =
new CloudFetchingConfig(threadCount, prefetchedPartsLimit, partLength);
Usage with the CSV Source
Cloud-based implementations of CSV topics are available through
CloudEntityCSVTopic<EntityType>
and CloudDirectoryCSVTopic<EntityType>
.
They are used similarly to regular CSV topics, except they require one
additional argument of type ICloudCsvDataProviderFactory
. It is a factory that
provides the implementation to use for downloading. The appropriate
implementation should be chosen according to your cloud provider.
There is one default implementation available for each cloud provider, that
accept a CloudFetchingConfig
to be used by
the underlying concurrently fetching channel.
A CSV topic can be created either for a single entity:
CSVParserConfiguration parserConfig = new CSVParserConfiguration();
CloudFetchingConfig cloudConfig =
new CloudFetchingConfig(threadCount, prefetchedPartsLimit, partLength);
ICloudCsvDataProviderFactory<S3Object> factory =
new AwsCsvDataProviderFactory(cloudConfig);
ICloudEntityPath<S3Object> entityPath = someEntity;
ICSVTopic<ICloudEntityPath<S3Object>> entityTopic =
new CloudEntityCSVTopic<>("entityTopic", parserConfig, factory, entityPath);
or for a whole directory, in which case files to consider can be filtered using a regex:
ICloudDirectory<S3Object> directory = someDirectory;
ICSVTopic<ICloudEntityPath<S3Object>> directoryTopic =
new CloudDirectoryCSVTopic<>(
"directoryTopic", parserConfig, factory, directory, "(R|r)egex");
Usage with the Parquet Source
The IParquetParser
interface exposes special overloads of the
IParquetParser.parse()
method to retrieve and parse parquet data from the
cloud.
Compared to the regular parsing methods, they require one additional Function
argument, that should act as a factory of AConcurrentlyFetchingChannel
.
Parquet data can either be fetched from a single entity:
CloudFetchingConfig cloudConfig =
new CloudFetchingConfig(threadCount, prefetchedPartsLimit, partLength);
ICloudEntityPath<EntityType> entityPath = someEntity;
datastore.edit(transaction -> parser.parse(
entityPath,
path -> new ConcurrentlyFetchingChannelImplementation(path, cloudConfig),
parsers,
mapping));
or for a whole directory:
ICloudDirectory<EntityType> directory = someDirectory;
datastore.edit(transaction -> parser.parse(
directory,
path -> new ConcurrentlyFetchingChannelImplementation(path, cloudConfig),
parsers,
mapping));
What impacts the download speed?
- The power of the cores. HTTPS connections and client side-encryption consume CPU resources.
- The bandwidth of the network interface.
- Other CPU-hungry tasks performed by your application while downloading the data.
- Having a multitude of small files instead of big files (Due to the fact that
the
AConcurrentlyFetchingChannel
is tailored for a high-performance multithreaded download of a single file). - The type of the storage: Hot or Cold (Do not use cold storage!).
- For Azure the redundancy has an impact. Locally redundant storage (LRS) is better (for the speed) than Zone-redundant storage (ZRS) which is better than Geo-redundant storage (GRS)
- ActivePivot and your data must be in the same cloud region.
- Receive Side Scaling (RSS) must be enabled in the network driver. (Seems to be enabled by default now on all cloud providers)