Customize the CSV Source

This section shows how to customize the ICSVSourceConfig bean by creating a CustomCSVSourceConfig class that either implements ICSVSourceConfig directly or extends a class that already implements it. For more information on source configurations, see the Atoti Server and Data Connectors documentation.

ICSVSourceConfig interface

The interface contains methods that create specific components necessary to parse and load data. These components include:

  • the Scoped Source: object that fetches files from the file system or listens for changes to them.
  • the Message Channel Factory: factory object to register IStoreMessageChannel objects to the CSV Source.
  • the Directory Topic: object used to scan the file system for specific CSV files.

This interface also provides methods to create specific types of Scoped Source objects, namely a Scoped Fetch Source and a Scoped Listen Source. A Scoped Fetch Source suits loading a group of CSV files at once, whereas a Scoped Listen Source loads changes to CSV files in real time.
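The difference between the two loading styles can be illustrated with a plain-JDK analogy. This is only a sketch: it does not use the Atoti API, and the class and method names (FetchVersusListenSketch, fetchCsvFiles, awaitChange) are hypothetical. The "fetch" method takes a one-off snapshot of the CSV files in a directory, while the "listen" method waits for a change notification.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-JDK analogy for illustration only; none of these names come from Atoti.
class FetchVersusListenSketch {

    /** "Fetch" style: snapshot the CSV files that exist in the directory right now. */
    static List<Path> fetchCsvFiles(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files
                    .filter(p -> p.getFileName().toString().endsWith(".csv"))
                    .sorted() // natural Path ordering, so the result is deterministic
                    .collect(Collectors.toList());
        }
    }

    /** "Listen" style: wait until a file is created or modified, or the timeout elapses. */
    static boolean awaitChange(Path dir, long timeoutMillis)
            throws IOException, InterruptedException {
        try (WatchService watcher = dir.getFileSystem().newWatchService()) {
            dir.register(
                    watcher,
                    StandardWatchEventKinds.ENTRY_CREATE,
                    StandardWatchEventKinds.ENTRY_MODIFY);
            WatchKey key = watcher.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            return key != null; // true only if a change was reported in time
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("csv-demo");
        Files.writeString(dir.resolve("b.csv"), "id|value\n2|20\n");
        Files.writeString(dir.resolve("a.csv"), "id|value\n1|10\n");
        Files.writeString(dir.resolve("notes.txt"), "not a CSV file");

        // One-off load: only the two .csv files are returned.
        System.out.println(fetchCsvFiles(dir));

        // Real-time style: nothing changes here, so this simply waits for the timeout.
        System.out.println(awaitChange(dir, 200));
    }
}
```

In the real connector, this choice corresponds to picking csvSource (fetch) or csvListenSource (listen) from the interface shown below.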

/**
 * Interface to indicate that implementations of this interface are responsible for loading data
 * of type {@code I}.
 *
 * @param <I> the type handled by the CSV source (for example {@code Path})
 * @author ActiveViam
 */
public interface ICSVSourceConfig<I> {

    /**
     * Directory Topics are created and added to the Scoped Source object.
     *
     * @return - the Scoped Source object.
     */
    ICSVSource<I> bulkLoadCsvSource();

    /**
     * {@link com.qfs.msg.IColumnCalculator} objects are created to configure the type for all or a
     * subset of columns in the CSV file.
     * An example Column Calculator:
     * <p>
     * {@code new CSVColumnParser(LIMIT_SCOPE, new StringParser(),
     * CSV_FIELDS__LIMITS.indexOf(LIMIT_SCOPE));}
     * </p>
     * @param source - The Scoped Source object (i.e. {@link ACsvScopedFetchSource} or {@link
     *     ACsvScopedListenSource})
     * @return - The {@link DlcCSVMessageChannelFactory} for Bulk Loading CSV files.
     */
    DlcCSVMessageChannelFactory<I> bulkLoadCsvMessageChannelFactory(ICSVSource<I> source);

    /**
     * @param sourceConfig {@link DlcCSVSourceConfiguration} object, which holds properties such
     *     as:
     *     <ul>
     *       <li>parserThreads: number of threads used for parsing incoming CSV files
     *       <li>bufferSize: the CSV buffer size in KB
     *       <li>fileComparator: Comparator object used to order the files
     *     </ul>
     * @return - the local Scoped Fetch Source object
     */
    ACsvScopedFetchSource<I> csvSource(DlcCSVSourceConfiguration<I> sourceConfig);

    /**
     * @param sourceConfig {@link DlcCSVSourceConfiguration} object, which holds properties such
     *     as:
     *     <ul>
     *       <li>parserThreads: number of threads used for parsing incoming CSV files
     *       <li>bufferSize: the CSV buffer size in KB
     *       <li>fileComparator: Comparator object used to order the files
     *     </ul>
     * @return - the local Scoped Listen Source object
     */
    ACsvScopedListenSource<I> csvListenSource(DlcCSVSourceConfiguration<I> sourceConfig);

    /**
     * @param source - the Scoped Source object that will be storing the new topic
     * @param topic - the name of the topic
     * @param parserConfig - {@link ICSVParserConfiguration} object, containing fields like CSV file
     *     separator, number of Columns, and a {@link com.qfs.msg.csv.policy.ICsvParserConfigPolicy}
     *     object. The {@code parserConfig} also contains boolean flags for processing quotes,
     *     accepting incomplete lines, and accepting overflowing lines.
     * @param fileScanParametersScopeKey - Scope key, used to determine how the topic should find the
     *     target CSV file in the file system. The two most common keys are {@code
     *     SCOPE_KEY__REL_DIR_PATH} and {@code SCOPE_KEY__RECURSIVE}.
     * @return An {@link ICSVTopic} object
     */
    ICSVTopic<I> createDirectoryTopic(
            final ICSVSource<I> source,
            final String topic,
            ICSVParserConfiguration parserConfig,
            String fileScanParametersScopeKey);
}

Example of a custom ICSVSourceConfig

1. Create the Spring Bean

Below is an example of a custom ICSVSourceConfig. It highlights one potential use case: updating the ICSVParserConfiguration of existing topics.

@Primary
public class CustomCSVSourceConfig extends ACSVSourceConfig<Path> { // ACSVSourceConfig implements the ICSVSourceConfig interface.
    
    @Override
    public ICSVTopic<Path> createDirectoryTopic(
            final ICSVSource<Path> source,
            final String topic,
            ICSVParserConfiguration parserConfig,
            String fileScanParametersScopeKey) {
        // Let the parent class build the topic, then adjust its parser configuration.
        ICSVTopic<Path> directoryTopic =
                super.createDirectoryTopic(source, topic, parserConfig, fileScanParametersScopeKey);
        directoryTopic.getParserConfiguration().setSeparator("|"); // The parser configuration has been overridden.
        return directoryTopic;
    }
}

Note: the @Primary annotation tells Spring to use CustomCSVSourceConfig instead of CSVSourceConfig.
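To make the effect of the pipe separator in step 1 concrete, here is a minimal, library-free sketch of how the same line splits under two different separators. It is not how the Atoti CSV parser is implemented; SeparatorSketch and splitLine are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for illustration only; not part of the Atoti API.
class SeparatorSketch {

    /** Splits one line on a literal separator, keeping trailing empty columns. */
    static List<String> splitLine(String line, String separator) {
        // Pattern.quote is needed because "|" is a regex metacharacter.
        // The -1 limit keeps trailing empty columns instead of dropping them.
        return Arrays.asList(line.split(Pattern.quote(separator), -1));
    }

    public static void main(String[] args) {
        String line = "desk-A|VaR|1000000|";

        // With the pipe separator the line yields four columns, the last one empty.
        System.out.println(splitLine(line, "|"));

        // With a comma separator the same line is treated as a single column.
        System.out.println(splitLine(line, ","));
    }
}
```

A file written with one separator but parsed with another therefore tends to collapse into a single column, which is the kind of mismatch the override in step 1 is meant to prevent.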

2. Import the Spring Bean

Once the bean is created, import it into the project. See Importing Spring Beans into the Project for instructions. Spring then uses the custom bean when configuring the CSVSourceConfig.