Spark JDBC

One of the most commonly used data sources supported by Spark is JDBC. In this section, we provide details on how to use the official ClickHouse JDBC driver with Spark.

Read data

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public static void main(String[] args) {
    // Initialize the Spark session
    SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

    String jdbcURL = "jdbc:ch://localhost:8123/default";
    String query = "select * from example_table where id > 2";

    //---------------------------------------------------------------------------------------------------
    // Load the table from ClickHouse using the jdbc method
    //---------------------------------------------------------------------------------------------------
    Properties jdbcProperties = new Properties();
    jdbcProperties.put("user", "default");
    jdbcProperties.put("password", "123456");

    // Wrap the query in parentheses so Spark can treat it as a derived table
    Dataset<Row> df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties);

    df1.show();

    //---------------------------------------------------------------------------------------------------
    // Load the table from ClickHouse using the load method
    //---------------------------------------------------------------------------------------------------
    Dataset<Row> df2 = spark.read()
            .format("jdbc")
            .option("url", jdbcURL)
            .option("user", "default")
            .option("password", "123456")
            .option("query", query)
            .load();

    df2.show();

    // Stop the Spark session
    spark.stop();
}

Write data

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public static void main(String[] args) {
    // Initialize the Spark session
    SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

    // JDBC connection details
    String jdbcUrl = "jdbc:ch://localhost:8123/default";
    Properties jdbcProperties = new Properties();
    jdbcProperties.put("user", "default");
    jdbcProperties.put("password", "123456");

    // Create a sample DataFrame
    StructType schema = new StructType(new StructField[]{
            DataTypes.createStructField("id", DataTypes.IntegerType, false),
            DataTypes.createStructField("name", DataTypes.StringType, false)
    });

    List<Row> rows = new ArrayList<>();
    rows.add(RowFactory.create(1, "John"));
    rows.add(RowFactory.create(2, "Doe"));

    Dataset<Row> df = spark.createDataFrame(rows, schema);

    //---------------------------------------------------------------------------------------------------
    // Write the DataFrame to ClickHouse using the jdbc method
    //---------------------------------------------------------------------------------------------------
    df.write()
            .mode(SaveMode.Append)
            .jdbc(jdbcUrl, "example_table", jdbcProperties);

    //---------------------------------------------------------------------------------------------------
    // Write the DataFrame to ClickHouse using the save method
    // (the save mode is set via mode("append"); there is no separate "SaveMode" option)
    //---------------------------------------------------------------------------------------------------
    df.write()
            .format("jdbc")
            .mode("append")
            .option("url", jdbcUrl)
            .option("dbtable", "example_table")
            .option("user", "default")
            .option("password", "123456")
            .save();

    // Stop the Spark session
    spark.stop();
}

Parallelism

When using Spark JDBC, Spark reads the data using a single partition by default. To achieve higher concurrency, you must specify partitionColumn, lowerBound, upperBound, and numPartitions, which describe how to partition the table when reading in parallel from multiple workers, as in the sketch below. Please visit Apache Spark's official documentation for more information on JDBC configurations.
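
A minimal sketch of such a partitioned read, assuming example_table has a numeric id column and that the bounds 1 and 1000 roughly cover its value range (both are assumptions for illustration):

Dataset<Row> parallelDf = spark.read()
        .format("jdbc")
        .option("url", "jdbc:ch://localhost:8123/default")
        .option("user", "default")
        .option("password", "123456")
        // partitionColumn requires dbtable; it cannot be combined with the query option
        .option("dbtable", "example_table")
        // Column used to split the read; must be numeric, date, or timestamp
        .option("partitionColumn", "id")
        .option("lowerBound", "1")
        .option("upperBound", "1000")
        // Number of partitions (and concurrent JDBC connections) used for the read
        .option("numPartitions", "4")
        .load();

Note that lowerBound and upperBound only determine the partition stride; rows outside that range are still read, by the first and last partitions.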

JDBC Limitations

  • As of today, you can insert data using JDBC only into existing tables (currently there is no way to auto-create the table on DataFrame insertion, as Spark does with other connectors). One workaround is sketched below.
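
If the target table does not exist yet, you can create it yourself before the Spark write, for example with a plain JDBC statement. A minimal sketch, assuming the ClickHouse JDBC driver is on the classpath and the same example_table schema used above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Create the target table up front, since Spark will not create it for us
try (Connection conn = DriverManager.getConnection("jdbc:ch://localhost:8123/default", "default", "123456");
     Statement stmt = conn.createStatement()) {
    stmt.execute("CREATE TABLE IF NOT EXISTS example_table (id Int32, name String) "
            + "ENGINE = MergeTree() ORDER BY id");
}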