DLC scoped example

In this tutorial we will create a simple Data Load Controller (DLC) that loads a CSV file from the local filesystem through RESTful services.

Required Dependencies

Atoti+ minimum version 0.6.3.

Importing py_dlc

We can import the library in our Python project as follows:

import py_dlc

We will also need the following imports:

Import | Description
import atoti as tt | The actual Atoti import is needed. We rename it to tt for simplicity.
from atoti import Session | The Session is used as an input type in the DLC’s callback operations. This allows our Operation method signature to be clean and consistent.
from py_dlc import Scope | The Scope is used to narrow down a DLC’s operation. This class is needed as our Operation method is typed.
from typing import Optional | The Optional is used within our Scope as Scopes can be “optionally” empty. This class is needed as our Operation method is typed.

Other imports not needed for the DLC, but only for this example:

import pandas as pd
from pathlib import Path
import os
import requests

Session Initialization

First, we create a new session and a table named TradeTable with two columns, keyed on Trade ID.

session = tt.create_session()
session.create_table(
    types={
        "Trade ID": tt.type.STRING,
        "Quantity": tt.type.FLOAT,
    },
    name="TradeTable",
    keys=["Trade ID"],
)

Data

Let’s assume we have the following file structure organized by month:

/data
  -> /month
      -> /january
          -> trades_january_1.csv
          -> trades_january_2.csv
      -> /february
          -> trades_february.csv
      -> /march
          -> trades_march.csv

And our input data for the trades_february.csv file contains:

Trade ID,Quantity
Trade_1,3.0
Trade_2,1.0

We can create a Path pointing to the month directory, which we will use to locate our CSV files:

SALES_DATA_BY_MONTH_DIRECTORY = Path(os.getcwd()) / "data" / "month"
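
To try the example without existing data, the layout above can be generated on disk. The following is a minimal sketch, not part of py_dlc: the helper name create_sample_data and the use of a temporary directory are assumptions made for illustration. Only trades_february.csv gets the rows shown above; the other files receive just the header row, because the tutorial does not show their contents.

```python
import tempfile
from pathlib import Path

HEADER = "Trade ID,Quantity\n"


def create_sample_data(root: Path) -> Path:
    """Hypothetical helper: build the tutorial's data/month layout under root."""
    month_dir = root / "data" / "month"
    layout = {
        "january": ["trades_january_1.csv", "trades_january_2.csv"],
        "february": ["trades_february.csv"],
        "march": ["trades_march.csv"],
    }
    for month, names in layout.items():
        (month_dir / month).mkdir(parents=True, exist_ok=True)
        for name in names:
            # Header only: the tutorial does not show these files' rows.
            (month_dir / month / name).write_text(HEADER)
    # The one file whose contents the tutorial does show.
    (month_dir / "february" / "trades_february.csv").write_text(
        HEADER + "Trade_1,3.0\nTrade_2,1.0\n"
    )
    return month_dir


# A temporary directory keeps the sketch from touching the working directory.
sample_root = Path(tempfile.mkdtemp())
month_directory = create_sample_data(sample_root)
```

To use the generated files with the rest of the tutorial, SALES_DATA_BY_MONTH_DIRECTORY could be pointed at the returned path instead of the current working directory.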

Loading Data

Let's define a callback method that loads the data from the month directory named in the DLC Scope.

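def scoped_trades_load(session: Session, scope: Optional[Scope]):
    # The scope selects which month directory to load, so it must be provided.
    if scope is None:
        raise ValueError("A scope with a 'month' entry is required.")
    directory = SALES_DATA_BY_MONTH_DIRECTORY / scope["month"]
    # Match every trades CSV file in that directory.
    glob = f"{directory.resolve()}/trades_*.csv"
    session.tables["TradeTable"].load_csv(glob)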
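
Before wiring the callback into the DLC, it can help to preview which files a given scope would select. The sketch below uses only pathlib and a plain dict as a stand-in for the Scope (an assumption: the callback above only relies on scope["month"]). The helper name matching_trade_files is hypothetical, and the sketch assumes that load_csv's glob matching agrees with pathlib's for this simple pattern.

```python
import tempfile
from pathlib import Path


def matching_trade_files(month_directory: Path, scope: dict) -> list:
    """Hypothetical helper: list the file names the trades glob would match."""
    directory = month_directory / scope["month"]
    return sorted(p.name for p in directory.glob("trades_*.csv"))


# Build an empty copy of the tutorial's layout in a temporary directory.
demo_root = Path(tempfile.mkdtemp())
demo_layout = {
    "january": ["trades_january_1.csv", "trades_january_2.csv"],
    "february": ["trades_february.csv"],
    "march": ["trades_march.csv"],
}
for month, names in demo_layout.items():
    (demo_root / month).mkdir()
    for name in names:
        (demo_root / month / name).touch()

january_files = matching_trade_files(demo_root, {"month": "january"})
february_files = matching_trade_files(demo_root, {"month": "february"})
```

With this layout, a scope of "january" selects both January files, while "february" selects only trades_february.csv.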

We can now create our DLC and register our LOAD operation.

# Create an instance of the DataLoadController for the current session.
dlc = py_dlc.DataLoadController(session)

# Register LOAD operation
dlc.operation(topic="Trades", operation_type="LOAD")(scoped_trades_load)
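
The call dlc.operation(...)(scoped_trades_load) has the shape of a decorator factory applied by hand. The sketch below illustrates that general Python pattern with a hypothetical stand-in class, FakeController; it is not py_dlc's implementation, and whether py_dlc's operation can also be used with the @ syntax is an assumption this tutorial does not confirm. The "Quotes" topic is likewise invented for the example.

```python
from typing import Callable, Dict, Optional, Tuple


class FakeController:
    """Hypothetical stand-in used only to illustrate the registration pattern."""

    def __init__(self) -> None:
        self.registry: Dict[Tuple[str, str], Callable] = {}

    def operation(self, topic: str, operation_type: str) -> Callable:
        # Returns a function that records the callback and hands it back.
        def register(func: Callable) -> Callable:
            self.registry[(topic, operation_type)] = func
            return func

        return register


controller = FakeController()


def load_trades(session, scope: Optional[dict]) -> str:
    return f"loading trades for {scope['month']}"


# Explicit call form, as used in the tutorial.
controller.operation(topic="Trades", operation_type="LOAD")(load_trades)


# Equivalent decorator form for a second, invented topic.
@controller.operation(topic="Quotes", operation_type="LOAD")
def load_quotes(session, scope: Optional[dict]) -> str:
    return f"loading quotes for {scope['month']}"


result = controller.registry[("Trades", "LOAD")](None, {"month": "february"})
```

Both forms leave the original function usable under its own name, because the registering function returns it unchanged.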

RESTful Execution

We can start loading our Trades data by sending a request to the following endpoint (the port can be read from session.port):

http://localhost:<PORT>/atoti/pyapi/load-controller/execute
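
As a small illustration, the endpoint URL can be assembled from the port. The helper name build_execute_url and the port 9090 are placeholders chosen for this sketch; in a real project the port would come from session.port.

```python
def build_execute_url(port: int, host: str = "localhost") -> str:
    """Hypothetical helper: fill the port into the execute endpoint shown above."""
    return f"http://{host}:{port}/atoti/pyapi/load-controller/execute"


# 9090 is a placeholder; pass session.port when a session is running.
example_url = build_execute_url(9090)
```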

We will execute a POST request with the following JSON body:

{
    "operation": "LOAD",
    "topics": [
        "Trades"
    ],
    "scope": {
        "month": "february"
    }
}

Or we can execute the request within our Python project with the following:

# Execute a LOAD operation on one of our Topics
json_payload = {
    "operation": "LOAD",
    "topics": [
        "Trades"
    ],
    "scope": {
        "month": "february"
    }
}

load_response = requests.post(dlc.operation_url, json=json_payload)

The following response shows us that only the Trades within the ‘february’ directory were loaded:

{
    "Task Name": "d33914b658",
    "Time Taken (MS)": 462,
    "Status": "SUCCESS",
    "Events": [
        {
            "Data Root": "C:\\data\\sales\\month\\february\\trades_february.csv",
            "Source": "CSV",
            "Lines Loaded": 2,
            "Duration (MS)": 120,
            "Errors": 0
        }
    ]
}
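
When the request is sent from Python, the response body can be checked programmatically. The following is a minimal sketch that assumes the body always has the shape shown above; the helper name summarize_load_response is hypothetical, and the sample dictionary copies only the fields the helper reads.

```python
def summarize_load_response(body: dict) -> dict:
    """Hypothetical helper: reduce a response of the shape above to a few totals."""
    events = body.get("Events", [])
    return {
        "succeeded": body.get("Status") == "SUCCESS",
        "lines_loaded": sum(event.get("Lines Loaded", 0) for event in events),
        "errors": sum(event.get("Errors", 0) for event in events),
    }


# Sample body using the field names and values from the response above.
sample_body = {
    "Task Name": "d33914b658",
    "Time Taken (MS)": 462,
    "Status": "SUCCESS",
    "Events": [
        {"Source": "CSV", "Lines Loaded": 2, "Duration (MS)": 120, "Errors": 0}
    ],
}

summary = summarize_load_response(sample_body)
```

With the requests call from earlier, the same helper could be applied as summarize_load_response(load_response.json()), ideally after load_response.raise_for_status() so that HTTP errors surface first.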