atoti.QueryCube.unload_members_from_data_cube()
- QueryCube.unload_members_from_data_cube(members, *, data_cube_id, level, scenario_name='Base')
Unload the given members of a level from a data cube.
This is mostly used for data rollover.
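For instance, a daily rollover job could evict the oldest day of data before loading a new one. A minimal sketch, assuming a query cube exposing a ("Sales", "Date", "Date") level and a data cube registered as "sales-eu" (all of these names are hypothetical, not part of this API):
>>> # Hypothetical rollover sketch: query_cube, "sales-eu", and the
>>> # ("Sales", "Date", "Date") level are assumptions for illustration.
>>> from datetime import date, timedelta
>>> cutoff = date.today() - timedelta(days=30)
>>> query_cube.unload_members_from_data_cube(
...     {cutoff},
...     data_cube_id="sales-eu",
...     level=("Sales", "Date", "Date"),
... )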
Note
This requires the query cube to have been created with allow_data_duplication set to True and with non-empty distributing_levels.
- Parameters:
members (Set[bool | int | float | date | datetime | time | str]) – The members to unload.
data_cube_id (str) – The ID of the data cube from which to unload the members. This must be equal to the id_in_cluster argument passed to create_cube().
level (HasIdentifier[LevelIdentifier] | LevelIdentifier | tuple[str, str, str]) – The level containing the members to unload.
scenario_name (str) – The name of the scenario from which facts must be unloaded.
- Return type:
None
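As a quick illustration of scenario_name, passing it targets a non-default scenario. A minimal sketch with a hypothetical "Stress test" scenario (not part of the example below):
>>> # Hypothetical: unload London's facts from a "Stress test" scenario.
>>> query_cube.unload_members_from_data_cube(
...     {"London"},
...     data_cube_id="Europe",
...     level=("Cities", "City", "City"),
...     scenario_name="Stress test",
... )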
Example
>>> application_name = "Cities"
>>> cluster_name = "Cluster"
>>> data_cube_id = "Europe"
>>> query_cube_name = "Query cube"
Setting up the query cube:
>>> query_session = tt.QuerySession.start()
>>> cluster_definition = tt.ClusterDefinition(
...     application_names={application_name},
...     discovery_protocol=JdbcPingDiscoveryProtocol(
...         f"jdbc:h2:{mkdtemp('atoti-cluster')}/db",
...         username="sa",
...         password="",
...     ),
...     authentication_token=token_urlsafe(),
... )
>>> query_session.session.clusters[cluster_name] = cluster_definition
>>> query_session.query_cubes[query_cube_name] = tt.QueryCubeDefinition(
...     query_session.session.clusters[cluster_name],
...     allow_data_duplication=True,
...     distributing_levels={(application_name, "City", "City")},
... )
Defining some helper functions:
>>> def query_by_city():
...     cube = query_session.session.cubes[query_cube_name]
...     l, m = cube.levels, cube.measures
...     return cube.query(m["Number.SUM"], levels=[l["City"]])
>>> def wait_for_data(*, expected_city_count: int):
...     max_attempts = 30
...     for _ in range(max_attempts):
...         try:
...             result = query_by_city()
...             if len(result.index) == expected_city_count:
...                 return
...         except Exception:
...             pass
...         sleep(1)
...     raise RuntimeError(f"Failed after {max_attempts} attempts.")
Setting up the data cube:
>>> data_session = tt.Session.start()
>>> data = pd.DataFrame(
...     columns=["City", "Number"],
...     data=[
...         ("Paris", 20.0),
...         ("London", 5.0),
...         ("Madrid", 7.0),
...     ],
... )
>>> table = data_session.read_pandas(
...     data, keys={"City"}, table_name=application_name
... )
>>> data_cube = data_session.create_cube(table, id_in_cluster=data_cube_id)
Waiting for the data cube to join the cluster:
>>> data_session.clusters[cluster_name] = cluster_definition
>>> wait_for_data(expected_city_count=3)
>>> query_by_city()
       Number.SUM
City
London       5.00
Madrid       7.00
Paris       20.00
Unloading the facts associated with the London and Madrid members:
>>> query_session.query_cubes[
...     query_cube_name
... ].unload_members_from_data_cube(
...     {"London", "Madrid"},
...     data_cube_id=data_cube_id,
...     level=data_cube.levels["City"],
... )
>>> wait_for_data(expected_city_count=1)
>>> query_by_city()
      Number.SUM
City
Paris      20.00