Parquet

Parquet Source Configuration

A source that loads data from Parquet files.

Maven Dependency

First, make sure you import the correct dependency. Your pom.xml should contain:

    <!-- For Parquet Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-parquet</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

Source Configuration

We build an instance of ParquetSource to load Parquet Columnar data.

Below is an example that loads a Parquet file with two columns, Parquet_TradeID_Column and Parquet_ProductID_Column:

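    import java.nio.file.FileSystems;
    import java.nio.file.Path;
    import java.nio.file.PathMatcher;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.HashMap;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    // Imports for ParquetSource, ParquetTopic, IParquetTopic, FilesScopedFetchTopic,
    // TupleMessageChannelFactory and IDatastoreConfig come from the Data Connectors and
    // Atoti Server libraries and are omitted here.

    // Hypothetical class name: place these beans in your own Spring @Configuration class.
    @Configuration
    public class ParquetSourceConfig {

        @Autowired
        private IDatastoreConfig datastoreConfig;

        /**
         * The source that handles all Parquet data loading for this project. Note: the Parquet file
         * to datastore mapping must target the store used in this example, BaseStore. See the
         * sourceStoreMapping and the store name passed to the topic.
         */
        @Bean
        public ParquetSource<?> parquetSource() {

            // Glob (not regex) file path matcher
            final PathMatcher filePathMatcher = FileSystems.getDefault().getPathMatcher("glob:**/example_file.parquet");

            // Initial load: create the source for the datastore
            final ParquetSource<?> parquetSource = new ParquetSource<>("My_Parquet_Source", this.datastoreConfig.datastore());

            // Source column to store field mapping
            final HashMap<String, String> sourceStoreMapping = new HashMap<>();
            sourceStoreMapping.put("Parquet_TradeID_Column", "TradeID");
            sourceStoreMapping.put("Parquet_ProductID_Column", "ProductID");

            // Topic parameters
            final String topicName = "TradeProduct_Parquet_Topic";
            final Path baseDir = Paths.get("./path/to/base/directory");

            final IParquetTopic datasetTopic = new ParquetTopic<>(
                    topicName,                     // name
                    baseDir.toString(),            // rootDirectoryPath
                    filePathMatcher,               // pathMatcher
                    new FilesScopedFetchTopic.FetchScopeToFileScanParametersConverter(),
                    MyParquetFileTask.PLUGIN_KEY,  // fileTaskPluginKey
                    null,                          // extraProperties
                    sourceStoreMapping,            // sourceStoreMapping
                    new ArrayList<>(sourceStoreMapping.keySet()), // sourceField (column headers)
                    baseDir,                       // path (base directory)
                    "BaseStore");                  // storeName (target store)

            parquetSource.addTopic(datasetTopic);

            return parquetSource;
        }

        @Bean
        public TupleMessageChannelFactory parquetMessageChannelFactory() {
            // Inside a @Configuration class, calling parquetSource() returns the singleton bean
            return new TupleMessageChannelFactory(parquetSource(), datastoreConfig.datastore());
        }
    }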
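The example derives sourceField from sourceStoreMapping.keySet(). A HashMap does not guarantee any iteration order, so the order of the columns in that list is unspecified. If your file task depends on the order of the values in each record (an assumption; this page does not say whether it does), a LinkedHashMap keeps the keys in insertion order, and because LinkedHashMap extends HashMap it still fits the HashMap<String, String> constructor parameter. The class and method names below are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical helper class illustrating how sourceField is derived from the mapping.
public class SourceFieldOrder {

    // Builds the sourceField list the same way the configuration example does:
    // from the key set of the column-to-field mapping.
    static List<String> sourceFields(HashMap<String, String> sourceStoreMapping) {
        return new ArrayList<>(sourceStoreMapping.keySet());
    }

    public static void main(String[] args) {
        // LinkedHashMap is a HashMap subclass whose key set iterates in insertion order.
        HashMap<String, String> ordered = new LinkedHashMap<>();
        ordered.put("Parquet_TradeID_Column", "TradeID");
        ordered.put("Parquet_ProductID_Column", "ProductID");

        List<String> fields = sourceFields(ordered);
        System.out.println(fields); // [Parquet_TradeID_Column, Parquet_ProductID_Column]
    }
}
```

With a plain HashMap the same call still returns both column names, just in no guaranteed order.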

Topic Configuration

A ParquetSource consists of one or more Topics of type ParquetTopic. Each Topic name must be globally unique across all Topics. A ParquetTopic defines a mapping of Parquet column names to Datastore columns and targets a single store. Each ParquetTopic takes the following parameters:

Topic Parameter                            Description
name                                       Name of this topic. Must be globally unique among all Topics.
rootDirectoryPath                          Base directory to which pathMatcher is applied.
pathMatcher                                Implementation of the PathMatcher interface that performs matching operations on paths.
fetchScopeToFileScanParametersConverter    Function that converts Scope keys into an IFileScanParameters used to scan for files to load. The default FilesScopedFetchTopic.FetchScopeToFileScanParametersConverter class can be used.
fileTaskPluginKey                          Plugin key of an extension of the abstract AParquetFileTask, which must implement doAppendToChunk(). An example is given below.
extraProperties                            Extra properties. Can be null.
sourceStoreMapping                         Mapping of Parquet column names to Atoti Server store column names.
sourceField                                List of all column headers in the Parquet file.
path                                       The base directory as a Path object.
storeName                                  Name of the target store.
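rootDirectoryPath and pathMatcher work together to select the files a topic loads. The sketch below only exercises standard JDK glob semantics with the pattern from the configuration example; whether the topic hands the matcher absolute paths or paths relative to rootDirectoryPath is not stated here, so check against the path form your setup actually produces. GlobMatcherCheck is a hypothetical name:

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

// Hypothetical class: checks which paths the example's glob pattern accepts.
public class GlobMatcherCheck {

    // Same glob as in the source configuration example.
    static final PathMatcher MATCHER =
            FileSystems.getDefault().getPathMatcher("glob:**/example_file.parquet");

    static boolean matches(Path path) {
        return MATCHER.matches(path);
    }

    public static void main(String[] args) {
        // "**" crosses directory boundaries, so any number of parent directories matches.
        System.out.println(matches(Paths.get("data", "2024", "example_file.parquet"))); // true
        // The separator in the pattern requires at least one parent directory.
        System.out.println(matches(Paths.get("example_file.parquet")));                  // false
        // The file name itself must match exactly.
        System.out.println(matches(Paths.get("data", "other_file.parquet")));            // false
    }
}
```

Note that a bare file name does not match "**/example_file.parquet"; a pattern such as "glob:{example_file.parquet,**/example_file.parquet}" would accept both forms.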

The ParquetTopic constructor signature:

    public ParquetTopic(
        final String name,
        final String rootDirectoryPath,
        final PathMatcher pathMatcher,
        final IScopeToFileScanParametersConverter fetchScopeToFileScanParametersConverter,
        final String fileTaskPluginKey,
        final Properties extraProperties,
        HashMap<String, String> sourceStoreMapping,
        List<I> sourceField,
        I path,
        String storeName)

Parquet File Task

You must define an implementation of AParquetFileTask. A generic implementation simply appends each Parquet record to the chunk, as follows:

    import java.nio.file.Path;

    // Imports for QuartetExtendedPluginValue, IFileTask, AParquetFileTask, IParquetTopic,
    // IMessageChannel and IMessageChunk come from the ActiveViam libraries and are omitted here.

    @QuartetExtendedPluginValue(intf = IFileTask.class, key = MyParquetFileTask.PLUGIN_KEY)
    public class MyParquetFileTask extends AParquetFileTask {

        // Plugin key referenced by the topic's fileTaskPluginKey parameter
        public static final String PLUGIN_KEY = "My_Parquet_File_Task";

        /**
         * Create a Parquet file task.
         *
         * @param filePath Path of the Parquet file to process.
         * @param topic    Topic which the Parquet file belongs to.
         * @param channel  Current IMessageChannel.
         */
        public MyParquetFileTask(Path filePath, IParquetTopic topic, IMessageChannel channel) {
            super(filePath, topic, channel);
        }

        @Override
        public String getType() {
            return PLUGIN_KEY;
        }

        @Override
        protected int doAppendToChunk(IMessageChunk chunk, Object[] parquetRecord) {
            // Append the Parquet record to the chunk as-is
            chunk.append(parquetRecord);
            return 1;
        }
    }
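A custom task can also filter or reshape records before appending them. The real IMessageChunk is not available without the library, so the sketch below models only the append(Object[]) call with a hypothetical Chunk interface. It also assumes, inferred from the generic example's return 1 rather than confirmed by this page, that doAppendToChunk returns the number of records appended. It skips records whose trade ID is null:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a filtering doAppendToChunk; not the library API.
public class FilteringAppendSketch {

    // Hypothetical stand-in for IMessageChunk: only append(Object[]) is modelled.
    interface Chunk {
        void append(Object[] tuple);
    }

    // Same shape as doAppendToChunk: append the record only when its trade ID is present,
    // returning the number of records appended (assumed semantics).
    static int appendIfTradeIdPresent(Chunk chunk, Object[] parquetRecord, int tradeIdIndex) {
        if (parquetRecord[tradeIdIndex] == null) {
            return 0; // skip records without a trade ID
        }
        chunk.append(parquetRecord);
        return 1;
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        Chunk chunk = rows::add; // collects appended records in a list

        int appended = 0;
        appended += appendIfTradeIdPresent(chunk, new Object[] {"T1", "P1"}, 0);
        appended += appendIfTradeIdPresent(chunk, new Object[] {null, "P2"}, 0);

        System.out.println(appended + " appended, chunk size " + rows.size()); // 1 appended, chunk size 1
    }
}
```

Which index holds the trade ID depends on how the record's values are ordered, so it is passed in rather than hard-coded.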