Spark on your Oracle Data Warehouse

1 Background and Motivation

Figure 1 depicts a fairly common deployment for an Enterprise that uses both open source systems and an Oracle Warehouse. Enterprises are deploying some of their production workloads on open source technologies, with the current favorite being Apache Spark. The Oracle Warehouse and the open source systems typically share a common storage fabric, usually an object store like Amazon S3. Some of the motives driving such a move are:

  • newer Applications utilize capabilities such as machine learning and low-latency ingest; open source technologies like Apache Spark, Apache Pulsar and TensorFlow provide deep and fast-evolving capabilities in these areas.
  • the open source community has done a good job of catering to developers, making it quite easy for small teams to fairly quickly develop and deploy point solutions.
  • price-performance metrics of open source systems when dealing with 'Big Data' are perceived to be superior, and an open source strategy is often preferred to avoid vendor lock-in.


Figure 1: Enterprise Data Lake

The promise of such an 'Enterprise Data Lake' deployment is that it provides the best of both worlds: it enables newer applications to leverage open source stacks without disrupting traditional production workloads such as Enterprise Performance, Business Intelligence Dashboards and Reports, Financial Planning, etc.

1.1 Reality of these deployments

Even though open source systems share the storage fabric with the Oracle warehouse, these systems mostly run as silos: each system aspires to be a full-featured data platform for its developer community. Coexistence with other platforms is an afterthought, mostly handled with a 'connector' story. Connectors at best help reduce data movement between the systems; even this claim has holes, and in reality many problems are left unsolved, such as coordination and scheduling of tasks across these systems, co-location of runtime resources, efficient data value mapping (notwithstanding the promise of Apache Arrow), and deep translation and pushdown of SQL.


Figure 2: Non-optimal Data movement and Work coordination between systems

1.2 For the Oracle customer

Within the space of multi-system deployments there is a segment of customers whose data platform is centered around an Oracle Data Warehouse, but who still deploy some Data Applications using Apache Spark. Such customers are often surprised by the significant increase in operational cost of running Spark and Oracle in production:

  • In areas such as Workload management, Query optimization, Data management and Data security, Oracle customers are surprised by missing capabilities in Spark and by how much manual labor is needed to compensate. They are also surprised by the investment needed: a fairly large Data Engineering and Platform team has to be employed for Spark.
  • To meet operational SLAs, Spark clusters end up being over-provisioned; to quote Pepperdata, "optimization of cloud systems can 'win back' about 50% of task hours… larger organizations can save as much as $7.9 million a year".


Figure 3: Reality: Oracle Customers and Spark

2 Our Solution: An integrated platform

We provide Oracle Customers with a more integrated environment where Data management and operational tasks are mostly handled in the Oracle Warehouse. At the same time, application developers are abstracted away from the inner workings of the platform, giving them more fluidity in the choice of languages and functions for their Applications. This fits into Oracle's convergence story of consistent data management, security and operations across all kinds of data processing tasks.

There is no impact on Applications written against the Spark programming model. Customers would deploy Spark in the usual ways: local-mode, cluster-mode, server-less, etc., with an additional extension jar provided by Oracle and instructions on how to configure the Oracle extensions. Since less processing needs to be done in Spark, and Oracle is leveraged for Administration and Data Management, this opens the door to operating smaller and operationally simpler Spark clusters.


Figure 4: An integrated Oracle and Spark architecture

In order to make this happen, we provide the following new capabilities:

Catalog Integration
The Oracle Data Dictionary will be the source of truth for all metadata. We extend the Spark Catalog to be integrated with the Oracle Dictionary.
Program Translation and Pushdown
By far the most important capability: the translation of Spark SQL into Oracle SQL (and PL/SQL) for pipelines whose data resides in, or is managed by, Oracle. This applies not only to raw table data stored in Oracle but also to data managed in Oracle via features such as Materialized Views, Analytic Views and the In-Memory option. Pushing the processing to where the data is, even for complex data pipelines, provides the biggest differentiation from 'connector' architectures.
Language Integration
Language Integration encompasses capabilities that extend Apache Spark so that, on the one hand, native Oracle functions and data structures are usable in Spark programs and, on the other hand, a subset of custom Scala programs can be translated into equivalent Oracle SQL/PL-SQL, so that larger parts of Spark pipelines are pushed down to the Oracle DB for processing.
Runtime Integration
Even with Pushdown and Language Integration, many pipelines will contain Spark-specific program code; this would cause large amounts of data to be streamed out of Oracle to Spark executors. A Spark co-processor, either co-located with Oracle instances or embedded in them, would allow limited Spark pipelines to be shipped to the Oracle instances for execution.


Figure 5: Capabilities of an integrated system

3 Summary of Spark Extensions

3.1 Catalog Integration

We leverage the Catalog Integration and DataSources v2 APIs in Apache Spark 3.0 to provide a custom plug-in Catalog that integrates with the Oracle Database Data Dictionary. A common catalog implies that all persistent data, including data residing in an Object Store, is represented in the Oracle Data Dictionary. Hence both systems have a consistent view of the Data Warehouse, and a common, consistent enforcement of role- and data-based security and a common metadata and data consistency model can be applied.
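
For reference, the sketch below shows the shape of the Spark 3 TableCatalog plug-in surface that such a catalog integration implements. The interface and method signatures come from Spark's public DataSourceV2 API; the class name and the dictionary-backed bodies are placeholders, not the actual OracleCatalog implementation.

import java.util.{Map => JMap}

import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical skeleton of a catalog plug-in backed by the Oracle Data Dictionary.
class OracleDictionaryCatalog extends TableCatalog {
  private var catalogName: String = _

  // receives the spark.sql.catalog.<name>.* options (url, user, password, ...)
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
  }
  override def name(): String = catalogName

  // answered from the Oracle dictionary (e.g. ALL_TABLES), not a Hive metastore
  override def listTables(namespace: Array[String]): Array[Identifier] = ???
  // builds a Table (schema, partitioning, statistics) from dictionary metadata
  override def loadTable(ident: Identifier): Table = ???

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: JMap[String, String]): Table = ???
  override def alterTable(ident: Identifier, changes: TableChange*): Table = ???
  override def dropTable(ident: Identifier): Boolean = ???
  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = ???
}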

3.2 SQL Translation and Pushdown

These are based on techniques developed over many years to extend the Spark Planner with custom Query Optimizations; see Spark Druid OLAP and the Sparklinedata BI Stack on Spark. From the early days it has been possible to extend Spark's Logical and Physical planners. By rewriting Spark Plans we can go well beyond what most Spark 'connectors' provide (that is, pushing projections and filters to the underlying system); we are able to entirely push down complex analysis pipelines containing all the analytical functions and operators of Spark SQL.
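
As a rough sketch of the mechanism (not the actual spark-oracle rules), planner extensions of this kind are injected through Spark's public SparkSessionExtensions API. The class below is a placeholder: a real rule would match operator sub-trees over Oracle tables and collapse them into a scan that carries generated Oracle SQL.

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical extension class; it would be activated via
//   spark.sql.extensions=<package>.PushdownExtensionsSketch
class PushdownExtensionsSketch extends (SparkSessionExtensions => Unit) {
  override def apply(ext: SparkSessionExtensions): Unit = {
    ext.injectOptimizerRule { _: SparkSession =>
      new Rule[LogicalPlan] {
        // placeholder: a real rewrite recognizes Project/Filter/Aggregate/Join
        // sub-trees over Oracle tables and replaces them with a pushdown scan
        override def apply(plan: LogicalPlan): LogicalPlan = plan
      }
    }
    // physical planning can be extended similarly via ext.injectPlannerStrategy(...)
  }
}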

For the TPCDS benchmark we are able to completely push down over 90 of the 99 queries. For example, Figure 6 shows the non-pushdown vs pushdown plan for TPCDS query q1. We provide more details about TPCDS queries in the TPCDS Queries section.


Figure 6: TPCDS Query Q1: non-pushdown vs pushdown plan.

When a pushdown query returns a lot of data, the single pipe between a Spark task and the database instance can become a bottleneck for query execution. The Query Splitting feature attempts to split an Oracle pushdown query into a set of queries such that the union-all of their results is the same as the original query result. The query can be split by input table partitions or blocks, or by output result-set row ranges.


Figure 7: Query Splitting
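
To make the idea concrete, the snippet below illustrates a partition-based split. The partition names are hypothetical and the real split queries are derived by the extension from the dictionary metadata; each Spark task runs one partition-restricted query over its own connection, and the UNION ALL over all of the table's partitions equals the original result.

// Illustration only: splitting a pushdown scan by input-table partitions.
val original =
  """select "SS_ITEM_SK", "SS_NET_PAID"
    |from TPCDS.STORE_SALES
    |where "SS_NET_PAID" > 100""".stripMargin

// two hypothetical partition names; a full split would cover every partition
val splits = Seq("SS_Q1_1999", "SS_Q2_1999").map { p =>
  s"""select "SS_ITEM_SK", "SS_NET_PAID"
     |from TPCDS.STORE_SALES partition ($p)
     |where "SS_NET_PAID" > 100""".stripMargin
}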

For DML operations we integrate with Spark's DataSources v2 API to provide transactionally consistent DML: during Spark job execution, all other jobs see the state of the table as of the start of the job, and we don't have to prevent concurrent DML jobs on a table. On success, the destination table will be in a state that matches some serialization of the jobs. Under the covers, data is first written to temporary tables in Oracle before being merged into the destination table on commit.


Figure 8: Insert Flow
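
A minimal sketch of how this flow maps onto Spark's DataSource v2 write protocol is shown below. The interface and method names are Spark's public API; the temp-table and merge behaviour in the comments paraphrases the description above and is not the actual implementation.

import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}

// Hypothetical BatchWrite that stages rows in a per-job temporary table.
class OraStagedBatchWrite(destTable: String, jobId: String) extends BatchWrite {

  // each Spark task gets a writer that inserts its rows into the temp table
  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
    ??? // writers INSERT into <destTable>_spark_tmp_<jobId>

  // driver-side, only after every task has succeeded: publish atomically
  override def commit(messages: Array[WriterCommitMessage]): Unit =
    ??? // one transaction: merge the temp table into destTable, then drop it

  // nothing was published, so cleanup is just dropping the temp table
  override def abort(messages: Array[WriterCommitMessage]): Unit =
    ??? // DROP TABLE <destTable>_spark_tmp_<jobId>
}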

3.3 Language Integration

We leverage the ability to register Custom Functions in Spark to enable the registration of Oracle SQL functions into Spark. These functions can then be used in Spark SQL on Oracle tables. Our Planner extensions translate them into equivalent Oracle SQL for execution.

We are also investigating surfacing Oracle ML and Geo-spatial capabilities into the Spark programming model.

3.3.1 Spark SQL Macros

Spark provides the ability to register custom functions written in Scala. Under the covers an Invoke Catalyst Expression is associated with the function name in Spark's Function Registry. These functions are usable in Spark-SQL just like built-in functions. At runtime an Invoke Catalyst Expression runs the associated function body.


Figure 9: Spark Function Registration and Execution

We have developed the Spark SQL Macros capability, which provides the ability to register custom Scala functions into a Spark Session just like Spark's custom UDF registration capability. The difference is that the SQL Macros registration mechanism attempts to translate the function body into an equivalent Spark catalyst Expression with holes (MacroArg catalyst expressions). Under the covers, SQLMacro is a set of Scala blackbox macros that attempt to rewrite the Scala function body AST into an equivalent catalyst Expression. There are two big benefits of doing this:

  • Better performance, since at runtime we avoid the SerDe cost at the function boundary.
  • Since the physical plan contains native catalyst expressions, more optimizations are possible. In the case of Oracle pushdown we are able to push the catalyst expressions down to Oracle SQL, avoiding streaming data out of Oracle even in the presence of these kinds of custom UDFs.


Figure 10: Spark SQL Macro Registration and Execution

3.4 Runtime Integration

We are investigating the concept of a Spark Co-Processor. A co-processor would be a very limited and isolated Spark environment whose only capability is to run a given Spark operator pipeline in a single Task (the pipeline cannot contain Shuffle operations). Co-processors are unaware of cluster topology or the data dictionary; they are dumb processing units for running custom Scala code that applies row/row-group transformations. A Co-Processor could be an isolated process co-located with Oracle Instance processes, or it could be embedded inside an Oracle process along the lines of how a JavaScript engine is embedded inside an Oracle process.

Logically, a Spark Plan will get translated into Oracle SQL query blocks connected by calls to a Partitioned Table function. At runtime, this function will be the bridge to the co-processor in the Oracle Operator pipeline. Each Table function invocation will stream its input partition to and from the co-processor; it will coordinate setting up and shutting down the Spark pipeline and executing the pipeline on data chunks. Data parallelism between Oracle and Spark will be controlled using Pipelined Parallel Table Function mechanics, specifically the Order By/Cluster By clauses in its definition.


Figure 11: Logical Oracle Plan structure with Table function calls

4 A Peek under the covers

We walk through a sample session of this integrated environment and show some examples. The Oracle Instance is loaded with the TPCDS dataset and we connect to it via Spark Shell. The Spark Configuration for this environment has extra settings such as the connection information and query pushdown instructions.

# Oracle Catalog

# enable Spark Oracle extensions
spark.sql.extensions=org.apache.spark.sql.oracle.SparkSessionExtensions
spark.kryo.registrator=org.apache.spark.sql.connector.catalog.oracle.OraKryoRegistrator

# enable the Oracle Catalog integration
spark.sql.catalog.oracle=org.apache.spark.sql.connector.catalog.oracle.OracleCatalog

# oracle instance connection information
spark.sql.catalog.oracle.url=jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.dev.us.oracle.com
spark.sql.catalog.oracle.user=tpcds
spark.sql.catalog.oracle.password=...

# oracle sql logging and jdbc fetchsize
spark.sql.catalog.oracle.log_and_time_sql.enabled=true
spark.sql.catalog.oracle.log_and_time_sql.log_level=info
spark.sql.catalog.oracle.fetchSize=5000

# Query pushdown
spark.sql.oracle.enable.pushdown=true

# Parallelize data movement.
spark.sql.oracle.enable.querysplitting=true
spark.sql.oracle.querysplit.target=1Mb
spark.sql.oracle.querysplit.maxfetch.rounds=0.5

The Spark shell is then started in the normal way; for example in local mode one could issue: bin/spark-shell --properties-file spark.oracle.properties --master local[*].

4.1 Catalog Integration

The user can browse the Oracle catalog and describe individual tables.

sql("use oracle").show()
sql("show tables").show(10000, false)
sql("describe store_sales").show(10000, false)


Figure 12: Catalog Commands

Oracle has many partitioning schemes, such as RANGE, LIST, INTERVAL and HASH, which don't map very well to Spark's value-based partitioning scheme; the output of the Spark SQL show partitions command doesn't give the full partitioning picture. So we have extended Spark SQL: users can issue show oracle partitions to see the table partitioning details.

// show partitions
sql("show partitions store_sales").show(1000, false)
// spark language extension  to see oracle partitions properly
sql("show oracle partitions store_sales").show(1000, false)

4.2 TPCDS Queries

The TPCDS benchmark contains many complex analysis patterns such as Star-Schema-Agg-Joins, Multi-Star-Schema-Analysis, Windowing-Functions, Correlated-SubQueries, etc. As we have mentioned, we are able to completely push down over 90 of the 99 queries.

4.2.1 Q1

  • Query is about identifying 'problem' customers.
  • Query involves joins, aggregates, a CTE, a subquery expression, Order By
val q1 = s""" with customer_total_return as
(select sr_customer_sk as ctr_customer_sk
,sr_store_sk as ctr_store_sk
,sum(SR_RETURN_AMT) as ctr_total_return
from store_returns
,date_dim
where sr_returned_date_sk = d_date_sk
and d_year =2000
group by sr_customer_sk
,sr_store_sk)
 select  c_customer_id
from customer_total_return ctr1
,store
,customer
where ctr1.ctr_total_return > (select avg(ctr_total_return)*1.2
from customer_total_return ctr2
where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
and s_store_sk = ctr1.ctr_store_sk
and s_state = 'TN'
and ctr1.ctr_customer_sk = c_customer_sk
order by c_customer_id
 limit 100; """.stripMargin

Running this query with sql(q1).show(1000000, false) and the pushdown flag on pushes the entire query to Oracle, and it runs in sub-second time. Issuing sql(s"explain oracle pushdown $q1").show(1000, false) shows the Spark Plan and the Oracle query pushed down. (Here again we have extended Spark SQL: the explain oracle pushdown command is similar to Spark's explain command, with the addition that we show the Oracle pushdown details.) The output shows that for Spark this is a very simple plan: a custom Scan followed by a Project. Figure 6 shows the plan visually.

sql(s"explain oracle pushdown $q1").show(1000, false)

|Project (1)
+- BatchScan (2)

(2) BatchScan
Oracle Instance:
   DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
Pushdown Oracle SQL:
select "C_CUSTOMER_ID"
from ( select "C_CUSTOMER_ID"
from ( select "SR_CUSTOMER_SK" AS "ctr_customer_sk", "SR_STORE_SK" AS "ctr_store_sk", SUM("SR_RETURN_AMT") AS "ctr_total_return"
from TPCDS.STORE_RETURNS  join TPCDS.DATE_DIM  on ("SR_RETURNED_DATE_SK" = "D_DATE_SK")
where ((("SR_STORE_SK" IS NOT NULL AND "SR_CUSTOMER_SK" IS NOT NULL) AND "SR_RETURNED_DATE_SK" IS NOT NULL) AND ("D_YEAR" IS NOT NULL AND ("D_YEAR" = 2000.000000000000000000)))
group by "SR_CUSTOMER_SK", "SR_STORE_SK" )  join ( select "1_sparkora", "2_sparkora"
from ( select (AVG("ctr_total_return") * 1.2000000000000000000000) AS "1_sparkora", "ctr_store_sk" AS "2_sparkora"
from ( select "SR_STORE_SK" AS "ctr_store_sk", SUM("SR_RETURN_AMT") AS "ctr_total_return"
from TPCDS.STORE_RETURNS  join TPCDS.DATE_DIM  on ("SR_RETURNED_DATE_SK" = "D_DATE_SK")
where (("SR_STORE_SK" IS NOT NULL AND ("SR_RETURNED_DATE_SK" IS NOT NULL AND "SR_RETURNED_DATE_SK" IS NOT NULL)) AND ("D_YEAR" IS NOT NULL AND ("D_YEAR" = 2000.000000000000000000)))
group by "SR_CUSTOMER_SK", "SR_STORE_SK" )
group by "ctr_store_sk" )
where "1_sparkora" IS NOT NULL )  on (("ctr_store_sk" = "2_sparkora") AND (cast("ctr_total_return" as NUMBER(38, 20)) > "1_sparkora")) join TPCDS.STORE  on ("ctr_store_sk" = "S_STORE_SK") join TPCDS.CUSTOMER  on ("ctr_customer_sk" = "C_CUSTOMER_SK")
where ("ctr_total_return" IS NOT NULL AND ("S_STATE" IS NOT NULL AND ("S_STATE" = 'TN')))
order by "C_CUSTOMER_ID" ASC NULLS FIRST )
where rownum <= 100
Pushdown Oracle SQL, Query Splitting details:
Query is not split

Contrast this with the plan when spark.sql.oracle.enable.pushdown=false. This setting turns off the query rewrites, so the generated query plan and its execution are similar to what you would get with a regular 'connector' that provides a custom Spark v2 DataSource. The Spark Plan contains Spark operations to do the Joins, Aggregates, etc.; only the base table filtering and projection is pushed to Oracle. Figure 6 shows the plan visually. Not surprisingly, execution with pushdown off takes considerably longer to run: tens of seconds.

spark.sqlContext.setConf("spark.sql.oracle.enable.pushdown", "false")
sql(s"explain oracle pushdown $q1").show(1000, false)

|TakeOrderedAndProject (1)
+- Project (2)
   +- SortMergeJoin Inner (3)
      :- Project (4)
      :  +- SortMergeJoin Inner (5)
      :     :- Project (6)
      :     :  +- SortMergeJoin Inner (7)
      :     :     :- Filter (8)
      :     :     :  +- HashAggregate (9)
      :     :     :     +- HashAggregate (10)
      :     :     :        +- Project (11)
      :     :     :           +- SortMergeJoin Inner (12)
      :     :     :              :- Project (13)
      :     :     :              :  +- Filter (14)
      :     :     :              :     +- BatchScan (15)
      :     :     :              +- Project (16)
      :     :     :                 +- Filter (17)
      :     :     :                    +- BatchScan (18)
      :     :     +- Filter (19)
      :     :        +- HashAggregate (20)
      :     :           +- HashAggregate (21)
      :     :              +- HashAggregate (22)
      :     :                 +- HashAggregate (23)
      :     :                    +- Project (24)
      :     :                       +- SortMergeJoin Inner (25)
      :     :                          :- Project (26)
      :     :                          :  +- Filter (27)
      :     :                          :     +- BatchScan (28)
      :     :                          +- Project (29)
      :     :                             +- Filter (30)
      :     :                                +- BatchScan (31)
      :     +- Project (32)
      :        +- Filter (33)
      :           +- BatchScan (34)
      +- Project (35)
         +- BatchScan (36)

(15) BatchScan
Oracle Instance:
   DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
Pushdown Oracle SQL:
select "SR_CUSTOMER_SK", "SR_STORE_SK", "SR_RETURN_AMT", "SR_RETURNED_DATE_SK"
from TPCDS.STORE_RETURNS
where ("SR_STORE_SK" IS NOT NULL AND "SR_CUSTOMER_SK" IS NOT NULL) and "SR_RETURNED_DATE_SK" IS NOT NULL
Pushdown Oracle SQL, Query Splitting details:
Query is not split
(18) BatchScan
Oracle Instance:
   DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
Pushdown Oracle SQL:
select "D_DATE_SK", "D_YEAR"
from TPCDS.DATE_DIM
where ("D_YEAR" IS NOT NULL AND ("D_YEAR" = 2000.000000000000000000))
Pushdown Oracle SQL, Query Splitting details:
Query is not split
(28) BatchScan
Oracle Instance:
   DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
Pushdown Oracle SQL:
select "SR_CUSTOMER_SK", "SR_STORE_SK", "SR_RETURN_AMT", "SR_RETURNED_DATE_SK"
from TPCDS.STORE_RETURNS
where "SR_STORE_SK" IS NOT NULL and ("SR_RETURNED_DATE_SK" IS NOT NULL AND "SR_RETURNED_DATE_SK" IS NOT NULL)
Pushdown Oracle SQL, Query Splitting details:
Query is not split
(31) BatchScan
Oracle Instance:
   DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
Pushdown Oracle SQL:
select "D_DATE_SK", "D_YEAR"
from TPCDS.DATE_DIM
where ("D_YEAR" IS NOT NULL AND ("D_YEAR" = 2000.000000000000000000))
Pushdown Oracle SQL, Query Splitting details:
Query is not split
(34) BatchScan
Oracle Instance:
   DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
Pushdown Oracle SQL:
select "S_STORE_SK", "S_STATE"
from TPCDS.STORE
where ("S_STATE" IS NOT NULL AND ("S_STATE" = 'TN'))
Pushdown Oracle SQL, Query Splitting details:
Query is not split
(36) BatchScan
Oracle Instance:
   DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
Pushdown Oracle SQL:
select "C_CUSTOMER_SK", "C_CUSTOMER_ID"
from TPCDS.CUSTOMER
Pushdown Oracle SQL, Query Splitting details:
Query is not split

4.2.2 Pushdown plan for Q5, Q69

Following are the pushdown plans for a couple more TPCDS queries, to show the extent of the pushdown we can do.

  • Query Q5 is a multi star-schema report across the Store, Catalog and Web channels. Its SQL contains joins, aggregates, unions, CTEs and a rollup.
  • Query Q69 is used to identify customers with different buying behavior in two separate quarters. Its SQL contains joins, aggregates and subquery predicates.

TPCDS Q5 oracle pushdown:

sql(s"explain oracle pushdown $q5").show(1000, false)

|Project (1)
+- BatchScan (2)

(2) BatchScan
Oracle Instance:
   DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
Pushdown Oracle SQL:
select "channel_10_sparkora", "id_6_sparkora", "sales", "returns", "profit"
from ( select "channel_10_sparkora", "id_6_sparkora", SUM("sales") AS "sales", SUM("returns") AS "returns", SUM("profit") AS "profit"
from ( select SUM("sales_price") AS "sales", SUM("return_amt") AS "returns", (cast(SUM("profit") as NUMBER(38, 17)) - cast(SUM("net_loss") as NUMBER(38, 17))) AS "profit", 'store channel' AS "channel", CONCAT('store' , "S_STORE_ID") AS "id"
from ( select "SS_STORE_SK" AS "store_sk", "SS_SOLD_DATE_SK" AS "date_sk", "SS_EXT_SALES_PRICE" AS "sales_price", "SS_NET_PROFIT" AS "profit", 0E-18 AS "return_amt", 0E-18 AS "net_loss"
from TPCDS.STORE_SALES
where ("SS_STORE_SK" IS NOT NULL AND "SS_SOLD_DATE_SK" IS NOT NULL) UNION ALL select "SR_STORE_SK" AS "store_sk", "SR_RETURNED_DATE_SK" AS "date_sk", 0E-18 AS "sales_price", 0E-18 AS "profit", "SR_RETURN_AMT" AS "return_amt", "SR_NET_LOSS" AS "net_loss"
from TPCDS.STORE_RETURNS
where ("SR_STORE_SK" IS NOT NULL AND "SR_RETURNED_DATE_SK" IS NOT NULL) )  join TPCDS.DATE_DIM  on ("date_sk" = "D_DATE_SK") join TPCDS.STORE  on ("store_sk" = "S_STORE_SK")
where (("D_DATE" IS NOT NULL AND ("D_DATE" >= TRUNC(TIMESTAMP '2000-08-23 07:00:00.000000'))) AND ("D_DATE" <= TRUNC(TIMESTAMP '2000-09-06 07:00:00.000000')))
group by "S_STORE_ID" UNION ALL select SUM("sales_price") AS "sales", SUM("return_amt") AS "returns", (cast(SUM("profit") as NUMBER(38, 17)) - cast(SUM("net_loss") as NUMBER(38, 17))) AS "profit", 'catalog channel' AS "channel", CONCAT('catalog_page' , "CP_CATALOG_PAGE_ID") AS "id"
from ( select "CS_CATALOG_PAGE_SK" AS "page_sk", "CS_SOLD_DATE_SK" AS "date_sk", "CS_EXT_SALES_PRICE" AS "sales_price", "CS_NET_PROFIT" AS "profit", 0E-18 AS "return_amt", 0E-18 AS "net_loss"
from TPCDS.CATALOG_SALES
where ("CS_CATALOG_PAGE_SK" IS NOT NULL AND "CS_SOLD_DATE_SK" IS NOT NULL) UNION ALL select "CR_CATALOG_PAGE_SK" AS "page_sk", "CR_RETURNED_DATE_SK" AS "date_sk", 0E-18 AS "sales_price", 0E-18 AS "profit", "CR_RETURN_AMOUNT" AS "return_amt", "CR_NET_LOSS" AS "net_loss"
from TPCDS.CATALOG_RETURNS
where ("CR_CATALOG_PAGE_SK" IS NOT NULL AND "CR_RETURNED_DATE_SK" IS NOT NULL) )  join TPCDS.DATE_DIM  on ("date_sk" = "D_DATE_SK") join TPCDS.CATALOG_PAGE  on ("page_sk" = "CP_CATALOG_PAGE_SK")
where (("D_DATE" IS NOT NULL AND ("D_DATE" >= TRUNC(TIMESTAMP '2000-08-23 07:00:00.000000'))) AND ("D_DATE" <= TRUNC(TIMESTAMP '2000-09-06 07:00:00.000000')))
group by "CP_CATALOG_PAGE_ID" UNION ALL select SUM("sales_price") AS "sales", SUM("return_amt") AS "returns", (cast(SUM("profit") as NUMBER(38, 17)) - cast(SUM("net_loss") as NUMBER(38, 17))) AS "profit", 'web channel' AS "channel", CONCAT('web_site' , "WEB_SITE_ID") AS "id"
from ( select "WS_WEB_SITE_SK" AS "wsr_web_site_sk", "WS_SOLD_DATE_SK" AS "date_sk", "WS_EXT_SALES_PRICE" AS "sales_price", "WS_NET_PROFIT" AS "profit", 0E-18 AS "return_amt", 0E-18 AS "net_loss"
from TPCDS.WEB_SALES
where ("WS_WEB_SITE_SK" IS NOT NULL AND "WS_SOLD_DATE_SK" IS NOT NULL) UNION ALL select "WS_WEB_SITE_SK" AS "wsr_web_site_sk", "WR_RETURNED_DATE_SK" AS "date_sk", 0E-18 AS "sales_price", 0E-18 AS "profit", "WR_RETURN_AMT" AS "return_amt", "WR_NET_LOSS" AS "net_loss"
from TPCDS.WEB_RETURNS  join TPCDS.WEB_SALES  on (("WR_ITEM_SK" = "WS_ITEM_SK") AND ("WR_ORDER_NUMBER" = "WS_ORDER_NUMBER"))
where ("WR_RETURNED_DATE_SK" IS NOT NULL AND "WS_WEB_SITE_SK" IS NOT NULL) )  join TPCDS.DATE_DIM  on ("date_sk" = "D_DATE_SK") join TPCDS.WEB_SITE  on ("wsr_web_site_sk" = "WEB_SITE_SK")
where (("D_DATE" IS NOT NULL AND ("D_DATE" >= TRUNC(TIMESTAMP '2000-08-23 07:00:00.000000'))) AND ("D_DATE" <= TRUNC(TIMESTAMP '2000-09-06 07:00:00.000000')))
group by "WEB_SITE_ID" )   , lateral ( select "channel" "channel_10_sparkora", "id" "id_6_sparkora", 0 "spark_grouping_id_8_sparkora" from dual union all select "channel", null, 1 from dual union all select null, null, 3 from dual )
group by "channel_10_sparkora", "id_6_sparkora", "spark_grouping_id_8_sparkora"
order by "channel_10_sparkora" ASC NULLS FIRST, "id_6_sparkora" ASC NULLS FIRST )
where rownum <= 100
Pushdown Oracle SQL, Query Splitting details:
Query is not split

TPCDS Q69 oracle pushdown:

sql(s"explain oracle pushdown $q69").show(1000, false)

|Project (1)
+- BatchScan (2)

(2) BatchScan
Oracle Instance:
   DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
Pushdown Oracle SQL:
select "CD_GENDER", "CD_MARITAL_STATUS", "CD_EDUCATION_STATUS", "cnt1", "CD_PURCHASE_ESTIMATE", "cnt2", "CD_CREDIT_RATING", "cnt3"
from ( select "CD_GENDER", "CD_MARITAL_STATUS", "CD_EDUCATION_STATUS", COUNT(1) AS "cnt1", "CD_PURCHASE_ESTIMATE", COUNT(1) AS "cnt2", "CD_CREDIT_RATING", COUNT(1) AS "cnt3"
from TPCDS.CUSTOMER "sparkora_0" join TPCDS.CUSTOMER_ADDRESS  on ("C_CURRENT_ADDR_SK" = "CA_ADDRESS_SK") join TPCDS.CUSTOMER_DEMOGRAPHICS  on ("C_CURRENT_CDEMO_SK" = "CD_DEMO_SK")
where ((((("C_CURRENT_ADDR_SK" IS NOT NULL AND "C_CURRENT_CDEMO_SK" IS NOT NULL) AND  "sparkora_0"."C_CUSTOMER_SK" IN ( select "SS_CUSTOMER_SK"
from TPCDS.STORE_SALES  join TPCDS.DATE_DIM  on ("SS_SOLD_DATE_SK" = "D_DATE_SK")
where ("SS_SOLD_DATE_SK" IS NOT NULL AND (((("D_YEAR" IS NOT NULL AND "D_MOY" IS NOT NULL) AND ("D_YEAR" = 2001.000000000000000000)) AND ("D_MOY" >= 4.000000000000000000)) AND ("D_MOY" <= 6.000000000000000000))) )) AND not exists ( select 1
from TPCDS.WEB_SALES  join TPCDS.DATE_DIM  on ("WS_SOLD_DATE_SK" = "D_DATE_SK")
where (("WS_SOLD_DATE_SK" IS NOT NULL AND (((("D_YEAR" IS NOT NULL AND "D_MOY" IS NOT NULL) AND ("D_YEAR" = 2001.000000000000000000)) AND ("D_MOY" >= 4.000000000000000000)) AND ("D_MOY" <= 6.000000000000000000))) AND ("sparkora_0"."C_CUSTOMER_SK" = "WS_BILL_CUSTOMER_SK")) )) AND not exists ( select 1
from TPCDS.CATALOG_SALES  join TPCDS.DATE_DIM  on ("CS_SOLD_DATE_SK" = "D_DATE_SK")
where (("CS_SOLD_DATE_SK" IS NOT NULL AND (((("D_YEAR" IS NOT NULL AND "D_MOY" IS NOT NULL) AND ("D_YEAR" = 2001.000000000000000000)) AND ("D_MOY" >= 4.000000000000000000)) AND ("D_MOY" <= 6.000000000000000000))) AND ("sparkora_0"."C_CUSTOMER_SK" = "CS_SHIP_CUSTOMER_SK")) )) AND "CA_STATE" IN ( 'KY', 'GA', 'NM' ))
group by "CD_GENDER", "CD_MARITAL_STATUS", "CD_EDUCATION_STATUS", "CD_PURCHASE_ESTIMATE", "CD_CREDIT_RATING"
order by "CD_GENDER" ASC NULLS FIRST, "CD_MARITAL_STATUS" ASC NULLS FIRST, "CD_EDUCATION_STATUS" ASC NULLS FIRST, "CD_PURCHASE_ESTIMATE" ASC NULLS FIRST, "CD_CREDIT_RATING" ASC NULLS FIRST )
where rownum <= 100
Pushdown Oracle SQL, Query Splitting details:
Query is not split

4.3 Registration and Use of Oracle Functions (Row and Aggregate) in Spark SQL

We extend the SparkSession with a registerOracleFunction function. The user can specify an existing Oracle function in the form (packageName, functionName). In the following we show the registration and use of Oracle's STANDARD.SYS_CONTEXT row function and the STRAGG aggregate function.

spark.sqlContext.setConf("spark.sql.oracle.enable.pushdown", "true")

import org.apache.spark.sql.oracle._

// REGISTER ROW function
spark.registerOracleFunction(Some("STANDARD"), "SYS_CONTEXT")
sql("""
select 
  oracle.sys_context('USERENV', 'CLIENT_PROGRAM_NAME') ora_client_pgm
from sparktest.unit_test
limit 1
""".stripMargin).show(10000, false)

// REGISTER AGG function
spark.registerOracleFunction(None, "STRAGG")
sql(
"""
select c_char_5, 
       oracle.stragg(c_char_1)
from sparktest.unit_test
group by c_char_5
""".stripMargin).show(10000, false)

4.4 Spark SQL Macros

4.4.1 Defining Macros

The process of defining Spark SQL Macros is almost identical to defining custom functions in Spark. Consider a simple function (i : Int) => i + 2. The following shows how you would define it as a custom Spark function and as a Spark SQL Macro.

spark.udf.register("intUDF", (i: Int) => {
       i + 2
      })

import org.apache.spark.sql.defineMacros._
spark.registerMacro("intUDM", spark.udm((i: Int) => {
   i + 2
  }))
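
Both can then be invoked from Spark SQL in the same way. For example (the sparktest.unit_test table and its c_int column are assumed here purely for illustration):

// intUDF is evaluated row by row in the JVM; intUDM expands to the catalyst
// expression (c_int + 2) at registration time.
sql("select c_int, intUDF(c_int), intUDM(c_int) from sparktest.unit_test").show()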

4.4.2 Tax and Discount Calculation

Now consider a tax and discount calculation defined as:

  • no tax on groceries, alcohol is `10.5%`, everything else is `9.5%`
  • on Sundays give a discount of `5%` on alcohol.

This would be defined using a custom function and Spark SQL macro with the following code.

/* CUSTOM FUNCTION DEFINITION */
// the date helpers (currentDate, getDayOfWeek) and ZoneId used in the body
// must be in scope here as well (the same imports as in the macro definition below)
import org.apache.spark.sql.sqlmacros.DateTimeUtils._
import java.time.ZoneId

spark.udf.register("taxAndDiscountF", {(prodCat : String, amt : Double) =>
    val taxRate = prodCat match {
      case "grocery" => 0.0
      case "alcohol" => 10.5
      case _ => 9.5
    }
    val currDate = currentDate(ZoneId.systemDefault())
    val discount = if (getDayOfWeek(currDate) == 1 && prodCat == "alcohol") 0.05 else 0.0

    amt * ( 1.0 - discount) * (1.0 + taxRate)
})

/* SQL MACRO DEFINITION */
import org.apache.spark.sql.defineMacros._
import org.apache.spark.sql.sqlmacros.DateTimeUtils._
import java.time.ZoneId
spark.registerMacro("taxAndDiscountM", spark.udm({(prodCat : String, amt : Double) =>
    val taxRate = prodCat match {
      case "grocery" => 0.0
      case "alcohol" => 10.5
      case _ => 9.5
    }
    val currDate = currentDate(ZoneId.systemDefault())
    val discount = if (getDayOfWeek(currDate) == 1 && prodCat == "alcohol") 0.05 else 0.0

    amt * ( 1.0 - discount) * (1.0 + taxRate)
}))

4.4.3 Query Plan and execution

Now consider a query on the item table that invokes the Spark SQL Macro. We show both the extended and formatted plans so you can see the logical plan Spark generates and what query gets pushed to Oracle.

spark.sql(
  """
    |explain extended
    |select i_item_id,
    |       taxAndDiscountM(I_CATEGORY, I_CURRENT_PRICE) as taxAndDiscount
    |from item""".stripMargin
).show(1000, false)


spark.sql(
  """
    |explain formatted
    |select i_item_id,
    |       taxAndDiscountM(I_CATEGORY, I_CURRENT_PRICE) as taxAndDiscount
    |from item""".stripMargin
).show(1000, false)

The plans are:

OUTPUT OF EXPLAIN EXTENDED:

|== Parsed Logical Plan ==
'Project ['i_item_id, 'taxAndDiscount('I_CATEGORY, 'I_CURRENT_PRICE) AS taxAndDiscount#1075]
+- 'UnresolvedRelation [item], [], false

== Analyzed Logical Plan ==
i_item_id: string, taxAndDiscount: double
Project [i_item_id#1082, ((cast(I_CURRENT_PRICE#1086 as double) * (1.0 - if (((dayofweek(current_date(Some(America/Los_Angeles))) = 1) AND (I_CATEGORY#1093 = alcohol))) 0.05 else 0.0)) * (1.0 + CASE WHEN (I_CATEGORY#1093 = grocery) THEN 0.0 WHEN (I_CATEGORY#1093 = alcohol) THEN 10.5 ELSE 9.5 END)) AS taxAndDiscount#1075]
+- SubqueryAlias oracle.tpcds.item
   +- RelationV2[I_ITEM_SK#1081, I_ITEM_ID#1082, I_REC_START_DATE#1083, I_REC_END_DATE#1084, I_ITEM_DESC#1085, I_CURRENT_PRICE#1086, I_WHOLESALE_COST#1087, I_BRAND_ID#1088, I_BRAND#1089, I_CLASS_ID#1090, I_CLASS#1091, I_CATEGORY_ID#1092, I_CATEGORY#1093, I_MANUFACT_ID#1094, I_MANUFACT#1095, I_SIZE#1096, I_FORMULATION#1097, I_COLOR#1098, I_UNITS#1099, I_CONTAINER#1100, I_MANAGER_ID#1101, I_PRODUCT_NAME#1102] TPCDS.ITEM

== Optimized Logical Plan ==
RelationV2[I_ITEM_ID#1082, taxAndDiscount#1075] TPCDS.ITEM

== Physical Plan ==
*(1) Project [I_ITEM_ID#1082, taxAndDiscount#1075]
+- BatchScan[I_ITEM_ID#1082, taxAndDiscount#1075] class org.apache.spark.sql.connector.read.oracle.OraPushdownScan
|

OUTPUT OF EXPLAIN FORMATTED:

|== Physical Plan ==
 Project (2)
+- BatchScan (1)


(1) BatchScan
Output [2]: [I_ITEM_ID#1121, taxAndDiscount#1114]
OraPlan: 00 OraSingleQueryBlock [I_ITEM_ID#1121, ((cast(I_CURRENT_PRICE#1125 as double) * 1.0) * (1.0 + CASE WHEN (I_CATEGORY#1132 = grocery) THEN 0.0 WHEN (I_CATEGORY#1132 = alcohol) THEN 10.5 ELSE 9.5 END)) AS taxAndDiscount#1114], [oracolumnref(I_ITEM_ID#1121), oraalias(((cast(I_CURRENT_PRICE#1125 as double) * 1.0) * (1.0 + CASE WHEN (I_CATEGORY#1132 = grocery) THEN 0.0 WHEN (I_CATEGORY#1132 = alcohol) THEN 10.5 ELSE 9.5 END)) AS taxAndDiscount#1114)]
01 +- OraTableScan TPCDS.ITEM, [I_ITEM_ID#1121, I_CURRENT_PRICE#1125, I_CATEGORY#1132]
ReadSchema: struct<I_ITEM_ID:string,taxAndDiscount:double>
dsKey: DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
oraPushdownSQL: select "I_ITEM_ID", ((cast("I_CURRENT_PRICE" as NUMBER(30, 15)) * 1.0d) * (1.0d + CASE WHEN ("I_CATEGORY" = 'grocery') THEN 0.0d WHEN ("I_CATEGORY" = 'alcohol') THEN 10.5d ELSE 9.5d END)) AS "taxAndDiscount"
from TPCDS.ITEM

(2) Project [codegen id : 1]
Output [2]: [I_ITEM_ID#1121, taxAndDiscount#1114]
Input [2]: [I_ITEM_ID#1121, taxAndDiscount#1114]

The analyzed catalyst expression for the `taxAndDiscountM` invocation is shown below. Since it contains no custom function invocation, it is completely pushed to Oracle:

(
  (cast(I_CURRENT_PRICE#1086 as double) *
  (1.0 - if (((dayofweek(current_date(Some(America/Los_Angeles))) = 1) AND (I_CATEGORY#1093 = alcohol))) 0.05 else 0.0)) *
  (1.0 + CASE WHEN (I_CATEGORY#1093 = grocery) THEN 0.0 WHEN (I_CATEGORY#1093 = alcohol) THEN 10.5 ELSE 9.5 END)
) AS taxAndDiscount#1075

The generated Oracle SQL is even simpler, because it was generated after constant folding was applied:

(
  (cast("I_CURRENT_PRICE" as NUMBER(30, 15)) * 1.0d) *
  (1.0d + CASE WHEN ("I_CATEGORY" = 'grocery') THEN 0.0d WHEN ("I_CATEGORY" = 'alcohol') THEN 10.5d ELSE 9.5d END)
) AS "taxAndDiscount"

Contrast this with the query plan and execution if we had used the custom function instead (you can easily try this on your own):

  • the custom function expression wouldn't be pushable to Oracle, so raw column values would be pulled out of Oracle.
  • Function evaluation in Spark is row by row and would incur Serialization/Deserialization cost between catalyst and JVM values.
  • The Spark SQL Macro plan would be even more advantageous if the function were used in the where clause, as sketched below.
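
For instance, a filter like the one below can be pushed into the Oracle WHERE clause when the macro is used, whereas the UDF version would force Spark to fetch every row and evaluate the filter itself (the 50.0 threshold is arbitrary):

sql("""
  select i_item_id
  from item
  where taxAndDiscountM(I_CATEGORY, I_CURRENT_PRICE) > 50.0
  """).show(100, false)

// inspect what gets pushed down
sql("""explain oracle pushdown
  select i_item_id
  from item
  where taxAndDiscountM(I_CATEGORY, I_CURRENT_PRICE) > 50.0""").show(1000, false)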

Author: Harish Butani

Created: 2021-03-23 Tue 15:51
