Documentation Index
Fetch the complete documentation index at: https://docs.activeviam.com/llms.txt
Use this file to discover all available pages before exploring further.
QueryCube.unload_members_from_data_cube(members, *, data_cube_id, level, scenario_name=‘Base’)
Unload the given members of a level from a data cube.
This is mostly used for data rollover.
- Parameters:
- members (Set [bool | int | float | date | datetime | time | str ]) – The members to unload.
- data_cube_id (str) – The ID of the data cube from which to unload the members.
This must be equal to the id_in_cluster argument passed to
create_cube().
- level (HasIdentifier *[*LevelIdentifier ] | LevelIdentifier | tuple [str , str , str ]) – The level containing the members to unload.
- scenario_name (str) – The name of the scenario from which facts must unloaded.
- Return type:
None
Example
>>> from pathlib import Path
>>> from secrets import token_urlsafe
>>> from tempfile import mkdtemp
>>> from time import sleep
>>> import pandas as pd
>>> from atoti_jdbc import JdbcPingDiscoveryProtocol
>>> application_name = "Cities"
>>> cluster_name = "Cluster"
>>> data_cube_id = "Europe"
>>> query_cube_name = "Query cube"
Setting up the query cube:
>>> query_session = tt.QuerySession.start()
>>> cluster_definition = tt.ClusterDefinition(
... application_names={application_name},
... discovery_protocol=JdbcPingDiscoveryProtocol(
... f"jdbc:h2:{Path(mkdtemp('atoti-cluster')).as_posix()}/db",
... username="sa",
... password="",
... ),
... authentication_token=token_urlsafe(),
... )
>>> query_session.session.clusters[cluster_name] = cluster_definition
>>> query_session.query_cubes[query_cube_name] = tt.QueryCubeDefinition(
... query_session.session.clusters[cluster_name],
... allow_data_duplication=True,
... distributing_levels={(application_name, "City", "City")},
... )
Defining some functions:
>>> def query_by_city():
... cube = query_session.session.cubes[query_cube_name]
... l, m = cube.levels, cube.measures
... return cube.query(m["Number.SUM"], levels=[l["City"]])
>>> def wait_for_data(*, expected_city_count: int):
... max_attempts = 30
... for _ in range(max_attempts):
... try:
... result = query_by_city()
... if len(result.index) == expected_city_count:
... return
... except:
... pass
... sleep(1)
... raise RuntimeError(f"Failed {max_attempts} attempts.")
Setting up the data cube:
>>> data_session = tt.Session.start()
>>> data = pd.DataFrame(
... columns=["City", "Number"],
... data=[
... ("Paris", 20.0),
... ("London", 5.0),
... ("Madrid", 7.0),
... ],
... )
>>> table = data_session.read_pandas(
... data, keys={"City"}, table_name=application_name
... )
>>> data_cube = data_session.create_cube(table, id_in_cluster=data_cube_id)
Waiting for the data cube to join the cluster:
>>> data_session.clusters[cluster_name] = cluster_definition
>>> wait_for_data(expected_city_count=3)
>>> query_by_city()
Number.SUM
City
London 5.00
Madrid 7.00
Paris 20.00
Unloading the facts associated with the London and Madrid members:
>>> query_session.query_cubes[
... query_cube_name
... ].unload_members_from_data_cube(
... {"London", "Madrid"},
... data_cube_id=data_cube_id,
... level=data_cube.levels["City"],
... )
>>> wait_for_data(expected_city_count=1)
>>> query_by_city()
Number.SUM
City
Paris 20.00