Skip to main content

Databricks

Time-travel support

Time-travel is supported by DirectQuery with Databricks, but is limited to tables (and not views, as it is not supported by Databricks). This means that updating applications based on this Database are subject to data desynchronization in case of changes in a view of the database.

To choose the time-travel behavior, DatabricksDirectQuerySettings#timeTravelPolicy can be configured with the following values:

  • DatabricksDirectQuerySettings.TimeTravelPolicy#DISABLED: time-travel will not be used.
  • DatabricksDirectQuerySettings.TimeTravelPolicy#STRICT: time-travel will be used if there are only tables and no views, and otherwise will throw an error. This is the default mode.
  • DatabricksDirectQuerySettings.TimeTravelPolicy#LIGHT: time-travel will be used only for tables and views will be queried without it.

Read this page to learn more about the visible effects of this desynchronization.

Vectors support

Databricks handles vectors natively, via the Array type in the model and thanks to special functions at query time. However, Databricks does not natively support the aggregation function used by Atoti Server such as a vector sum.

Emulated vectors (multi-row and multi-column) are also supported on Databricks.

Databricks SQL Warehouse

Native vectors with DirectQuery are not supported on SQL warehouse, as UDAFs are not supported on Databricks SQL Warehouse. Only emulated vector are supported.

Databricks Cluster

In order to support native vector aggregation, DirectQuery provides UDAFs (User Defined Aggregation Functions) as precompiled jars. The user must register those on the Databricks cluster.

Databricks cluster also support emulated vectors, but it is recommended to use the native ones for performance.

The jar can be downloaded from ActiveViam's JFrog Artifactory. The artifact is under the package com.activeviam.database.databricks-udafs, and should be named such as databricks-udafs-ATOTI_VERSION.jar (do NOT use the jars containing the sources or javadoc suffixes).

Upload the JAR onto Databricks cluster

First, ensure the DBFS file browser option is enabled. This can be done by clicking on your user icon in the top right corner, then selecting "Settings". In the "Advanced" tab, scroll down to the "Other" section, and enable the "DBFS File Browser" option.

Then, you can return to a notebook to upload the jar onto the Databricks cluster. At the top of a notebook, select the option "Open DBFS File Browser" in the "File" menu, and then "Upload", say into the "jars/" folder.

These actions might require admin permissions on the workspace.

Register the UDAFs

Then register the functions with the following sql, replacing ATOTI_VERSION by your version:

DROP FUNCTION IF EXISTS spark_catalog.default.grossVector;
DROP FUNCTION IF EXISTS spark_catalog.default.longVector;
DROP FUNCTION IF EXISTS spark_catalog.default.shortVector;
DROP FUNCTION IF EXISTS spark_catalog.default.sumVector;
DROP FUNCTION IF EXISTS spark_catalog.default.sumProductVector;
CREATE FUNCTION spark_catalog.default.grossVector AS 'com.activeviam.database.databricksudafs.internal.GenericUdafGrossVector' USING JAR 'dbfs:/FileStore/jars/databricks_udafs_ATOTI_VERSION.jar';
CREATE FUNCTION spark_catalog.default.longVector AS 'com.activeviam.database.databricksudafs.internal.GenericUdafLongVector' USING JAR 'dbfs:/FileStore/jars/databricks_udafs_ATOTI_VERSION.jar';
CREATE FUNCTION spark_catalog.default.shortVector AS 'com.activeviam.database.databricksudafs.internal.GenericUdafShortVector' USING JAR 'dbfs:/FileStore/jars/databricks_udafs_ATOTI_VERSION.jar';
CREATE FUNCTION spark_catalog.default.sumVector AS 'com.activeviam.database.databricksudafs.internal.GenericUdafSumVector' USING JAR 'dbfs:/FileStore/jars/databricks_udafs_ATOTI_VERSION.jar';
CREATE FUNCTION spark_catalog.default.sumProductVector AS 'com.activeviam.database.databricksudafs.internal.SumProductUdafVector' USING JAR 'dbfs:/FileStore/jars/databricks_udafs_ATOTI_VERSION.jar';

If you encounter the error Create external Hive function are not supported in Unity Catalog, please ensure you are registering the UDAFs on a Hive metastore (e.g. spark_catalog).

