Tables.data_transaction(scenario_name=None, *, allow_nested=True, tables=None)
Create a data transaction to batch several data loading operations.
- It is more efficient than doing each load() one after the other, especially when using load_async() to load data concurrently in multiple tables.
- It avoids possibly incorrect intermediate states (e.g. if loading some new data requires dropping existing rows first).
- If an exception is raised during a data transaction, the transaction is rolled back and the changes made before the exception are discarded.
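The all-or-nothing behavior described in the list above can be illustrated with a toy model in plain Python. This is only a sketch of the concept, not how the library implements transactions: `toy_transaction` and the `store` dictionary are hypothetical names introduced for illustration.

```python
import copy
from contextlib import contextmanager


@contextmanager
def toy_transaction(store):
    """Toy all-or-nothing transaction over a dict of table name -> rows (hypothetical)."""
    snapshot = copy.deepcopy(store)  # state to restore if the body fails
    try:
        yield store
    except BaseException:
        # Discard every change made since the transaction started.
        store.clear()
        store.update(snapshot)
        raise  # the exception still propagates to the caller


store = {"Cities": {"Paris": 200.0, "London": 240.0}}

try:
    with toy_transaction(store) as tables:
        del tables["Cities"]["Paris"]
        tables["Cities"]["Singapore"] = 250.0
        raise ValueError("Some error")
except ValueError:
    pass

rolled_back = store["Cities"]
```

After the failed block, `rolled_back` holds the original Paris and London rows: neither the deletion nor the insertion survived.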
Data transactions cannot be mixed with:
- Parameters:
  - allow_nested (bool) – Whether to allow starting this transaction inside an already running one.
    When False, an error will be raised if this transaction is started while another transaction is already running, regardless of that outer transaction’s value of allow_nested.
    The benefit of passing False is that changes made in this transaction are guaranteed, if not rolled back, to be visible to the statements outside the transaction.
    The drawback is that it prevents splitting transaction steps into small composable functions.
    When nested transactions are allowed, changes made by inner transactions contribute transparently to the outer transaction and will only be committed when the outer transaction’s context exits.
  - scenario_name (str | None) – The name of the source scenario impacted by all the table operations inside the transaction.
  - tables (Set[HasIdentifier[TableIdentifier] | TableIdentifier] | None) – The tables that can be affected by this transaction.
    Tables transitively joined with these tables will be locked too.
    Transactions locking disjoint sets of tables execute concurrently.
    When None, all tables are locked.
- Return type:
  AbstractContextManager[None]
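To make the locking rule for the tables parameter more concrete, here is a minimal sketch of how a lock set could be derived from a join graph. It is not the library's implementation: `locked_tables`, `can_run_concurrently` and the `joins` list of pairs are hypothetical, and the sketch assumes that a join locks both of its ends regardless of direction.

```python
def locked_tables(requested, joins):
    """Return the requested tables plus every table transitively joined to them.

    `joins` is a hypothetical list of (table, joined_table) name pairs,
    treated as undirected in this sketch.
    """
    neighbours = {}
    for left, right in joins:
        neighbours.setdefault(left, set()).add(right)
        neighbours.setdefault(right, set()).add(left)
    locked, stack = set(), list(requested)
    while stack:
        table = stack.pop()
        if table not in locked:
            locked.add(table)
            stack.extend(neighbours.get(table, ()))
    return locked


def can_run_concurrently(first, second, joins):
    """Two transactions can overlap only if their lock sets are disjoint."""
    return locked_tables(first, joins).isdisjoint(locked_tables(second, joins))


joins = [("Cities", "Countries")]
cities_lock = locked_tables({"Cities"}, joins)
```

With this join graph, a transaction on Cities also locks Countries, so it could run alongside a transaction on Customers but not alongside one on Countries.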
Example
>>> cities_df = pd.DataFrame(
...     columns=["City", "Price"],
...     data=[
...         ("Berlin", 150.0),
...         ("London", 240.0),
...         ("New York", 270.0),
...         ("Paris", 200.0),
...     ],
... )
>>> cities_table = session.read_pandas(
...     cities_df,
...     keys={"City"},
...     table_name="Cities",
... )
>>> extra_cities_df = pd.DataFrame(
...     columns=["City", "Price"],
...     data=[
...         ("Singapore", 250.0),
...     ],
... )
>>> with session.tables.data_transaction():
...     cities_table += ("New York", 100.0)
...     cities_table.drop(cities_table["City"] == "Paris")
...     cities_table.load(extra_cities_df)
>>> cities_table.head().sort_index()
           Price
City
Berlin     150.0
London     240.0
New York   100.0
Singapore  250.0
If an exception is raised during a data transaction, the changes made until the exception will be rolled back.
>>> cities_table.load(cities_df)
>>> cities_table.head().sort_index()
          Price
City
Berlin    150.0
London    240.0
New York  270.0
Paris     200.0
>>> with session.tables.data_transaction():
...     cities_table += ("New York", 100.0)
...     cities_table.drop(cities_table["City"] == "Paris")
...     cities_table.load(extra_cities_df)
...     raise Exception("Some error")
Traceback (most recent call last):
    ...
Exception: Some error
>>> cities_table.head().sort_index()
          Price
City
Berlin    150.0
London    240.0
New York  270.0
Paris     200.0
Loading data concurrently in multiple tables:
>>> import asyncio
>>> countries_table = session.create_table(
...     "Countries",
...     data_types={"City": "String", "Country": "String"},
...     keys={"City"},
... )
>>> cities_table.join(countries_table)
>>> countries_df = pd.DataFrame(
...     columns=["City", "Country"],
...     data=[
...         ("Berlin", "Germany"),
...         ("London", "England"),
...         ("New York", "USA"),
...         ("Paris", "France"),
...     ],
... )
>>> async def load_data_in_all_tables(tables):
...     with tables.data_transaction():
...         await asyncio.gather(
...             tables["Cities"].load_async(cities_df),
...             tables["Countries"].load_async(countries_df),
...         )
>>> cities_table.drop()
>>> asyncio.run(load_data_in_all_tables(session.tables))
>>> cities_table.head().sort_index()
          Price
City
Berlin    150.0
London    240.0
New York  270.0
Paris     200.0
>>> countries_table.head().sort_index()
          Country
City
Berlin    Germany
London    England
New York      USA
Paris      France
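The pattern above, a synchronous context manager wrapping an awaited asyncio.gather(), can be tried without a session using stand-ins. In this sketch `toy_transaction` and `fake_load` are hypothetical placeholders for the data transaction and load_async(), and the delays are arbitrary. It only shows that both loads are in flight at the same time and that the block exits after both have finished.

```python
import asyncio
from contextlib import contextmanager

events = []


@contextmanager
def toy_transaction():
    """Hypothetical stand-in for a data transaction."""
    events.append("begin")
    yield
    events.append("commit")  # only reached if the body did not raise


async def fake_load(table_name, delay):
    """Hypothetical stand-in for load_async(): records start and end."""
    events.append(f"start {table_name}")
    await asyncio.sleep(delay)
    events.append(f"end {table_name}")


async def load_all():
    with toy_transaction():
        # Both loads are scheduled together and overlap in time.
        await asyncio.gather(
            fake_load("Cities", 0.05),
            fake_load("Countries", 0.01),
        )


asyncio.run(load_all())
```

In the recorded events, the Countries load starts before the Cities load has ended, and "commit" is recorded last.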
Nested transactions allowed:
>>> def composable_function(session):
...     table = session.tables["Cities"]
...     with session.tables.data_transaction():
...         table += ("Paris", 100.0)
>>> # The function can be called in isolation:
>>> composable_function(session)
>>> cities_table.head().sort_index()
       Price
City
Paris  100.0
>>> with session.tables.data_transaction(
...     allow_nested=False  # No-op because this is the outer transaction.
... ):
...     cities_table.drop()
...     cities_table += ("Berlin", 200.0)
...     # The function can also be called inside another transaction and will contribute to it:
...     composable_function(session)
...     cities_table += ("New York", 150.0)
>>> cities_table.head().sort_index()
          Price
City
Berlin    200.0
New York  150.0
Paris     100.0
Nested transactions not allowed:
>>> def not_composable_function(session):
...     table = session.tables["Cities"]
...     with session.tables.data_transaction(allow_nested=False):
...         table.drop()
...         table += ("Paris", 100.0)
...     assert table.row_count == 1
>>> # The function can be called in isolation:
>>> not_composable_function(session)
>>> with session.tables.data_transaction():
...     cities_table.drop()
...     cities_table += ("Berlin", 200.0)
...     # This is a programming error, the function cannot be called inside another transaction:
...     not_composable_function(session)
...     cities_table += ("New York", 150.0)
Traceback (most recent call last):
    ...
RuntimeError: Cannot start this transaction inside another transaction since nesting is not allowed.
>>> # The last transaction was rolled back:
>>> cities_table.head().sort_index()
       Price
City
Paris  100.0
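The two nesting examples above follow from a simple rule: only the outermost transaction commits, and allow_nested=False is checked when a transaction starts. The toy class below models that rule with a depth counter. It is a sketch under that assumption, not the library's code: `ToyTransactions` and its `commits` counter are hypothetical.

```python
from contextlib import contextmanager


class ToyTransactions:
    """Hypothetical model of nested transactions using a depth counter."""

    def __init__(self):
        self.depth = 0
        self.commits = 0

    @contextmanager
    def data_transaction(self, *, allow_nested=True):
        if self.depth > 0 and not allow_nested:
            raise RuntimeError("nesting is not allowed for this transaction")
        self.depth += 1
        try:
            yield
        finally:
            self.depth -= 1
        # Skipped when the body raised: nothing is committed in that case.
        if self.depth == 0:
            self.commits += 1  # only the outermost exit commits


toy = ToyTransactions()

# allow_nested=False has no effect on the outermost transaction.
with toy.data_transaction(allow_nested=False):
    with toy.data_transaction():
        pass
commits_after_nested = toy.commits

# Starting a non-nestable transaction inside another one fails.
error_message = None
try:
    with toy.data_transaction():
        with toy.data_transaction(allow_nested=False):
            pass
except RuntimeError as error:
    error_message = str(error)
```

The first block commits once, when the outer context exits. The second block raises and commits nothing, which mirrors the rollback shown in the example above.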
Restricting the transaction to a subset of tables:
>>> customers_table = session.create_table(
...     "Customers",
...     data_types={"Name": "String", "Stock price": "double"},
...     keys={"Name"},
... )
>>> dataframe = pd.DataFrame(
...     [("Acme Corporation", 120.0)], columns=list(customers_table)
... )
>>> with session.tables.data_transaction(tables={customers_table}):
...     customers_table.load(dataframe)
...     # cities_table is not locked and could be updated concurrently in another transaction
>>> customers_table.head()
                  Stock price
Name
Acme Corporation        120.0
Nested transactions must specify a subset of the outer transaction’s tables:
>>> with session.tables.data_transaction():
...     with session.tables.data_transaction(tables={cities_table}):
...         pass
...     with session.tables.data_transaction(tables={customers_table}):
...         pass
>>> with session.tables.data_transaction(
...     tables={cities_table, customers_table}
... ):
...     with session.tables.data_transaction(tables={cities_table}):
...         pass
...     with session.tables.data_transaction(tables={customers_table}):
...         pass
>>> with session.tables.data_transaction(tables={cities_table}):
...     with session.tables.data_transaction(tables={customers_table}):
...         pass
Traceback (most recent call last):
    ...
RuntimeError: Cannot start a transaction locking tables {t['Customers']} inside another transaction locking tables {t['Cities']} which is not a superset.
>>> with session.tables.data_transaction(tables={cities_table}):
...     with session.tables.data_transaction():
...         pass
Traceback (most recent call last):
    ...
RuntimeError: Cannot start a transaction locking all tables inside another transaction locking a subset of tables: {t['Cities']}.
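The four cases above reduce to one subset check in which None stands for "all tables". The helper below is a hypothetical sketch of that check, not the library's code: `check_nesting` and `is_allowed` are illustrative names, the error messages are paraphrased, and the expansion of lock sets through joins is deliberately ignored.

```python
def check_nesting(outer_tables, inner_tables):
    """Raise if an inner transaction would lock tables the outer one does not.

    None means "all tables". Hypothetical helper; joined tables are ignored.
    """
    if outer_tables is None:
        return  # the outer transaction already locks everything
    if inner_tables is None:
        raise RuntimeError(
            "inner transaction locks all tables, outer locks only a subset"
        )
    if not set(inner_tables) <= set(outer_tables):
        raise RuntimeError("outer tables are not a superset of inner tables")


def is_allowed(outer_tables, inner_tables):
    """Boolean wrapper around check_nesting."""
    try:
        check_nesting(outer_tables, inner_tables)
    except RuntimeError:
        return False
    return True


results = [
    is_allowed(None, {"Cities"}),
    is_allowed({"Cities", "Customers"}, {"Customers"}),
    is_allowed({"Cities"}, {"Customers"}),
    is_allowed({"Cities"}, None),
]
```

The `results` list follows the order of the four examples: the first two nestings are accepted and the last two are rejected.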