Avro
Avro Source Configuration
A Source for loading data from Avro files. It does not prescribe the location of the files, nor does it assume any particular structure for the data.
Maven Dependency
We must first ensure we are importing the correct dependency. Your pom.xml should contain:
<!-- For Avro Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-avro</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
Source Configuration
The Avro Source implementation, AvroFilesScopedFetchSource, is fairly generic and makes no assumptions about the structure of the data to load, beyond the fact that we expect to load Avro files. We use Avro ‘Generic’ data-reading components, which are configured via the relevant ‘Avro Schema’ files (one for each AvroFilesScopedFetchTopic).
Note that when registering the Avro channels, we must use a TupleMessageChannelFactory to create the channel for each of our topics. Because we are using the TupleMessageChannelFactory, we also need to provide a mapping of column indices to column names; this is built in the avroSource() method.
Below is an example:
@Value("${input.data.root.dir.path:data-samples}")
private String inputDataRootDirPath;
@Autowired
private IDatastoreConfig datastoreConfig;
private final PathMatcher INPUT_FILE_PATH_MATCHER_PRODUCT = FileSystems.getDefault().getPathMatcher("glob:**/*products_data*.avro");
@Bean(destroyMethod = "close")
public IDataLoadController dataLoadController() {
final IDataLoadController controller = new DataLoadController(datastoreConfig.datastore(), null);
final TupleMessageChannelFactory channelFactory = avroMessageChannelFactory();
final ITuplePublisher<String> publisher = yourTuplePublisher();
controller.registerSource(avroSource());
controller.registerChannel(
channelFactory.createChannel(
"Avro_Products_Topic",
DataStoreNames.PRODUCT_STORE_NAME,
publisher),
publisher.getTargetStores(),
new ProductScopeToRemoveWhereConditionConverter());
return controller;
}
/**
* The source that is handling any AVRO data loading for this project.
*/
@Bean(destroyMethod = "close")
public AvroFilesScopedFetchSource avroSource() {
final Properties sourceProps = new Properties();
final AvroFilesScopedFetchSource source = new AvroFilesScopedFetchSource(sourceProps);
final String schemaFilePath = inputDataRootDirPath + "/avro/schema.avsc";
// This creates a mapping of column indices to column names; it is used by the TupleMessageChannelFactory
// for generating default column calculators.
final LinkedHashMap<Integer, String> sourceStoreMapping = new LinkedHashMap<>();
sourceStoreMapping.put(0, DataStoreNames.PRODUCT_ID);
sourceStoreMapping.put(1, DataStoreNames.PRODUCT_NAME);
sourceStoreMapping.put(2, DataStoreNames.PRODUCT_COLOR);
sourceStoreMapping.put(3, DataStoreNames.PRODUCT_PRICE);
sourceStoreMapping.put(4, DataStoreNames.PRODUCT_COB_DATE);
sourceProps.putAll(sourceStoreMapping);
source.addTopic(
new AvroFilesScopedFetchTopic(
AVRO_TOPIC__PRODUCT,
inputDataRootDirPath,
INPUT_FILE_PATH_MATCHER_PRODUCT,
new FilesScopedFetchTopic.FetchScopeToFileScanParametersConverter(),
schemaFilePath,
AvroFileTask.PLUGIN_KEY,
sourceProps));
return source;
}
@Bean
public TupleMessageChannelFactory avroMessageChannelFactory() {
final TupleMessageChannelFactory factory = new TupleMessageChannelFactory(
avroSource(), this.datastoreConfig.datastore());
// Note: here we can add definitions of calculated fields (if/as required) for particular channels,
// meaning the field-calculation logic executes as data transits through the channels.
//
// Some other fields may be calculated via update-where procedures that run automatically upon datastore
// transaction commit: refer to DatastoreEtlConfig for more details.
return factory;
}
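The PathMatcher above determines which files under the root directory belong to a topic. As a quick illustration of the glob syntax (this is standard java.nio behavior, independent of the connector; the file names below are hypothetical), the products pattern matches as follows:

import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobMatcherDemo {
    public static void main(String[] args) {
        final PathMatcher matcher = FileSystems.getDefault()
                .getPathMatcher("glob:**/*products_data*.avro");

        // Matches: the file name contains "products_data" and ends in .avro.
        System.out.println(matcher.matches(
                Paths.get("data-samples/avro/2024-01-31_products_data.avro"))); // true

        // Does not match: different file-name stem.
        System.out.println(matcher.matches(
                Paths.get("data-samples/avro/trades_data.avro"))); // false
    }
}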
Schema File
The Schema file we use contains the following JSON definition:
{
"type": "record",
"name": "Products",
"fields": [
{
"name": "ProductId",
"type": "string"
},
{
"name": "Name",
"type": "string"
},
{
"name": "Color",
"type": "string"
},
{
"name": "Price",
"type": "double"
},
{
"name": "CobDate",
"type": "string",
"logicalType": "date"
}
]
}
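To produce test data conforming to this schema, you can use the standard Apache Avro Java API. The sketch below is illustrative only (the output file name and field values are made up); it writes a single record to a file that the products PathMatcher above would pick up:

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class WriteSampleProducts {
    public static void main(String[] args) throws Exception {
        // Parse the same schema file that the topic is configured with.
        final Schema schema = new Schema.Parser().parse(new File("data-samples/avro/schema.avsc"));

        // Build one generic record matching the Products schema.
        final GenericRecord record = new GenericData.Record(schema);
        record.put("ProductId", "P-001");
        record.put("Name", "Widget");
        record.put("Color", "Blue");
        record.put("Price", 9.99d);
        record.put("CobDate", "2024-01-31");

        // Write the record to an Avro container file.
        try (DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.create(schema, new File("data-samples/avro/sample_products_data.avro"));
            writer.append(record);
        }
    }
}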
Topic Configuration
An AvroFilesScopedFetchSource consists of one or more Topics of type AvroFilesScopedFetchTopic. Each Topic name must be globally unique (across all other Topics). An AvroFilesScopedFetchTopic consists of the following parameters:
Topic Parameter | Description |
---|---|
name | Name of this topic; must be globally unique among all Topics. |
rootDirectoryPath | Base directory to which the pathMatcher is applied. |
pathMatcher | Implementation of the PathMatcher interface that performs matching operations on paths. |
fetchScopeToFileScanParametersConverter | Function that converts scope keys into an IFileScanParameters describing which files to scan and load. |
schemaFilePath | Relative or absolute path to the Avro schema file. |
fileTaskPluginKey | Plugin key of the IFileTask that handles processing of Avro records. |
extraProperties | Must contain a mapping of column indices to column names; see the avroSource() method above. |
The corresponding Java constructor:
public AvroFilesScopedFetchTopic(
final String name,
final String rootDirectoryPath,
final PathMatcher pathMatcher,
final IScopeToFileScanParametersConverter fetchScopeToFileScanParametersConverter,
final String schemaFilePath,
final String fileTaskPluginKey,
final Properties extraProperties)
Avro File Task
An implementation of AAvroFileTask must be defined. A generic implementation simply appends the Avro record to the chunk, as follows:
@QuartetExtendedPluginValue(intf = IFileTask.class, key = MyAvroFileTask.PLUGIN_KEY)
public class MyAvroFileTask extends AAvroFileTask {
public static final String PLUGIN_KEY = "My_Avro_File_Task";
/**
* Create an Avro file task.
*
* @param filePath Path of the Avro file to process.
* @param topic Topic which the Avro file belongs to.
* @param channel Current IMessageChannel
*/
public MyAvroFileTask(Path filePath, IAvroFilesScopedFetchTopic topic, IMessageChannel channel) {
super(filePath, topic, channel);
}
@Override
public String getType() {
return PLUGIN_KEY;
}
@Override
protected int doAppendToChunk(IMessageChunk chunk, Object[] avroRecord) {
chunk.append(avroRecord);
return 1;
}
}
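Because doAppendToChunk receives each decoded record as an Object[] ordered according to the column-index mapping, it is also a natural hook for lightweight per-record filtering or transformation. As a hypothetical variant of the method in MyAvroFileTask above, the following sketch drops records whose product ID (assumed to be at index 0, per the mapping built in avroSource()) is missing:

@Override
protected int doAppendToChunk(IMessageChunk chunk, Object[] avroRecord) {
    // Skip records without a product ID (index 0 in the column mapping).
    if (avroRecord[0] == null) {
        return 0; // nothing appended
    }
    chunk.append(avroRecord);
    return 1; // one tuple appended
}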