Simple DLC example

In this tutorial we will create a simple Data Load Controller (DLC) that loads data from a Pandas DataFrame. The load is triggered through RESTful services and does not use a scope.

Importing py_dlc

We can import the library in our Python project as follows:

import py_dlc

We will also need the following imports:

Import | Description
import atoti as tt | The Atoti library itself. The code in this example refers to it through the tt alias.
from atoti import Session | The Session is used as an input type in the DLC’s callback operations. This keeps our Operation method signature clean and consistent.
from py_dlc import Scope | The Scope is used to narrow down a DLC’s operation. This class is needed because our Operation method is typed.
from typing import Optional | Optional is used with our Scope because a Scope can be empty. This class is needed because our Operation method is typed.

The following imports are not needed by the DLC itself, only by this example:

import pandas as pd
import requests

Session Initialization and Data Gathering

First we will create a new session and initialize a table named TradeTable with two columns.

session = tt.Session()
session.create_table(
    types={
        "Trade ID": tt.type.STRING,
        "Quantity": tt.type.FLOAT,
    },
    name="TradeTable",
    keys=["Trade ID"],
)

Data Creation

Now we can create some data to be loaded with a Pandas DataFrame.

trade_data_dict = {
    "Trade ID": ["Trade_1", "Trade_2"],
    "Quantity": [1, 2],
}

trades_df = pd.DataFrame(data=trade_data_dict)

Loading Data

Normally we would load our Pandas DataFrame directly into our table as follows:

session.tables["TradeTable"].load_pandas(trades_df)

We will keep this same logic, but now we will execute it within a DataLoadController operation. First we need to define our DataLoadController, then register our Operations.

# Create an instance of the DataLoadController for the current session.
dlc = py_dlc.DataLoadController(session)

Callback Method Definition

Now we can register the operations to execute. Here we will register a LOAD operation on our topic “Trades”. The Callback method can be defined in one of two ways, as shown below:

Callback Decorator Operation Definition

We can define the Callback method inline by applying the operation definition as a decorator:

# Define Callback inline with the operation definition:
@dlc.operation(topic="Trades", operation_type="LOAD")
def load_trades(session: Session, scope: Optional[Scope]):
    session.tables["TradeTable"].load_pandas(trades_df)

Callback Method Parameter Operation Definition

We can define the Callback method as a regular function, then later pass it to the operation definition as a parameter:

# Define callback:
def load_trades(session: Session, scope: Optional[Scope]):
    session.tables["TradeTable"].load_pandas(trades_df)

# Elsewhere use this callback in the operation definition:
dlc.operation(topic="Trades", operation_type="LOAD")(load_trades)

As we can see, the Callback definition is nearly identical in both designs. We just need to ensure that our Callback method accepts the parameters Session and Optional[Scope]. We can handle the scope however we like; in this example we do not use it.
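
To illustrate why the two styles are interchangeable, here is a minimal, self-contained sketch of a decorator-based registry. MiniController is a hypothetical stand-in written for this illustration; it is not how py_dlc is implemented, and the "Quotes" topic and the registry layout are assumptions.

```python
from typing import Callable, Dict, Optional, Tuple

# Hypothetical callback type: (session, scope) -> None.
Callback = Callable[[object, Optional[dict]], None]


class MiniController:
    """Illustrative stand-in only; NOT the py_dlc DataLoadController."""

    def __init__(self) -> None:
        # Maps (topic, operation_type) to the registered callback.
        self.registry: Dict[Tuple[str, str], Callback] = {}

    def operation(self, topic: str, operation_type: str) -> Callable[[Callback], Callback]:
        # Returns a decorator, so it works with "@" syntax or as a plain call.
        def register(callback: Callback) -> Callback:
            self.registry[(topic, operation_type)] = callback
            return callback

        return register


controller = MiniController()


# Style 1: decorator syntax.
@controller.operation(topic="Trades", operation_type="LOAD")
def load_trades(session, scope=None):
    print("loading trades")


# Style 2: define the function first, register it later with an explicit call.
def load_quotes(session, scope=None):
    print("loading quotes")


controller.operation(topic="Quotes", operation_type="LOAD")(load_quotes)
```

In both cases the same function object ends up in the registry, because the "@" form is only shorthand for calling the returned decorator with the function.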

RESTful Execution

We can initiate the loading of our Trades data by sending a request to the following endpoint (the port can be read from session.port):

http://localhost:<PORT>/atoti/pyapi/load-controller/execute

We will execute a POST request with the following JSON body:

{
    "operation": "LOAD",
    "topics": ["Trades"]
}

Or we can execute the request within our Python project with the following:

# Execute a LOAD operation on one of our Topics
json_payload = {
    "operation": "LOAD",
    "topics": ["Trades"]
}

load_response = requests.post(dlc.operation_url, json=json_payload)
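
If the requests library is not available, the same POST can be prepared with the Python standard library. The sketch below only builds the request so that it runs offline. The helper names build_execute_url and build_load_request are hypothetical, the port 9090 is a placeholder for the value of session.port, and the path is copied from the endpoint shown above.

```python
import json
import urllib.request

# Path copied from the endpoint in this tutorial; helper names are hypothetical.
EXECUTE_PATH = "/atoti/pyapi/load-controller/execute"


def build_execute_url(port: int, host: str = "localhost") -> str:
    """Assemble the execute endpoint URL for a given port."""
    return f"http://{host}:{port}{EXECUTE_PATH}"


def build_load_request(port: int, topics: list) -> urllib.request.Request:
    """Prepare (but do not send) a POST carrying the LOAD payload as JSON."""
    body = json.dumps({"operation": "LOAD", "topics": topics}).encode("utf-8")
    return urllib.request.Request(
        build_execute_url(port),
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# 9090 is a placeholder; in the tutorial the value would come from session.port.
request = build_load_request(9090, ["Trades"])
# urllib.request.urlopen(request) would send it; omitted so the sketch runs offline.
```

Either way, the request is a POST to the same endpoint with the same JSON body.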

This will give us the following response. Note that the source is reported as “PARQUET” because Atoti converts Pandas DataFrames into Parquet during loading.

{
    "Task Name": "005cfb9caa",
    "Time Taken (MS)": 2569,
    "Status": "SUCCESS",
    "Events": [
        {
            "Data Root": "file:/C:/Users/YourName/AppData/Local/Temp/atoti-zj1dg1gy/tmpwhbybivj.parquet",
            "Source": "PARQUET",
            "Lines Loaded": 2,
            "Duration (MS)": 452,
            "Errors": 0
        }
    ]
}
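
A caller will often want to check the outcome programmatically rather than read the JSON by eye. The following is a minimal sketch that assumes the response always uses the field names shown above. The helper summarize_load_response is hypothetical and not part of py_dlc. With requests, the parsed dictionary would come from load_response.json(); here the sample response is embedded as a string so the sketch runs on its own.

```python
import json

# Sample response copied from the tutorial output above.
sample_response = """
{
    "Task Name": "005cfb9caa",
    "Time Taken (MS)": 2569,
    "Status": "SUCCESS",
    "Events": [
        {
            "Data Root": "file:/C:/Users/YourName/AppData/Local/Temp/atoti-zj1dg1gy/tmpwhbybivj.parquet",
            "Source": "PARQUET",
            "Lines Loaded": 2,
            "Duration (MS)": 452,
            "Errors": 0
        }
    ]
}
"""


def summarize_load_response(payload: dict) -> dict:
    """Hypothetical helper: condense a response into a few totals."""
    events = payload.get("Events", [])
    return {
        "succeeded": payload.get("Status") == "SUCCESS",
        "lines_loaded": sum(event.get("Lines Loaded", 0) for event in events),
        "errors": sum(event.get("Errors", 0) for event in events),
    }


summary = summarize_load_response(json.loads(sample_response))
print(summary)
```

Summing over all events keeps the helper usable if a single request ever reports more than one load event, which is an assumption based on "Events" being a list.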