Simple DLC example
In this tutorial we will be creating a simple Data Load Controller to load data from a Pandas dataframe through RESTful services without a scope.
Importing py_dlc
We can import the library in our python project as follows:
import py_dlc
We will also need the following imports:
Import | Description |
---|---|
atoti |
The actual Atoti import is needed. |
from atoti import Session |
The Session is used as an input type in the DLC’s callback operations. This allows our Operation method signature to be clean and consistent. |
from py_dlc import Scope |
The Scope is used to narrow down a DLC’s operation. This class is needed as our Operation method is typed. |
from typing import Optional |
The Optional is used within our Scope as Scopes can be “optionally” empty. This class is needed as our Operation method is typed. |
Other imports not needed for the DLC, but only for this example:
import pandas as pd
import requests
Session Initialization and Data Gathering
First we will create a new session and initialize a TradeTable
Table with two columns.
session = tt.create_session()
session.create_table(
types={
"Trade ID": tt.type.STRING,
"Quantity": tt.type.FLOAT,
},
name="TradeTable",
keys=["Trade ID"],
)
Data Creation
Now we can create some data to be loaded with a Pandas DataFrame.
trade_data_dict = {
"Trade ID": ["Trade_1", "Trade_2"],
"Quantity": [1, 2],
}
trades_df = pd.DataFrame(data=trade_data_dict)
Loading Data
Normally we would load our Pandas DataFrame directly into our Cube as follows:
session.tables["TradeTable"].load_pandas(trades_df)
We will continue to use this same logic but instead now we will execute this bit of code within a DataLoadController operation. First we will need to define our DataLoadController and register our Operations.
# Create an instance of the DataLoadController for the current session.
dlc = py_dlc.DataLoadController(session)
Callback Method Definition
Now we can register different operations to execute. Here we will register a Loading operation on our topic “Trades
”.
We can define the Callback method in one of two ways as can be seen below:
Callback Method Parameter Operation Definition
We can define the Callback Method through a Decorator inline:
# Define Callback inline with the operation definition:
@dlc.operation(topic="Trades", operation_type="LOAD")
def load_trades(session: Session, scope: Optional[Scope]):
session.tables["TradeTable"].load_pandas(trades_df)
Callback Decorator Operation Definition
We can define the Callback Method as a regular method, then later pass it to the operation definition.
# Define callback:
def load_trades(session: Session, scope: Optional[Scope]):
session.tables["TradeTable"].load_pandas(trades_df)
# Elsewhere use this callback in the operation definition:
dlc.operation(topic="Trades", operation_type="LOAD")(load_trades)
As we can see the Callback definition is very similar for both designs. We just need to ensure that our Callback method
contains the Parameter Objects Session
and Optional[Scope]
. We can handle the scope however we like, in this
example we do not use it.
RESTful Execution
We can initiate the loading of our Trades
data by executing a request against the endpoint
(The PORT
can be resolved by reading the session.port
variable):
http://localhost:<PORT>/atoti/pyapi/load-controller/execute
We will execute a POST
request with the following JSON body:
{
"operation": "LOAD",
"topics": ["Trades"]
}
Or we can execute the request within our Python project with the following:
# Execute a LOAD operation on one of our Topics
json_payload ={
"operation": "LOAD",
"topics": ["Trades"]
}
load_response = requests.post(dlc.operation_url, json=json_payload)
This will give us the following response:
note
The source is noted as “PARQUET” this is because Atoti converts Pandas DataFrames into Parquet during loading.
{
"Task Name": "005cfb9caa",
"Time Taken (MS)": 2569,
"Status": "SUCCESS",
"Events": [
{
"Data Root": "file:/C:/Users/YourName/AppData/Local/Temp/atoti-zj1dg1gy/tmpwhbybivj.parquet",
"Source": "PARQUET",
"Lines Loaded": 2,
"Duration (MS)": 452,
"Errors": 0
}
]
}