Avro

Avro Source Configuration

The Avro source loads data from Avro files. It does not prescribe where the files are located, nor does it assume anything about the structure of the data.

Maven Dependency

First, make sure the correct dependency is imported. Your pom.xml should contain:

    <!-- For Avro Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-avro</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

Source Configuration

The AvroFilesScopedFetchSource implementation is fairly generic: it makes no assumption about the structure of the data to load, other than that the files are in Avro format. It uses Avro's ‘Generic’ data reading components, which are configured through the relevant Avro schema file of each AvroFilesScopedFetchTopic.
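One way to picture ‘generic’ reading is that each record is accessed by field name and flattened into a tuple whose positions follow the field order declared in the schema. The sketch below illustrates that idea with plain Java only; a Map stands in for Avro's GenericRecord, and GenericRecordFlattener and toTuple are hypothetical names, not part of the Data Connectors API.

```java
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper: flattens a "generic" record, accessed by field name,
 * into a positional tuple that follows the schema's field order.
 * A Map stands in for Avro's GenericRecord to keep the sketch dependency-free.
 */
final class GenericRecordFlattener {

    private final List<String> schemaFieldNames;

    GenericRecordFlattener(List<String> schemaFieldNames) {
        this.schemaFieldNames = List.copyOf(schemaFieldNames);
    }

    /** Returns one tuple slot per schema field; fields missing from the record become null. */
    Object[] toTuple(Map<String, ?> record) {
        final Object[] tuple = new Object[schemaFieldNames.size()];
        for (int i = 0; i < tuple.length; i++) {
            tuple[i] = record.get(schemaFieldNames.get(i));
        }
        return tuple;
    }
}
```

The field order passed to the constructor plays the role of the schema: the order of the record's own entries is irrelevant.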

When registering the Avro channels, a TupleMessageChannelFactory must be used to create the channel for each topic. Because of this, each topic also needs a mapping of column indices to column names; the avroSource() method in the example below shows how to build it.
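A minimal sketch of building that mapping from an ordered list of column names, assuming (as in the avroSource() example) that indices start at 0 and follow the order of the fields in the Avro schema. ColumnIndexMapping, indexToName and toProperties are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper that turns an ordered list of column names into an index -> name mapping. */
final class ColumnIndexMapping {

    private ColumnIndexMapping() {
    }

    /** Builds {0 -> first column, 1 -> second column, ...}, preserving insertion order. */
    static LinkedHashMap<Integer, String> indexToName(List<String> columnNames) {
        final LinkedHashMap<Integer, String> mapping = new LinkedHashMap<>();
        for (int i = 0; i < columnNames.size(); i++) {
            mapping.put(i, columnNames.get(i));
        }
        return mapping;
    }

    /** Copies the mapping into a Properties object, as the avroSource() example does with putAll. */
    static Properties toProperties(List<String> columnNames) {
        final Properties props = new Properties();
        props.putAll(indexToName(columnNames));
        return props;
    }
}
```

Because the keys are Integer objects rather than strings, they are read back with props.get(0); props.getProperty("0") returns null.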

Below is an example:

@Value("${input.data.root.dir.path:data-samples}")
private String inputDataRootDirPath;

@Autowired
private IDatastoreConfig datastoreConfig;

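// Identifies the product topic; the channel created in dataLoadController() refers to it by this name
private static final String AVRO_TOPIC__PRODUCT = "Avro_Products_Topic";

private static final PathMatcher INPUT_FILE_PATH_MATCHER_PRODUCT = FileSystems.getDefault().getPathMatcher("glob:**/*products_data*.avro");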

@Bean(destroyMethod = "close")
public IDataLoadController dataLoadController() {

	final IDataLoadController controller = new DataLoadController(datastoreConfig.datastore(), null);

	final TupleMessageChannelFactory channelFactory = avroMessageChannelFactory();

	// Placeholder: supply your own ITuplePublisher implementation
	final ITuplePublisher<String> publisher = yourTuplePublisher();
	
	controller.registerSource(avroSource());
	controller.registerChannel(
			channelFactory.createChannel(
					AVRO_TOPIC__PRODUCT,
					DataStoreNames.PRODUCT_STORE_NAME,
					publisher),
			publisher.getTargetStores(),
			new ProductScopeToRemoveWhereConditionConverter());

	return controller;
}

/**
 * The source that is handling any AVRO data loading for this project.
 */
@Bean(destroyMethod = "close")
public AvroFilesScopedFetchSource avroSource() {
	final Properties sourceProps = new Properties();
	final AvroFilesScopedFetchSource source = new AvroFilesScopedFetchSource(sourceProps);

	final String schemaFilePath = inputDataRootDirPath + "/avro/schema.avsc";

	// Maps each column index to its column name; the TupleMessageChannelFactory uses this mapping
	// to generate default column calculators.
	final LinkedHashMap<Integer, String> sourceStoreMapping = new LinkedHashMap<>();

	sourceStoreMapping.put(0, DataStoreNames.PRODUCT_ID);
	sourceStoreMapping.put(1, DataStoreNames.PRODUCT_NAME);
	sourceStoreMapping.put(2, DataStoreNames.PRODUCT_COLOR);
	sourceStoreMapping.put(3, DataStoreNames.PRODUCT_PRICE);
	sourceStoreMapping.put(4, DataStoreNames.PRODUCT_COB_DATE);

	sourceProps.putAll(sourceStoreMapping);

	source.addTopic(
			new AvroFilesScopedFetchTopic(
					AVRO_TOPIC__PRODUCT,
					inputDataRootDirPath,
					INPUT_FILE_PATH_MATCHER_PRODUCT,
					new FilesScopedFetchTopic.FetchScopeToFileScanParametersConverter(),
					schemaFilePath,
					AvroFileTask.PLUGIN_KEY,
					sourceProps));
	
	return source;
}

@Bean
public TupleMessageChannelFactory avroMessageChannelFactory() {

	final TupleMessageChannelFactory factory = new TupleMessageChannelFactory(
			avroSource(), this.datastoreConfig.datastore());

	// Note: calculated fields (if required) for particular channels are defined here;
	// their calculation logic is executed as data transits through the channels.
	//
	// Other fields may be calculated by update-where procedures that run automatically upon datastore
	// transaction commit: refer to DatastoreEtlConfig for more details.

	return factory;
}

Schema File

The schema file referenced by schemaFilePath in the example above contains the following JSON definition:

{
    "type": "record",
    "name": "Products",
    "fields": [
        {
            "name": "ProductId",
            "type": "string"
        },
        {
            "name": "Name",
            "type": "string"
        },
        {
            "name": "Color",
            "type": "string"
        },
        {
            "name": "Price",
            "type": "double"
        },
        {
            "name": "CobDate",
            "type": "string",
            "logicalType": "date"
        }
    ]
}
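Per the Avro specification, the date logical type annotates an int counting days since 1970-01-01, and logicalType is honoured only inside a type definition such as {"type": "int", "logicalType": "date"}; next to a plain "type": "string", CobDate is read as an ordinary string. The sketch below shows both conversions to java.time.LocalDate. CobDates is a hypothetical helper, and the ISO-8601 text format for the string variant is an assumption about the data files.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical helpers for turning a CobDate value into a LocalDate.
 * Which one applies depends on how the Avro files actually store the date.
 */
final class CobDates {

    private CobDates() {
    }

    /** Avro "date" logical type: an int counting days since 1970-01-01. */
    static LocalDate fromAvroDate(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    /** Plain string field, assuming ISO-8601 "yyyy-MM-dd" text such as "2024-01-01". */
    static LocalDate fromIsoString(String text) {
        return LocalDate.parse(text, DateTimeFormatter.ISO_LOCAL_DATE);
    }

    /** Inverse of fromAvroDate: the int an Avro writer would store for this date. */
    static int toAvroDate(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }
}
```

For example, 2024-01-01 is stored as 19723 under the date logical type.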

Topic Configuration

An AvroFilesScopedFetchSource consists of one or more topics of type AvroFilesScopedFetchTopic. Each topic name must be globally unique across all topics. An AvroFilesScopedFetchTopic takes the following parameters:
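The uniqueness rule can be checked early, before any topic is constructed. TopicNameRegistry below is a hypothetical guard, not part of the library; it only illustrates the check, relying on Set.add returning false for a name that is already present.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical guard that rejects a topic name already used by another topic. */
final class TopicNameRegistry {

    private final Set<String> names = new HashSet<>();

    /** Records the name, or throws if any topic already uses it. Returns the name for inline use. */
    String register(String topicName) {
        if (topicName == null || topicName.isBlank()) {
            throw new IllegalArgumentException("Topic name must not be blank");
        }
        if (!names.add(topicName)) {
            throw new IllegalStateException("Duplicate topic name: " + topicName);
        }
        return topicName;
    }
}
```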

Topic Parameter                          Description
name                                     Name of this topic. Must be globally unique among all topics.
rootDirectoryPath                        Base directory to which pathMatcher is applied.
pathMatcher                              Implementation of the PathMatcher interface that performs matching operations on paths.
fetchScopeToFileScanParametersConverter  Converts a fetch scope into the IFileScanParameters used to scan for the files to load.
schemaFilePath                           Relative or absolute path to the Avro schema file.
fileTaskPluginKey                        Plugin key of the IFileTask that processes the Avro records.
extraProperties                          Must contain a mapping of column indices to column names (see the avroSource() method above).
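To preview which files a rootDirectoryPath and pathMatcher pair would select, the same glob can be tried with the JDK alone. This sketch mimics only the matching step; how the library actually scans, and how the fetch scope narrows the file set, is not shown. MatchingFiles is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical preview of which regular files under a root directory a PathMatcher accepts. */
final class MatchingFiles {

    private MatchingFiles() {
    }

    static List<Path> find(Path rootDirectory, PathMatcher matcher) {
        // Files.walk visits the root and every descendant; the stream must be closed.
        try (Stream<Path> paths = Files.walk(rootDirectory)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(matcher::matches)
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Same glob as INPUT_FILE_PATH_MATCHER_PRODUCT in the configuration example. */
    static PathMatcher productMatcher() {
        return FileSystems.getDefault().getPathMatcher("glob:**/*products_data*.avro");
    }
}
```

Note that the leading **/ needs at least one directory separator in the path, which holds for the paths Files.walk produces under a root directory.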

AvroFilesScopedFetchTopic constructor:

public AvroFilesScopedFetchTopic(
    final String name,
    final String rootDirectoryPath,
    final PathMatcher pathMatcher,
    final IScopeToFileScanParametersConverter fetchScopeToFileScanParametersConverter,
    final String schemaFilePath,
    final String fileTaskPluginKey,
    final Properties extraProperties)
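Since extraProperties is a plain java.util.Properties, a malformed column mapping is only noticed at runtime. A hypothetical pre-flight check could verify that the Integer keys run from 0 without gaps and that every column name is non-blank. Whether the library itself requires contiguous indices is an assumption of this sketch, as is the presence of other, non-Integer entries in extraProperties.

```java
import java.util.Properties;

/** Hypothetical pre-flight check for the index -> column name entries in extraProperties. */
final class ColumnMappingCheck {

    private ColumnMappingCheck() {
    }

    /**
     * Returns the number of mapped columns, or throws if the Integer keys are not exactly
     * 0..n-1 or a column name is blank. Non-Integer keys are ignored (assumed to be other settings).
     */
    static int validate(Properties extraProperties) {
        int columnCount = 0;
        for (Object key : extraProperties.keySet()) {
            if (key instanceof Integer) {
                columnCount++;
            }
        }
        // With n Integer keys, every index 0..n-1 must be present; a gap leaves some index unmapped.
        for (int i = 0; i < columnCount; i++) {
            final Object name = extraProperties.get(i);
            if (!(name instanceof String) || ((String) name).isBlank()) {
                throw new IllegalArgumentException("Missing or blank column name for index " + i);
            }
        }
        return columnCount;
    }
}
```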

Avro File Task

An implementation of AAvroFileTask must be defined. A generic implementation simply appends each Avro record to the chunk, as follows:

@QuartetExtendedPluginValue(intf = IFileTask.class, key = MyAvroFileTask.PLUGIN_KEY)
public class MyAvroFileTask extends AAvroFileTask {
    public static final String PLUGIN_KEY = "My_Avro_File_Task";

    /**
     * Create an Avro file task.
     *
     * @param filePath Path of the Avro file to process.
     * @param topic    Topic which the Avro file belongs to.
     * @param channel  Current IMessageChannel
     */
    public MyAvroFileTask(Path filePath, IAvroFilesScopedFetchTopic topic, IMessageChannel channel) {
        super(filePath, topic, channel);
    }

    @Override
    public String getType() {
        return PLUGIN_KEY;
    }

    @Override
    protected int doAppendToChunk(IMessageChunk chunk, Object[] avroRecord) {
        chunk.append(avroRecord);
        return 1;
    }
}
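The generic task appends every record unchanged and returns 1, which suggests the return value counts the records appended (an assumption, not confirmed here). If some records should be skipped, that decision can live in a plain method that is easy to unit-test, with doAppendToChunk delegating to it. In this sketch a Consumer<Object[]> stands in for the chunk's append call, index 0 is ProductId as in the example schema, and ProductRecordFilter is a hypothetical name.

```java
import java.util.function.Consumer;

/**
 * Hypothetical record-level logic that a custom doAppendToChunk could delegate to.
 * Index 0 is ProductId, following the example schema's field order.
 */
final class ProductRecordFilter {

    private static final int PRODUCT_ID = 0;

    private ProductRecordFilter() {
    }

    /** Passes the record to the sink unless ProductId is missing; returns how many records were passed on. */
    static int appendIfValid(Object[] avroRecord, Consumer<Object[]> sink) {
        final Object productId = avroRecord[PRODUCT_ID];
        // toString() rather than a cast: Avro's generic reader may return Utf8 instead of String.
        if (productId == null || productId.toString().isEmpty()) {
            return 0;
        }
        sink.accept(avroRecord);
        return 1;
    }
}
```

Inside MyAvroFileTask, doAppendToChunk could then return ProductRecordFilter.appendIfValid(avroRecord, chunk::append), provided chunk.append accepts an Object[] as in the generic implementation.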