SQL / DataFrames

This is a quick introduction to the Spark.jl core functions. It closely follows the official PySpark tutorial and copies many examples verbatim. In most cases, the PySpark docs should apply to Spark.jl as is or with little adaptation.

Spark.jl applications usually start by creating a SparkSession:

using Spark

spark = SparkSession.builder.appName("Main").master("local").getOrCreate()
SparkSession()

Note that here we use dot notation to chain function invocations. This makes the code more concise and also mimics the Python API, making it easier to translate examples. The same example could also be written as:

using Spark
import Spark: appName, master, getOrCreate

builder = SparkSession.builder
builder = appName(builder, "Main")
builder = master(builder, "local")
spark = getOrCreate(builder)

See @chainable for the details of the dot notation.

DataFrame Creation

In simple cases, a Spark DataFrame can be created via SparkSession.createDataFrame, e.g. from a list of rows:

using Dates

df = spark.createDataFrame([
    Row(a=1, b=2.0, c="string1", d=Date(2000, 1, 1), e=DateTime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3.0, c="string2", d=Date(2000, 2, 1), e=DateTime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5.0, c="string3", d=Date(2000, 3, 1), e=DateTime(2000, 1, 3, 12, 0))
])
println(df)
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

Or using an explicit schema:

df = spark.createDataFrame([
    [1, 2.0, "string1", Date(2000, 1, 1), DateTime(2000, 1, 1, 12, 0)],
    [2, 3.0, "string2", Date(2000, 2, 1), DateTime(2000, 1, 2, 12, 0)],
    [3, 4.0, "string3", Date(2000, 3, 1), DateTime(2000, 1, 3, 12, 0)]
], "a long, b double, c string, d date, e timestamp")
println(df)
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

Viewing Data

The top rows of a DataFrame can be displayed using DataFrame.show():

df.show(1)
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row

You can see the DataFrame’s schema and column names as follows:

df.columns()
5-element Vector{String}:
 "a"
 "b"
 "c"
 "d"
 "e"
df.printSchema()
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)

Show a summary of the DataFrame.

df.select("a", "b", "c").describe().show()
+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+

DataFrame.collect() collects the distributed data to the driver as local Julia data. Note that this can throw an out-of-memory error when the dataset is too large to fit on the driver, because it pulls all the data from the executors to the driver.

df.collect()
3-element Vector{Row}:
 [1,2.0,string1,2000-01-01,2000-01-01 12:00:00.0]
 [2,3.0,string2,2000-02-01,2000-01-02 12:00:00.0]
 [3,4.0,string3,2000-03-01,2000-01-03 12:00:00.0]

To avoid an out-of-memory exception, use take() or tail().

df.take(1)
1-element Vector{Row}:
 [1,2.0,string1,2000-01-01,2000-01-01 12:00:00.0]
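
tail() is the counterpart of take() and collects rows from the end of the DataFrame instead of the beginning. A minimal sketch, assuming it follows the same calling convention as take():

df.tail(1)   # expected to return only the last row of the DataFrame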

Selecting and Accessing Data

A Spark.jl DataFrame is lazily evaluated, so simply selecting a column does not trigger computation; it just returns a Column instance.

df.a
col("a")

In fact, most column-wise operations return Columns.

typeof(df.c) == typeof(df.c.upper()) == typeof(df.c.isNull())
true

These Columns can be used to select columns from a DataFrame. For example, select() takes Column instances and returns another DataFrame.

df.select(df.c).show()
+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+

Assign a new Column instance.

df.withColumn("upper_c", df.c.upper()).show()
+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+

To select a subset of rows, use filter() (a.k.a. where()).

df.filter(df.a == 1).show()
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
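
where() is an alias for filter(), so the same subset can be selected with either name. A minimal sketch, assuming the alias behaves exactly like filter() (as it does in PySpark) and produces the same output as above:

df.where(df.a == 1).show()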

Grouping Data

Spark.jl DataFrame also provides a way of handling grouped data using the common split-apply-combine strategy. It groups the data by a certain condition, applies a function to each group, and then combines the results back into a DataFrame.

df = spark.createDataFrame([
    ["red", "banana", 1, 10], ["blue", "banana", 2, 20], ["red", "carrot", 3, 30],
    ["blue", "grape", 4, 40], ["red", "carrot", 5, 50], ["black", "carrot", 6, 60],
    ["red", "banana", 7, 70], ["red", "grape", 8, 80]], ["color string", "fruit string", "v1 long", "v2 long"])
df.show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+

Group the data and then apply the avg() function to the resulting groups.

df.groupby("color").avg().show()
+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
|black|    6.0|   60.0|
| blue|    3.0|   30.0|
+-----+-------+-------+
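
If you only need the aggregate of specific columns, the column names can be passed to the aggregation function. This is a sketch, assuming Spark.jl forwards the column-restricted form of avg() the same way PySpark does:

df.groupby("color").avg("v1").show()   # result contains only the avg(v1) column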

Getting Data in/out

Spark.jl can read and write a variety of data formats. Here are a few examples.

CSV

df.write.option("header", true).csv("data/fruits.csv")
spark.read.option("header", true).csv("data/fruits.csv")
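
Note that CSV does not store column types, so by default every column is read back as a string. Spark's standard inferSchema reader option asks it to guess the types from the data; the snippet below assumes Spark.jl forwards this option unchanged:

df2 = spark.read.option("header", true).option("inferSchema", true).csv("data/fruits.csv")
df2.printSchema()   # numeric columns should come back as numeric types rather than strings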

Parquet

df.write.parquet("data/fruits.parquet")
spark.read.parquet("data/fruits.parquet")

ORC

df.write.orc("data/fruits.orc")
spark.read.orc("data/fruits.orc")

Working with SQL

DataFrame and Spark SQL share the same execution engine, so they can be used interchangeably. For example, you can register a DataFrame as a temporary view and run SQL queries against it as below:

df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()
+--------+
|count(1)|
+--------+
|       8|
+--------+
spark.sql("SELECT fruit, sum(v1) as s FROM tableA GROUP BY fruit ORDER BY s").show()
+------+---+
| fruit|  s|
+------+---+
|banana| 10|
| grape| 12|
|carrot| 14|
+------+---+
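
Any SQL supported by Spark can be run against the registered view in the same way. For example, a filtered query over the data above should print:

spark.sql("SELECT color, fruit FROM tableA WHERE v2 > 50 ORDER BY v2").show()
+-----+------+
|color| fruit|
+-----+------+
|black|carrot|
|  red|banana|
|  red| grape|
+-----+------+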