SQL / DataFrames
This is a quick introduction to the Spark.jl core functions. It closely follows the official PySpark tutorial and copies many examples verbatim. In most cases, the PySpark docs should work for Spark.jl as is or with little adaptation.
Spark.jl applications usually start by creating a SparkSession:
using Spark
spark = SparkSession.builder.appName("Main").master("local").getOrCreate()
SparkSession()
Note that here we use dot notation to chain function invocations. This makes the code more concise and also mimics the Python API, making translation of examples easier. The same example could also be written as:
using Spark
import Spark: appName, master, getOrCreate
builder = SparkSession.builder
builder = appName(builder, "Main")
builder = master(builder, "local")
spark = getOrCreate(builder)
See @chainable for the details of the dot notation.
DataFrame Creation
In simple cases, a Spark DataFrame can be created via SparkSession.createDataFrame, e.g. from a list of rows:
using Dates
df = spark.createDataFrame([
Row(a=1, b=2.0, c="string1", d=Date(2000, 1, 1), e=DateTime(2000, 1, 1, 12, 0)),
Row(a=2, b=3.0, c="string2", d=Date(2000, 2, 1), e=DateTime(2000, 1, 2, 12, 0)),
Row(a=4, b=5.0, c="string3", d=Date(2000, 3, 1), e=DateTime(2000, 1, 3, 12, 0))
])
println(df)
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
| 4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+
Or using an explicit schema:
df = spark.createDataFrame([
[1, 2.0, "string1", Date(2000, 1, 1), DateTime(2000, 1, 1, 12, 0)],
[2, 3.0, "string2", Date(2000, 2, 1), DateTime(2000, 1, 2, 12, 0)],
[3, 4.0, "string3", Date(2000, 3, 1), DateTime(2000, 1, 3, 12, 0)]
], "a long, b double, c string, d date, e timestamp")
println(df)
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+
Viewing Data
The top rows of a DataFrame can be displayed using DataFrame.show():
df.show(1)
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row
You can see the DataFrame’s schema and column names as follows:
df.columns()
5-element Vector{String}:
"a"
"b"
"c"
"d"
"e"
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: double (nullable = true)
|-- c: string (nullable = true)
|-- d: date (nullable = true)
|-- e: timestamp (nullable = true)
Show the summary of the DataFrame:
df.select("a", "b", "c").describe().show()
+-------+---+---+-------+
|summary| a| b| c|
+-------+---+---+-------+
| count| 3| 3| 3|
| mean|2.0|3.0| null|
| stddev|1.0|1.0| null|
| min| 1|2.0|string1|
| max| 3|4.0|string3|
+-------+---+---+-------+
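You can also get the number of rows with DataFrame.count(). A minimal sketch, assuming count() is exposed in Spark.jl the same way as in PySpark:
df.count()    # total number of rows, here 3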
DataFrame.collect() collects the distributed data to the driver as local data in Julia. Note that this can throw an out-of-memory error when the dataset is too large to fit on the driver, because it pulls all the data from the executors.
df.collect()
3-element Vector{Row}:
[1,2.0,string1,2000-01-01,2000-01-01 12:00:00.0]
[2,3.0,string2,2000-02-01,2000-01-02 12:00:00.0]
[3,4.0,string3,2000-03-01,2000-01-03 12:00:00.0]
To avoid an out-of-memory exception, use take() or tail().
df.take(1)
1-element Vector{Row}:
[1,2.0,string1,2000-01-01,2000-01-01 12:00:00.0]
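tail() works the same way but collects rows from the end of the DataFrame; a minimal sketch, assuming it mirrors the PySpark signature:
df.tail(1)    # collect only the last row to the driver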
Selecting and Accessing Data
A Spark.jl DataFrame is lazily evaluated, so simply selecting a column does not trigger the computation; instead, it returns a Column instance.
df.a
col("a")
In fact, most column-wise operations return Columns.
typeof(df.c) == typeof(df.c.upper()) == typeof(df.c.isNull())
true
These Columns can be used to select columns from a DataFrame. For example, select() takes Column instances and returns another DataFrame.
df.select(df.c).show()
+-------+
| c|
+-------+
|string1|
|string2|
|string3|
+-------+
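select() also accepts several Column instances at once; a minimal sketch on the same df, assuming multiple columns are supported as in PySpark:
df.select(df.a, df.c).show()    # keep only columns a and c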
Assign a new Column instance:
df.withColumn("upper_c", df.c.upper()).show()
+---+---+-------+----------+-------------------+-------+
| a| b| c| d| e|upper_c|
+---+---+-------+----------+-------------------+-------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+
To select a subset of rows, use filter() (a.k.a. where()).
df.filter(df.a == 1).show()
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
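Conditions can be built with other comparison operators as well; a minimal sketch, assuming they are overloaded for Column like == above:
df.filter(df.b > 2.0).show()    # rows where b is greater than 2.0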
Grouping Data
A Spark.jl DataFrame also provides a way of handling grouped data using the common split-apply-combine strategy: it groups the data by a certain condition, applies a function to each group, and then combines the results back into a DataFrame.
df = spark.createDataFrame([
["red", "banana", 1, 10], ["blue", "banana", 2, 20], ["red", "carrot", 3, 30],
["blue", "grape", 4, 40], ["red", "carrot", 5, 50], ["black", "carrot", 6, 60],
["red", "banana", 7, 70], ["red", "grape", 8, 80]], ["color string", "fruit string", "v1 long", "v2 long"])
df.show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
| red|banana| 1| 10|
| blue|banana| 2| 20|
| red|carrot| 3| 30|
| blue| grape| 4| 40|
| red|carrot| 5| 50|
|black|carrot| 6| 60|
| red|banana| 7| 70|
| red| grape| 8| 80|
+-----+------+---+---+
Grouping and then applying the avg() function to the resulting groups:
df.groupby("color").avg().show()
+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
| red| 4.8| 48.0|
|black| 6.0| 60.0|
| blue| 3.0| 30.0|
+-----+-------+-------+
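The aggregation can also be limited to specific columns; a minimal sketch, assuming avg() accepts column names as in PySpark:
df.groupby("fruit").avg("v1").show()    # average of v1 per fruit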
Getting Data in/out
Spark.jl can read and write a variety of data formats. Here are a few examples.
CSV
df.write.option("header", true).csv("data/fruits.csv")
spark.read.option("header", true).csv("data/fruits.csv")
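inferSchema is a standard Spark reader option that makes Spark guess the column types instead of reading everything as strings; a sketch using the same option chaining as above:
spark.read.option("header", true).option("inferSchema", true).csv("data/fruits.csv")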
Parquet
df.write.parquet("data/fruits.parquet")
spark.read.parquet("data/fruits.parquet")
ORC
df.write.orc("data/fruits.orc")
spark.read.orc("data/fruits.orc")
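JSON
Assuming the JSON format is exposed through the same reader/writer API as the formats above, the calls would look like this:
df.write.json("data/fruits.json")
spark.read.json("data/fruits.json")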
Working with SQL
DataFrame and Spark SQL share the same execution engine, so they can be used interchangeably and seamlessly. For example, you can register the DataFrame as a table and run SQL on it as below:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()
+--------+
|count(1)|
+--------+
| 8|
+--------+
spark.sql("SELECT fruit, sum(v1) as s FROM tableA GROUP BY fruit ORDER BY s").show()
+------+---+
| fruit| s|
+------+---+
|banana| 10|
| grape| 12|
|carrot| 14|
+------+---+
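Any SQL supported by Spark can be run this way, for example a filtering query against the same tableA view:
spark.sql("SELECT color, fruit, v1 FROM tableA WHERE v2 > 40").show()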