You can then use these new UDAFs as aggregation functions.

Run the following query to test whether the UDAFs were correctly registered:

SELECT group, spark_catalog.default.sumVector(arr) FROM (SELECT * from (values ('a', ARRAY(1, 2, 3)), ('a', ARRAY(10, 10, 10))) AS t(group, arr)) GROUP BY group;

The query should run without error, and return a single row: ('a', [11, 12, 13]).

Customize the UDAF names

In case, the default UDAF names as given above are not convenient (e.g. the catalog or schema is different, or the function names are already taken), each UDAF name is configurable via the DatabricksDialectSettings class:

You must pass the UDAF names to:

DatabricksDirectQuerySettings settings =
DatabricksDirectQuerySettings.builder()
.sumNativeArrayUdafName("MY_CATALOG.MY_SCHEMA.SUM_NATIVE_ARRAY_UDAF")
.longSumNativeArrayUdafName("MY_CATALOG.MY_SCHEMA.LONG_SUM_NATIVE_ARRAY_UDAF")
.shortSumNativeArrayUdafName("MY_CATALOG.MY_SCHEMA.SHORT_SUM_NATIVE_ARRAY_UDAF")
.grossSumNativeArrayUdafName("MY_CATALOG.MY_SCHEMA.GROSS_SUM_NATIVE_ARRAY_UDAF")
.sumScalarProductNativeArrayUdafName(
"MY_CATALOG.MY_SCHEMA.SUM_PRODUCT_NATIVE_ARRAY_UDAF")
.build();

You can then pass it as argument to your session:

final Session session = Session.createSession(properties, settings);

Gotchas

Nullable fields

Databricks can define fields with nullable types. DirectQuery is capable of detecting this and defines its internal model accordingly.
While this is not a problem in itself, it can conflict with the rule in Atoti cubes that all levels must be based on non-nullable values. This does not create an issue of any sort as the local model is updated behind the scene, assigning a default value based on the type.
It has a side effect on the query performance, as DirectQuery must convert on-the-fly null values to their assigned default values, as well as adding extra conditions during joins to handle null values on both sides.

Heavy-load connection string

On modern cloud distributed databases such as Databricks, a key design point is the separation of compute and storage. This allows to use different computing resources to process queries and brings flexibility and the capability to scale up and down the resources (and the associated bill !). On Databricks, the compute resources can be clusters or SQL warehouses. A bigger compute will process queries faster but will also cost more.

The load on the external database of a DirectQuery application is quite particular:

  • At the application startup and refresh, many queries are performed on the database to initialize the cube and cache data in it (the hierarchies and aggregate providers).
  • After the initial feeding, if the aggregate providers have been chosen wisely, most user queries will hit the cache data and actually never been run on the external database. Additionally, queries hitting the external database should most of the time be drill-down with a very reduced scope and should be easier to handle by Snowflake.

In order to have faster startup time, it is recommended to use bigger computational resources during that time. Afterward, to save on cost, the computational resources can be scaled down. DirectQuery provides a way to define two compute resources to use for your application:

  • one heavy load cluster or SQL warehouse, expected to be the more powerful one, to guarantee faster startup time
  • a regular cluster or SQL warehouse, to serve the queries after the initial load in a cost-efficient manner

That, after the initial feeding, as the large compute resource will not be used, it will automatically shut down if you configured it as such.

This heavy load connection string can be defined as such:

final DatabricksConnectionProperties properties =
DatabricksConnectionProperties.builder()
.connectionString("CONNECTION_STRING_TO_SMALL_AND_CHEAP_CLUSTER")
.heavyLoadConnectionString("CONNECTION_STRING_TO_LARGE_AND_EXPENSIVE_CLUSTER")
.additionalOption(
DatabricksConnectionProperties.PASSWORD_PROPERTY_KEY, "your-plain-password")
.build();

You can then pass it as argument to your session:

final Session session = Session.createSession(properties, settings);

If it is not defined, the regular compute resource will be used for the initial feeding.

Note that both compute resources need to be of the same type (either cluster or SQL warehouse), to run the same runtime version, and to share the same password.