Parquet
Parquet Source Configuration
The Parquet Source loads data from Parquet files into the datastore.
Maven Dependency
First, make sure your project imports the correct dependency. Your pom.xml should contain:
```xml
<!-- For Parquet Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-parquet</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>
```
Source Configuration
Build an instance of ParquetSource to load Parquet columnar data.
The example below loads a Parquet file with two columns, Parquet_TradeID_Column and Parquet_ProductID_Column:
```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
// Also import ParquetSource, IParquetTopic, ParquetTopic, FilesScopedFetchTopic,
// TupleMessageChannelFactory and IDatastoreConfig from the Data Connectors / ActivePivot jars.

// The members below belong to a Spring @Configuration class.

@Autowired
private IDatastoreConfig datastoreConfig;

/**
 * The source that handles all Parquet data loading for this project.
 * Note: the store fields in sourceStoreMapping must belong to the target store passed
 * to the topic (here "BaseStore", the store used for testing).
 */
@Bean
public ParquetSource<?> parquetSource() {
    // Glob (not regex) file path matcher
    final PathMatcher filePathMatcher = FileSystems.getDefault().getPathMatcher("glob:**/example_file.parquet");
    // Source used for the initial load, bound to the datastore
    final ParquetSource<?> parquetSource = new ParquetSource<>("My_Parquet_Source", this.datastoreConfig.datastore());
    // Parquet column name -> store field name
    final HashMap<String, String> sourceStoreMapping = new HashMap<>();
    sourceStoreMapping.put("Parquet_TradeID_Column", "TradeID");
    sourceStoreMapping.put("Parquet_ProductID_Column", "ProductID");
    // Topic parameters
    final String topicName = "TradeProduct_Parquet_Topic";
    final Path baseDir = Paths.get("./path/to/base/directory");
    final IParquetTopic datasetTopic = new ParquetTopic<>(
            topicName,
            baseDir.toString(),                                   // root directory path
            filePathMatcher,                                      // path matcher
            new FilesScopedFetchTopic.FetchScopeToFileScanParametersConverter(),
            MyParquetFileTask.PLUGIN_KEY,                         // file task plugin key
            null,                                                 // extraProperties
            sourceStoreMapping,                                   // source store mapping
            new ArrayList<>(sourceStoreMapping.keySet()),         // source fields (Parquet column names)
            baseDir,                                              // base directory path
            "BaseStore");                                         // target store name
    parquetSource.addTopic(datasetTopic);
    return parquetSource;
}

@Bean
public TupleMessageChannelFactory parquetMessageChannelFactory() {
    // In a @Configuration class, this call returns the parquetSource bean rather than a new instance
    return new TupleMessageChannelFactory(parquetSource(), datastoreConfig.datastore());
}
```
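The pathMatcher in the example is a glob (note the `glob:` prefix), not a regular expression. The standalone sketch below uses only the JDK's `java.nio.file.PathMatcher` to show which path strings that glob accepts. The class and method names are hypothetical, and the sketch does not show how ParquetSource itself applies the matcher (for example, to relative or absolute paths); treat that as an assumption to verify against the connector.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobMatcherDemo {

    // Same glob as in the parquetSource() bean.
    static final PathMatcher MATCHER =
            FileSystems.getDefault().getPathMatcher("glob:**/example_file.parquet");

    /** Returns true if the given path string matches the glob. */
    static boolean matches(String path) {
        return MATCHER.matches(Paths.get(path));
    }

    public static void main(String[] args) {
        String[] candidates = {
            "./path/to/base/directory/example_file.parquet", // nested file: matches
            "data/2024/example_file.parquet",                // any depth below a directory: matches
            "example_file.parquet",                          // bare file name, no separator before it: no match
            "data/other_file.parquet"                        // different file name: no match
        };
        for (String candidate : candidates) {
            System.out.println(candidate + " -> " + matches(candidate));
        }
    }
}
```

The third case is the one most likely to surprise: `**/` requires at least one directory separator before the file name, so a bare `example_file.parquet` path does not match.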
Topic Configuration
A ParquetSource consists of one or more Topics of type ParquetTopic. Each Topic name must be globally unique across all Topics. A ParquetTopic maps Parquet column names to datastore fields and is defined for a single target store. A ParquetTopic takes the following parameters:
Topic Parameter | Description |
---|---|
name | Name of this topic. Must be globally unique among all Topics |
rootDirectoryPath | Base directory to which pathMatcher is applied |
pathMatcher | Implementation of the java.nio.file.PathMatcher interface that selects which files under rootDirectoryPath to load |
fetchScopeToFileScanParametersConverter | Function that converts fetch scope keys into an IFileScanParameters describing which files to scan and load. The default FilesScopedFetchTopic.FetchScopeToFileScanParametersConverter class can be used |
fileTaskPluginKey | Plugin key of your extension of the abstract AParquetFileTask class, which must implement doAppendToChunk(). See Parquet File Task below |
extraProperties | Optional extra properties; can be null |
sourceStoreMapping | Mapping of Parquet column names to ActivePivot store field names |
sourceField | List of the Parquet column names (column headers) in the file; the example passes the keys of sourceStoreMapping |
path | The base directory as a Path object |
storeName | Name of the target store |
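Because each Topic name must be unique across all Topics, a project that registers many topics may want to fail fast at configuration time. The sketch below is a hypothetical helper, not part of the connector API: it only checks a list of names with a `HashSet` before you call `addTopic`, and makes no assumption about how the connector itself reports duplicates.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TopicNameCheck {

    /**
     * Hypothetical helper: throws IllegalStateException if any topic name
     * appears more than once in the given list.
     */
    static void requireUniqueNames(List<String> topicNames) {
        Set<String> seen = new HashSet<>();
        for (String name : topicNames) {
            // Set.add returns false when the name was already present.
            if (!seen.add(name)) {
                throw new IllegalStateException("Duplicate topic name: " + name);
            }
        }
    }

    public static void main(String[] args) {
        requireUniqueNames(List.of("TradeProduct_Parquet_Topic", "Risk_Parquet_Topic"));
        System.out.println("All topic names are unique");
    }
}
```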
The ParquetTopic constructor signature:
```java
public ParquetTopic(
        final String name,
        final String rootDirectoryPath,
        final PathMatcher pathMatcher,
        final IScopeToFileScanParametersConverter fetchScopeToFileScanParametersConverter,
        final String fileTaskPluginKey,
        final Properties extraProperties,
        HashMap<String, String> sourceStoreMapping,
        List<I> sourceField,
        I path,
        String storeName)
```
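The example derives sourceField from `sourceStoreMapping.keySet()`. A plain `HashMap` iterates its keys in an unspecified order, so that list may not follow the order in which the columns were put. If your file task depends on the column order of sourceField (an assumption; check the connector documentation), one option is a `LinkedHashMap`: it extends `HashMap`, so it still fits the `HashMap<String, String>` parameter, and its `keySet()` iterates in insertion order. The class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;

public class SourceFieldOrder {

    /** Builds the Parquet-column-to-store-field mapping used in the example. */
    static HashMap<String, String> buildMapping() {
        // LinkedHashMap is a HashMap subclass whose keySet() follows insertion order.
        HashMap<String, String> mapping = new LinkedHashMap<>();
        mapping.put("Parquet_TradeID_Column", "TradeID");
        mapping.put("Parquet_ProductID_Column", "ProductID");
        return mapping;
    }

    /** Derives the sourceField list the same way the example does. */
    static List<String> sourceFields(HashMap<String, String> mapping) {
        return new ArrayList<>(mapping.keySet());
    }

    public static void main(String[] args) {
        HashMap<String, String> mapping = buildMapping();
        for (String column : sourceFields(mapping)) {
            System.out.println(column + " -> " + mapping.get(column));
        }
    }
}
```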
Parquet File Task
You must define an implementation of AParquetFileTask. A generic implementation simply appends each Parquet record to the chunk:
```java
@QuartetExtendedPluginValue(intf = IFileTask.class, key = MyParquetFileTask.PLUGIN_KEY)
public class MyParquetFileTask extends AParquetFileTask {

    public static final String PLUGIN_KEY = "My_Parquet_File_Task";

    /**
     * Create a Parquet file task.
     *
     * @param filePath Path of the Parquet file to process.
     * @param topic    Topic which the Parquet file belongs to.
     * @param channel  Current IMessageChannel
     */
    public MyParquetFileTask(Path filePath, IParquetTopic topic, IMessageChannel channel) {
        super(filePath, topic, channel);
    }

    @Override
    public String getType() {
        return PLUGIN_KEY;
    }

    @Override
    protected int doAppendToChunk(IMessageChunk chunk, Object[] parquetRecord) {
        chunk.append(parquetRecord);
        return 1;
    }
}
```
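The generic task appends every record unchanged. If some records should be skipped, one option is to filter inside doAppendToChunk. The standalone sketch below models that idea outside the connector, under several assumptions: `RecordChunk` and `ListChunk` are hypothetical stand-ins for IMessageChunk (only the `append(Object[])` call used above is modeled), the record's first value is assumed to be the trade ID, and the `int` return value is assumed to be the number of rows appended, consistent with the generic task returning 1 after one append. Check these against the AParquetFileTask documentation before relying on them.

```java
import java.util.ArrayList;
import java.util.List;

public class FilteringAppendSketch {

    /** Hypothetical stand-in for IMessageChunk: only the append(Object[]) call. */
    interface RecordChunk {
        void append(Object[] record);
    }

    /** In-memory chunk for trying the logic outside the connector. */
    static final class ListChunk implements RecordChunk {
        final List<Object[]> rows = new ArrayList<>();

        @Override
        public void append(Object[] record) {
            rows.add(record);
        }
    }

    /**
     * Same shape as doAppendToChunk: appends the record unless its first value
     * (assumed to be the trade ID) is missing, and returns the number of rows appended.
     */
    static int appendIfValid(RecordChunk chunk, Object[] parquetRecord) {
        if (parquetRecord.length == 0 || parquetRecord[0] == null) {
            return 0; // skip records without a trade ID
        }
        chunk.append(parquetRecord);
        return 1;
    }

    public static void main(String[] args) {
        ListChunk chunk = new ListChunk();
        int appended = 0;
        appended += appendIfValid(chunk, new Object[] {"T1", "P1"});
        appended += appendIfValid(chunk, new Object[] {null, "P2"});
        appended += appendIfValid(chunk, new Object[] {"T3", "P3"});
        System.out.println("Appended " + appended + " of 3 records");
    }
}
```

Returning 0 for a skipped record keeps the count honest if the caller aggregates the return values; whether the connector does so is part of the stated assumption.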