Parquet Source
Introduction
Parquet is a compact columnar file format, maintained by Apache, that comes with a built-in compression mecanism. It is the standard output of ETL pipelines when using Spark.
Read more information in the official website of Parquet.
ActivePivot is capable of consuming Parquet files, located on a local file system, in a Hadoop cluster and any of the supported Cloud Providers.
Installation
The Parquet Source is not released with the rest of ActivePivot artifacts, due to the very large number of dependencies it requires - hadoop ecosystem including zookeeper, hadoop, avro, parquet, etc.
A separate bundle parquet-source-x.y.z.zip contains the jars for this Source without all the external dependencies.
If the project is running on a Windows server, winutils
must be additionally installed. See Hadoop wiki for more information about this required dependency.
Using the Parquet Source
Because the pattern for other sources involving Topics, Channels and Publishers was confusing for developers, the Parquet Source is designed with a more straighforward approach. It is built with the Datastore it will feed and has only one entry method to parse a file or a directory.
This can always be composed with existing Topics to gain listen capabilities if needed.
datastore.getTransactionManager().startTransaction("store");
new ParquetParserBuilder(datastore)
.build()
.parse("/path/to/my/parquet/file.pq",
IStoreToParquetMapping.builder()
.onStore("store")
.feedStoreField("datastore1")
.withParquetField("parquet1")
.feedStoreField("datastore2")
.withColumnCalculator()
.fromSingleParquetField("parquet2")
.mapInt(v -> 2 * v)
.build());
datastore.getTransactionManager().commitTransaction();
This example uses a single thread to parse a single thread to parse one Parquet file located on the local File System.
The mapping between the file to parse and target Datastore Store is defined by the IStoreToParquetMapping
. In this example, we manually map the names of the Store fields and the Parquet fields. Additionally, we apply a basic transformation to the Parquet value for its field parquet2.
Loading files and directories
The Parquet Source transparently loads files and directories. When the provided path points to a directory, the Source recursively scans the directory and loads every file inside.
It is standard for ETL processes to produce .SUCCESS files, acting as markers of the process statuses. The Parquet Source automatically ignores such files.
It is also common for those processes to generate CRC files, to check for data corruption. The Parquet Source tries to use these as much as possible while reading each file.
An additional regex Pattern
can be passed to the parser to filter a subset of files to parse within the traversed directories,
as described under the filtering section below.
Usage and examples
Mapping a Parquet file to the Store to feed
By default, fields from the Parquet resource that are not mapped to the Datastore Store are ignored. However, in some cases, it is possible for the Datastore and the Parquet resource to have fields with the same case-insensitive name. As auto-completion is enabled by default when using the mapping builder, this field is automatically configured and will be filled.
To disable this, one must explicitly mark the field as fed from a null
Parquet field.
In our example, we consider we have a Parquet file of deliveries with the following fields:
- ID: identifier of a package
- city: a city where a transaction occurred
- location: a pair of (longitude, latitude) values pinning the exact place for the delivery
- package: dimensions of the package to deliver, as
<height>x<width>x<depth>
in centimeters
We want to feed a store containing the following columns:
- id: the same package identifier
- location: the same location, stored as an object
- city
- height
- width
- depth
Here, id and location fields are the same. City holds the sensibly same information, but for the application, we want to use a more generalized Map source, ignoring the distinction between a city and its suburbs. Therefore, the field city is filled by an automated trigger on each transaction from the location field. Finally, the package dimensions explode into 3 fields, for analytical purposes.
IStoreToParquetMapping mapping = new StoreToParquetMappingBuilder()
.onStore("delivery")
.feedStoreField("city").withParquetField(null)
.feedStoreField("height")
.withColumnCalculator().fromSingleParquetField("dimensions")
.map((String dims) -> Integer.parseInt(dims.split("x")[0]))
// ... width and depth ignored for conciseness
.build();
We use the auto-complete capability of the builder for id and location, as it works ignoring the case of the fields. Thus, we disable the feeding for city.
Three basic column calculators transform the dimensions into integer values.
Using sampling on the input data
As is the case for CSV files, it is possible for this source to sample the input Parquet files.
Sampling is implemented through the interface IParquetReaderPolicy
. The Parquet Source provides one basic implementation to load a limited number of files, or a limited number of records.
new ParquetParserBuilder(datastore)
.withNumberOfThreads(4)
.withPolicy(IParquetReaderPolicy.linePolicy(1_000, 10))
.build()
.parse("path/to/the/directory",
mapping);
The previous snippet illustrates how to load 1000 records from the input files. In addition to limiting the number of records, we add a step between each accepted record to capture one entry every 10 read entries. Because one file may not contain the 10,000 records alone, multiple files will be considered by this policy. Finally, as we are using 4 threads for parsing, the sampling can span over at least 4 files.
new ParquetParserBuilder(datastore)
.withNumberOfThreads(4)
.withPolicy(IParquetReaderPolicy.filePolicy(3))
.build()
.parse("path/to/the/directory",
mapping);
In the snippet above, we only load 3 files from the source directory. As the listing of the directory content is an undefined process, there are no guarantee that the same files are loaded at each invocation, nor it is possible to anticipate which files are loaded.
Note that the sampling policy is defined in the constructor of the ParquetParser
. Thus, the policy is applied across consecutive parsings of files. It is entirely possible to implement a custom resettable policy and safely call the reset method between parsing. This is not a feature that the available policy provides, as it is equivalently easy to instantiate a new Parser if needed.
Transforming Parquet content before Datastore insertion
Though it is highly recommended avoiding ETL transformations into ActivePivot to improve the loading time, the Parquet Source has basic transformation capabilities in the form of Column Calculators.
Basic calculators accept standard Java Function
to transform the Parquet values. For the best of performance, these transformations also exist on primitives directly, to avoid unnecessary boxing.
IStoreToParquetMapping mapping = new StoreToParquetMappingBuilder()
.onStore("store")
.feedStoreField("height")
.withColumnCalculator()
.fromSingleParquetField("height")
.mapDouble(v -> v / 100) // Scale values from centimeters to meters
.build();
As it is often necessary to read several fields to produce a single value for the store, calculators can also be built on top of the Parquet Schema.
IStoreToParquetMapping mapping = new StoreToParquetMappingBuilder()
.onStore("store")
.feedStoreField("datastore_field3")
.withColumnCalculator()
.mapToInt(schema -> {
int index4 = IStoreToParquetMapping.getIndexOfFieldNamed("parquet_field_4", schema);
int index5 = IStoreToParquetMapping.getIndexOfFieldNamed("parquet_field_5", schema);
return (record, defaultValue) -> record.getInt(index4) + record.getInt(index5);
})
.build();
This also serves as a solution to transform Object values into primitives without boxing. However, this case is best covered using a custom parser, as detailed in this section.
To coexist with the CSV Source, it is possible to pass the same IColumnCalculator
as Calculators for this Source. This has the benefit of offering dependencies between columns, at the cost of boxing for primitive types.
In the snippet below, we reuse the computed value for datastore_field3 to compute datastore_field4.
IStoreToParquetMapping mapping = new StoreToParquetMappingBuilder()
.onStore("store")
.feedStoreField("datastore_field3")
.withLegacyColumnCalculator(new AColumnCalculator<IParquetRecord>("datastore_field3") {
@Override
public Object compute(IColumnCalculationContext<IParquetRecord> context) {
final int parquetValue = ((Number) context.getContext().get("parquet_field_2")).intValue();
return parquetValue * 2;
}
})
.feedStoreField("datastore_field4")
.withLegacyColumnCalculator(new AColumnCalculator<IParquetRecord>("datastore_field4") {
@Override
public Object compute(IColumnCalculationContext<IParquetRecord> context) {
int nestedValue = ((Number) context.getValue("datastore_field3")).intValue();
return nestedValue * 3;
}
})
.build();
Finally, it is still possible to use Insertion-Time triggers on the store, but those triggers are applied whatever the origin of the records.
Use custom parsers
It is often suited to define one's own parser to convert raw Parquet data into the right target format. Though this is similar to using calculators to transform read data - such as converting a Parquet timestamp into a local date - it is appropriate to avoid intermediate deserialized forms. This is particularly the case when working with vectors of numbers, often stored as byte-arrays and read as primitive arrays.
new ParquetParserBuilder(datastore).build().parse(
new Path("path/to/file"),
field -> {
if (field.equals("vector")) {
return (parent, fieldSchema) -> new PrimitiveConverter() {
@Override
public void addBinary(Binary value) {
int length = value.length() / Double.BYTES;
float[] floatArray = new float[length];
double[] doubleArray = new double[10];
for (int i = 0; i < length; i += 10) {
value.toByteBuffer().asDoubleBuffer()
.get(doubleArray, i, Math.min(length - i, 10));
for (int k = 0; k < 10; k++) {
floatArray[i + k] = (float) doubleArray[k];
}
}
parent.add(floatArray);
}
};
} else {
return null;
}
},
new StoreToParquetMappingBuilder().onStore("store").build());
In this example, we use a special parser to read and adapt a custom vector of double values stored in the Parquet field vector. We use a buffer of 10 values to read a small part of double values from a byte-array and truncate them to floats. Using a buffer of 10 values avoids allocating a huge byte array or the equivalent double array of the huge vectors.
Note that the Parquet Source already automatically applies its provided parsers for vectors of numbers.
Loading from Hadoop
The Parquet Source supports loading files and directories from any Hadoop cluster. This is an additional parameter of the #parse(..)
method of the Parser.
Configuration hadoopConfiguration = new Configuration();
hadoopConfiguration.addResource("config/folder/hadoop.xml");
new ParquetParserBuilder(datastore)
.withNumberOfThreads(8)
.build()
.parse(
new Path("hdfs://hadoop-node:port/path/to/parquet/directory"),
hadoopConfiguration,
null, // No extra parsers
IStoreToParquetMapping.builder().onStore("store").build());
As seen in this sample, we provided a special configuration for the cluster, and loaded an entire directory from the cluster. As for the loading of any directory, the Parser automatically ignores .SUCCESS files.
It is worth noting that the Hadoop cluster configuration is provided at loading time. Though unlikely, this makes it possible to reuse the same instance of Parser to load data from multiple clusters or to load data using different configurations.
Loading from a Cloud Provider
As we can load files from a Hadoop cluster, it is possible to load Parquet files for the Cloud Providers supported by ActivePivot.
While it is possible to use a custom Hadoop configuration to target a Cloud Hadoop cluster, ActivePivot Cloud reading is designed to offer a very-high download speed and is more suited to fill the Datastore and start applications as fast as possible.
CloudFetchingConfig config = new CloudFetchingConfig(3);
ExecutorService executorService = Executors.newCachedThreadPool(
new NamedThreadFactory("parquet-download"));
datastore.edit(tm -> {
new ParquetParserBuilder(datastore)
.withNumberOfThreads(4)
.build()
.parse(
new AzureCloudDirectory(
client,
"azure-container",
"trade-dir"),
(path) -> new AzureBlobChannel(path, config, executorService),
null, // no extra parser needed
parquetMapping);
tm.forceCommit();
});
In the example above, we load data from an Azure directory. The noticeable change is that we are now passing a AzureCloudDirectory
- a special implementation of ICloudDirectory
for Azure - instead of a String or a Hadoop Path
.
In addition, to use ActivePivot cloud readers, we must provide a factory method to create a reader for every file of the Cloud directory. In the example, we configure the readers to use 3 dedicated threads per file for downloading purposes, while processing 4 files in parallel, as per the Parser configuration.
Obviously, it is possible to download a single Parquet file from the Cloud passing a ICloudEntityPath
instead of a ICloudDirectory
.
Note that when reading from Cloud resources, .crc files are ignored.
Filtering read files when traversing a directory
When creating an IParquetParser
, an optional java.util.regex.Pattern
can be passed to the builder to control which files the parser should consider when traversing directories.
Only the files that match the pattern will be considered for parsing. The full path string needs to match against the pattern for a file to be parsed. The pattern is taken into account for local files, Hadoop file storage and cloud providers.
The Pattern
can be supplied to the parser as follows to the builder:
final Pattern endsWithNumber = Pattern.compile(".*[0-9]\\.parquet");
final IParquetParser parser = new ParquetParserBuilder(datastore)
.withPattern(endsWithNumber)
.build();
Once configured, the parser will subsequently filter all parsed directories.
For example, considering the following directory structure:
directory
|- some_subdirectory
| `- match1.parquet
|- match2.parquet
|- no_match.parquet
Calling parser.parse("directory", ...)
with the parser and pattern defined above will only
consider the following files:
Matched:
directory/some_subdirectory/match1.parquet
directory/match2.parquet
Ignored:
directory/no_match.parquet
Loading compressed data
The Parquet Source supports all the compression formats that parquet-mr does. This includes Snappy, LZ4, BZIP2, GZIP. You can find the complete list in Parquet Javadoc.
The files to read must be regular Parquet files, produced by systems known for their support of the Parquet format, like Spark, pandas, etc. The Parquet Source will automatically detect the compression format and read the files appropriately.
Configuration and performance considerations
The ParquetParserBuilder
used to generate a Parquet parser offers several configuration settings that can impact performance.
Number of Threads
builder.withNumberOfThreads()
affects the number of threads that will be used to process the parsing tasks, which are generated on a per-file basis.
builder.withExecutorService()
can be used to provide a custom ExecutorService
to
handle the dispatching of parsing tasks. This setting is incompatible with
withNumberOfThreads()
and will supersede whatever might have been set.
Batched or non-batched parser implementations
Two implementations are available for the Parquet file parser: a batched implementation and a non-batched implementation, the batched implementation being the default one.
The batched parser accesses memory in a more linear pattern, resulting in better overall performance when compared to the non-batched implementation, especially for larger files with many columns.
Instead of accessing each record one by one, it retrieves whole batches of data from the file, column by column, which heavily profits from the columnar structure of the Parquet format.
The internal batch size of the parser can be configured using builder.withBatchSize()
,
which corresponds to the number of records of the Parquet file being retrieved within each batch.
It should be chosen as large as memory constraints allow, ideally matching the maximum column chunk size within the files that are going to be read. Increases to batch sizes past this point do not provide performance benefits while increasing memory overhead of the parser.
Depending on the performance overhead the decoding of the file's data incurs, the batched configuration can prove to be 1.3 to 2 times faster than the non-batched one, which is why it is the recommended configuration.
The batched implementation is known to have the following limitations:
- no support for complex types other than vectors
- supported data encodings
are
PLAIN
,PLAIN_DICTIONARY
andRLE_DICTIONARY
If needed, these limitations can be bypassed by using the non-batched implementation.
It can be accessed by configuring the parser builder using builder.nonBatched()
.
This implementation will circumvent the above limitations and should be compatible with a larger number of parquet files, at the cost of some performance.
Monitoring Parquet Source
The Parquet Source contributes to the Health Event Monitoring, under the tags parquet and source.
The following snippet adds the basic implementation of the listener to the handler stack.
HealthEventDispatcher.INSTANCE.addEventHandler(
new LoggingParquetHealthHandler());
This uses the default logger to report all Parquet operations. By default, there is no filtering on the received events.