Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.activeviam.com/llms.txt

Use this file to discover all available pages before exploring further.

QueryCube.unload_members_from_data_cube(members, *, data_cube_id, level, scenario_name=‘Base’)

Unload the given members of a level from a data cube. This is mostly used for data rollover.
This requires the query cube to have been created with allow_data_duplication set to True and with non empty distributing_levels.
  • Parameters:
    • members (Set [bool | int | float | date | datetime | time | str ]) – The members to unload.
    • data_cube_id (str) – The ID of the data cube from which to unload the members. This must be equal to the id_in_cluster argument passed to create_cube().
    • level (HasIdentifier *[*LevelIdentifier ] | LevelIdentifier | tuple [str , str , str ]) – The level containing the members to unload.
    • scenario_name (str) – The name of the scenario from which facts must unloaded.
  • Return type: None

Example

>>> from pathlib import Path
>>> from secrets import token_urlsafe
>>> from tempfile import mkdtemp
>>> from time import sleep
>>> import pandas as pd
>>> from atoti_jdbc import JdbcPingDiscoveryProtocol
>>> application_name = "Cities"
>>> cluster_name = "Cluster"
>>> data_cube_id = "Europe"
>>> query_cube_name = "Query cube"
Setting up the query cube:
>>> query_session = tt.QuerySession.start()
>>> cluster_definition = tt.ClusterDefinition(
...     application_names={application_name},
...     discovery_protocol=JdbcPingDiscoveryProtocol(
...         f"jdbc:h2:{Path(mkdtemp('atoti-cluster')).as_posix()}/db",
...         username="sa",
...         password="",
...     ),
...     authentication_token=token_urlsafe(),
... )
>>> query_session.session.clusters[cluster_name] = cluster_definition
>>> query_session.query_cubes[query_cube_name] = tt.QueryCubeDefinition(
...     query_session.session.clusters[cluster_name],
...     allow_data_duplication=True,
...     distributing_levels={(application_name, "City", "City")},
... )
Defining some functions:
>>> def query_by_city():
...     cube = query_session.session.cubes[query_cube_name]
...     l, m = cube.levels, cube.measures
...     return cube.query(m["Number.SUM"], levels=[l["City"]])
>>> def wait_for_data(*, expected_city_count: int):
...     max_attempts = 30
...     for _ in range(max_attempts):
...         try:
...             result = query_by_city()
...             if len(result.index) == expected_city_count:
...                 return
...         except:
...             pass
...         sleep(1)
...     raise RuntimeError(f"Failed {max_attempts} attempts.")
Setting up the data cube:
>>> data_session = tt.Session.start()
>>> data = pd.DataFrame(
...     columns=["City", "Number"],
...     data=[
...         ("Paris", 20.0),
...         ("London", 5.0),
...         ("Madrid", 7.0),
...     ],
... )
>>> table = data_session.read_pandas(
...     data, keys={"City"}, table_name=application_name
... )
>>> data_cube = data_session.create_cube(table, id_in_cluster=data_cube_id)
Waiting for the data cube to join the cluster:
>>> data_session.clusters[cluster_name] = cluster_definition
>>> wait_for_data(expected_city_count=3)
>>> query_by_city()
       Number.SUM
City
London       5.00
Madrid       7.00
Paris       20.00
Unloading the facts associated with the London and Madrid members:
>>> query_session.query_cubes[
...     query_cube_name
... ].unload_members_from_data_cube(
...     {"London", "Madrid"},
...     data_cube_id=data_cube_id,
...     level=data_cube.levels["City"],
... )
>>> wait_for_data(expected_city_count=1)
>>> query_by_city()
      Number.SUM
City
Paris      20.00