DLC scoped example
In this tutorial we create a simple Data Load Controller (DLC) that loads a CSV file from the local filesystem through RESTful services.
Required Dependencies
Atoti minimum version 0.6.3.
Importing py_dlc
We can import the library in our Python project as follows:
import py_dlc
We will also need the following imports:
Import | Description |
---|---|
import atoti as tt | The actual Atoti import is needed. We rename it to tt for simplicity. |
from atoti import Session | The Session is used as an input type in the DLC’s callback operations. This allows our Operation method signature to be clean and consistent. |
from py_dlc import Scope | The Scope is used to narrow down a DLC’s operation. This class is needed as our Operation method is typed. |
from typing import Optional | The Optional is used within our Scope as Scopes can be “optionally” empty. This class is needed as our Operation method is typed. |
The following imports are not required by the DLC and are used only in this example:
import pandas as pd
from pathlib import Path
import os
import requests
Session Initialization
First, we create a new session and initialize a TradeTable table with two columns.
session = tt.Session()
session.create_table(
    types={
        "Trade ID": tt.type.STRING,
        "Quantity": tt.type.FLOAT,
    },
    name="TradeTable",
    keys=["Trade ID"],
)
Data
Let’s assume we have the following file structure organized by month:
/data
  -> /month
    -> /january
      -> trades_january_1.csv
      -> trades_january_2.csv
    -> /february
      -> trades_february.csv
    -> /march
      -> trades_march.csv
The trades_february.csv file contains:
Trade ID,Quantity
Trade_1,3.0
Trade_2,1.0
We can create a Path pointing to the month directory, which we will use to load our CSV files:
SALES_DATA_BY_MONTH_DIRECTORY = Path(os.getcwd()) / "data" / "month"
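To try the example without existing data, the directory layout above can be generated with standard-library code. This is a minimal sketch under stated assumptions: the helper name `create_sample_data` and the temporary root directory are hypothetical, and only the rows of trades_february.csv come from this tutorial. The other files are written with a header only, because their contents are not shown here.

```python
import tempfile
from pathlib import Path

# Layout taken from the tutorial; file contents are only known for February.
LAYOUT = {
    "january": ["trades_january_1.csv", "trades_january_2.csv"],
    "february": ["trades_february.csv"],
    "march": ["trades_march.csv"],
}
HEADER = "Trade ID,Quantity\n"
FEBRUARY_ROWS = "Trade_1,3.0\nTrade_2,1.0\n"


def create_sample_data(root: Path) -> Path:
    """Create <root>/data/month/<month>/*.csv and return the month directory."""
    month_dir = root / "data" / "month"
    for month, files in LAYOUT.items():
        (month_dir / month).mkdir(parents=True, exist_ok=True)
        for name in files:
            # Assumption: files other than February get a header and no rows.
            rows = FEBRUARY_ROWS if month == "february" else ""
            (month_dir / month / name).write_text(HEADER + rows, encoding="utf-8")
    return month_dir


# Build the tree in a throwaway directory rather than the working directory.
sample_root = Path(tempfile.mkdtemp())
sample_month_dir = create_sample_data(sample_root)
```

To use the generated files with the rest of the tutorial, SALES_DATA_BY_MONTH_DIRECTORY could be pointed at `sample_month_dir` instead of the working directory.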
Loading Data
Let's define a callback method that loads the data from the month directory specified in the DLC Scope.
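def scoped_trades_load(session: Session, scope: Optional[Scope]):
    # The scope must contain a "month" entry naming the sub-directory to load.
    directory = SALES_DATA_BY_MONTH_DIRECTORY / scope["month"]
    # Match every trades file in that month's directory.
    glob = f"{directory.resolve()}/trades_*.csv"
    session.tables["TradeTable"].load_csv(glob)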
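Because the scope is typed as Optional and its "month" value arrives from a REST request, it can help to validate it before building the path. The sketch below isolates that step in a hypothetical helper, `trades_glob`, which is not part of py_dlc. It assumes the scope can be read like a mapping of strings, which is how the callback above uses it.

```python
from pathlib import Path
from typing import Mapping, Optional


def trades_glob(base_dir: Path, scope: Optional[Mapping[str, str]]) -> str:
    """Return the glob pattern for the month named in the scope."""
    if not scope or "month" not in scope:
        raise ValueError("This operation needs a scope with a 'month' entry.")
    month = scope["month"]
    # Reject values such as "../other" that would leave the month directory.
    if Path(month).name != month or month in ("", ".", ".."):
        raise ValueError(f"Invalid month: {month!r}")
    return f"{(base_dir / month).resolve()}/trades_*.csv"
```

Inside the callback, the result of `trades_glob(SALES_DATA_BY_MONTH_DIRECTORY, scope)` could be passed to load_csv in place of the inline f-string.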
We can now create our DLC and register our LOAD operation.
# Create an instance of the DataLoadController for the current session.
dlc = py_dlc.DataLoadController(session)
# Register LOAD operation
dlc.operation(topic="Trades", operation_type="LOAD")(scoped_trades_load)
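The double call in the registration line, `dlc.operation(...)(scoped_trades_load)`, has the shape of a Python decorator factory: the first call returns a function, which is then called with the callback. The sketch below illustrates that pattern with a hypothetical stand-in class, `FakeController`. It is not py_dlc, and this tutorial does not state whether `operation` is meant to be used with the `@` syntax, so treat the decorator form as an assumption to verify.

```python
from typing import Callable, Dict, Tuple


class FakeController:
    """Hypothetical stand-in; it only mimics the call shape used above."""

    def __init__(self) -> None:
        self.registry: Dict[Tuple[str, str], Callable] = {}

    def operation(self, topic: str, operation_type: str) -> Callable[[Callable], Callable]:
        def register(func: Callable) -> Callable:
            self.registry[(topic, operation_type)] = func
            return func  # return the function unchanged so it stays callable

        return register


fake = FakeController()


def load_a(session, scope):
    return "a"


# Explicit form, as in the tutorial: call the factory, then pass the function.
fake.operation(topic="A", operation_type="LOAD")(load_a)


# Decorator form: Python performs the same two calls on our behalf.
@fake.operation(topic="B", operation_type="LOAD")
def load_b(session, scope):
    return "b"
```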
RESTful Execution
We can initiate the loading of our Trades data by sending a request to the following endpoint (the port can be read from session.port):
http://localhost:<PORT>/atoti/pyapi/load-controller/execute
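As a small illustration, the endpoint can be assembled from the port in code. The helper name `execute_url` and the port 9090 are assumptions for this sketch; the path is copied from the endpoint shown above.

```python
def execute_url(port: int, host: str = "localhost") -> str:
    """Build the DLC execute endpoint URL for a session listening on `port`."""
    return f"http://{host}:{port}/atoti/pyapi/load-controller/execute"


# In the tutorial this would be execute_url(session.port); 9090 is arbitrary.
print(execute_url(9090))
```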
We will execute a POST request with the following JSON body:
{
    "operation": "LOAD",
    "topics": [
        "Trades"
    ],
    "scope": {
        "month": "february"
    }
}
Or we can execute the request within our Python project with the following:
# Execute a LOAD operation on one of our Topics
json_payload = {
    "operation": "LOAD",
    "topics": [
        "Trades"
    ],
    "scope": {
        "month": "february"
    }
}
load_response = requests.post(dlc.operation_url, json=json_payload)
The following response shows us that only the Trades within the ‘february’ directory were loaded:
{
    "Task Name": "d33914b658",
    "Time Taken (MS)": 462,
    "Status": "SUCCESS",
    "Events": [
        {
            "Data Root": "C:\\data\\sales\\month\\february\\trades_february.csv",
            "Source": "CSV",
            "Lines Loaded": 2,
            "Duration (MS)": 120,
            "Errors": 0
        }
    ]
}
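When the request is sent from Python, the response can also be checked programmatically. The following is a minimal sketch that assumes the response body always has the shape shown above ("Status" plus a list of "Events" with "Lines Loaded" and "Errors"); the helper name `summarize_load_response` is hypothetical.

```python
from typing import Any, Dict


def summarize_load_response(body: Dict[str, Any]) -> Dict[str, Any]:
    """Condense a response shaped like the example above into a few totals."""
    events = body.get("Events", [])
    return {
        "status": body.get("Status"),
        "lines_loaded": sum(e.get("Lines Loaded", 0) for e in events),
        "errors": sum(e.get("Errors", 0) for e in events),
    }


# Trimmed copy of the example response, keeping only the fields used here.
example = {
    "Status": "SUCCESS",
    "Events": [{"Source": "CSV", "Lines Loaded": 2, "Errors": 0}],
}
summary = summarize_load_response(example)
```

With the request from the Python snippet, this could be called as `summarize_load_response(load_response.json())`, ideally after `load_response.raise_for_status()` so that HTTP errors surface before the body is parsed.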