Read with a Cypher query

All the examples on this page assume that the SparkSession has been initialized with the appropriate authentication options. See the Quickstart examples for more details.

If you need more flexibility, you can use the query option to run a custom Cypher® query.

The query must include a RETURN clause. With stored procedures, you need to include a YIELD clause before RETURN.
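
As a rough illustration of the YIELD rule, the sketch below builds a query around the built-in db.labels() procedure, which yields a single label column. The names procedureQuery and readOptions are placeholders introduced here for illustration and are not part of the connector.

```scala
// Illustrative only: db.labels() is a built-in Neo4j procedure that yields `label`.
// YIELD names the procedure's output columns; RETURN then defines the DataFrame columns.
val procedureQuery = """
  CALL db.labels() YIELD label
  RETURN label
"""

// Placeholder option map; the `query` key is the option described above.
val readOptions = Map("query" -> procedureQuery)
```

With an initialized SparkSession, the map can be handed to the reader through the standard options method, or the query can be set directly with .option("query", procedureQuery).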

query option example
val query = """
  MATCH (n:Person)
  WITH n
  LIMIT 2
  RETURN id(n) AS id, n.name AS name
"""

spark.read.format("org.neo4j.spark.DataSource")
  .option("query", query)
  .load()
  .show()
Table 1. Result
id  name
0   John Doe
1   Jane Doe

DataFrame columns

The structure of the result DataFrame is defined by the query itself. See the Schema inference page for more details.

This option is best suited to queries that return individual properties rather than graph entities (nodes, relationships, paths). Returning a graph entity does not cause an error, however.

Recommended:
MATCH (p:Person)
RETURN id(p) AS id, p.name AS name

Not recommended:
MATCH (p:Person)
RETURN p

If you need to return a graph entity, use the labels or relationship read options instead.
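
For orientation, the option sets for those two read modes might look like the sketch below. The option names are assumed from the connector's labels and relationship read pages, and Person, BOUGHT, and Product are made-up example names, so check both against your connector version and data model.

```scala
// Assumed option names; see the labels and relationship read pages for the full list.
// Reads nodes with the Person label, one row per node.
val nodeReadOptions = Map("labels" -> "Person")

// Reads BOUGHT relationships between Person and Product nodes.
// The source and target labels identify the nodes at each end.
val relationshipReadOptions = Map(
  "relationship" -> "BOUGHT",
  "relationship.source.labels" -> "Person",
  "relationship.target.labels" -> "Product"
)
```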

Limit the results

The connector uses SKIP and LIMIT internally to support partitioned reads. As a result, SKIP and LIMIT clauses are not allowed after the final RETURN of a custom Cypher query; a query that uses them there fails at execution time.

A possible workaround is to apply SKIP and LIMIT in a WITH clause before the RETURN clause. For example, the following query fails:

MATCH (p:Person)
RETURN p.name AS name
ORDER BY name
LIMIT 10

The query can be rewritten with LIMIT before the RETURN to complete successfully:

MATCH (p:Person)
WITH p.name AS name
ORDER BY name
LIMIT 10
RETURN name

When you rewrite a query, make sure the new query is equivalent to your original query so that the result is the same.

You can also use the query.count option instead of rewriting your query. See the Spark optimizations page for more details.

Script options

The script options allow running one or more Cypher queries ("scripts") before executing the read operation. The result of a script can be used in a subsequent query, for example to inject query parameters. The result is not available if the script contains schema operations.

Use the script option to run a single Cypher query.

To run multiple queries, use the indexed script.N options, where each query is passed in its own option.

Passing multiple queries in a single script option, separated by semicolons (;), is deprecated. Use the indexed script.N options instead, which keep each query separate and avoid ambiguity. The script and script.N options cannot be used together.

The script.N options are executed in ascending order of their integer suffix N, regardless of the order in which they are declared. The suffix N must consist of digits only (for example, script.0, script.1, script.10); leading zeros are allowed. Only the relative order of the suffixes matters, so gaps are allowed (for example, script.1, script.5, script.10).
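
The ordering rule can be sketched in plain Scala. Both functions below are hypothetical helpers written for this page, not connector code: indexedScriptOptions numbers a list of queries into script.N options, and executionOrder only mimics the rule described above by sorting option names on the integer value of their suffix.

```scala
// Hypothetical helper (not part of the connector): builds one `script.N`
// option per query, numbering them in the order given, starting at 0.
def indexedScriptOptions(queries: Seq[String]): Map[String, String] =
  queries.zipWithIndex.map { case (q, i) => s"script.$i" -> q }.toMap

// Mimics the documented rule: keep only digit-suffixed script options and
// sort by the integer value of the suffix, so script.10 comes after script.5
// (a plain string sort would put it first).
def executionOrder(options: Map[String, String]): Seq[String] =
  options.keys.toSeq
    .filter(_.matches("""script\.\d+"""))
    .sortBy(_.stripPrefix("script.").toInt)

// Declaration order and gaps in the numbering do not matter.
val declaredOutOfOrder = Map(
  "script.10" -> "RETURN 3 AS step",
  "script.1"  -> "RETURN 1 AS step",
  "script.5"  -> "RETURN 2 AS step"
)

println(executionOrder(declaredOutOfOrder).mkString(", "))
// prints: script.1, script.5, script.10
```

A map built this way can be passed to the reader through the standard options method, next to the query option.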

Do not use script options to inject large amounts of data in a Cypher query. See the Performance considerations section below for the recommended alternative.

script and query example
val query = """
  UNWIND range(1, 2) as id
  RETURN id AS val, scriptResult[0].val AS script
"""

spark.read.format("org.neo4j.spark.DataSource")
  .option("script", "RETURN 'foo' AS val")
  .option("query", query)
  .load()
  .show()
Table 2. Result
val  script
1    foo
2    foo

Performance considerations

Script options are not meant to inject a massive volume of data. Doing so would result in inefficient queries.

The recommended process is to create a dedicated DataFrame containing the new data, then join it with another DataFrame that performs the actual reads against Neo4j. Spark supports multiple pushdown optimizations, including filter pushdown, which means the resulting read query will be efficient.

Not recommended: injecting a large list with a script
// Suppose there are many nodes that match,
// resulting in a large list of values
val script = "MATCH (f:Filter) RETURN collect(f.id) AS ids"

// Sending a large list to the server would add a lot of memory pressure
val query = """
  MATCH (n:Comment)
  WHERE n.filterId IN scriptResult[0].ids
  RETURN n.id AS id
"""
spark.read.format("org.neo4j.spark.DataSource")
  .option("script", script)
  .option("query", query)
  .load()
  .show()

Recommended: join two DataFrames
import org.apache.spark.sql.functions.col

val filterDf = spark.read.format("org.neo4j.spark.DataSource")
  .option("labels", "Filter")
  .load()

val commentDf = spark.read.format("org.neo4j.spark.DataSource")
  .option("labels", "Comment")
  .load()

// Pushdown optimizations filter data at the source (Neo4j) to minimize data transfer
val resultDf = commentDf.join(filterDf, col("filterId") === filterDf("id"))