Example CDC usage in Python (AuraDB Virtual Dedicated Cloud, Enterprise Edition)
See Monitoring and addressing CDC client performance for throughput guidance when extending this example.
import getopt
import json
import sys
import time
from threading import Thread
from neo4j import GraphDatabase
class CDCService:
    """Poll a Neo4j database for change data capture (CDC) events.

    The service tracks a cursor (a change id) and repeatedly asks the
    database for the changes that follow it, passing each one to
    ``apply_change``.
    """

    def __init__(self, driver, database, start_cursor=None, selectors=None):
        """Bind the service to a driver and database and pick a start cursor.

        Args:
            driver: Neo4j driver used for every query.
            database: Name of the database to read changes from.
            start_cursor: Change id to start from. When ``None``, the
                database's current change id is fetched and used instead.
            selectors: List of selector maps passed to ``db.cdc.query``.
                ``None`` is treated as an empty list.
        """
        self.driver = driver
        self.database = database
        self.cursor = start_cursor
        if self.cursor is None:
            self.cursor = self.current_change_id()
        # An empty list is the documented default for the selectors argument
        # of db.cdc.query, so send that rather than a null parameter.
        self.selectors = [] if selectors is None else selectors

    def apply_change(self, record):  # (1)
        """Handle a single change event; this example prints it as JSON.

        Args:
            record: One row returned by ``db.cdc.query``.
        """
        record_dict = {
            k: record.get(k)
            for k in ('id', 'txId', 'seq', 'event', 'metadata')
        }
        # default=repr lets values that json cannot encode still be printed.
        print(json.dumps(record_dict, indent=2, default=repr))

    def query_changes_query(self, tx):
        """Fetch and apply the changes after the cursor inside transaction ``tx``.

        The cursor advances after each successfully applied change. If
        ``apply_change`` raises, the error is printed, processing stops and
        the cursor stays on the last applied change. When the query returns
        nothing, the cursor moves to the current change id.

        Args:
            tx: Transaction object providing ``run``.
        """
        # Read the current change id *before* querying: if the query returns
        # nothing, moving the cursor to this earlier value cannot skip a
        # change committed while the query was running.
        current = self.current_change_id()
        result = tx.run('CALL db.cdc.query($cursor, $selectors)',  # (2)
                        cursor=self.cursor, selectors=self.selectors)
        if result.peek() is None:
            self.cursor = current  # (3)
        else:
            for record in result:
                try:
                    self.apply_change(record)  # (4)
                except Exception as e:
                    print('Error whilst applying change', e)
                    break
                self.cursor = record['id']  # (5)

    def query_changes(self):
        """Run one polling round in a managed read transaction."""
        with self.driver.session(database=self.database) as session:
            session.execute_read(self.query_changes_query)

    def earliest_change_id(self):  # (6)
        """Return the id reported by ``db.cdc.earliest``."""
        records, _, _ = self.driver.execute_query(
            'CALL db.cdc.earliest', database_=self.database)
        return records[0]['id']

    def current_change_id(self):  # (7)
        """Return the id reported by ``db.cdc.current``."""
        records, _, _ = self.driver.execute_query(
            'CALL db.cdc.current', database_=self.database)
        return records[0]['id']

    def run(self, poll_interval=0.5):
        """Poll for changes forever, pausing between polls.

        Args:
            poll_interval: Seconds to sleep after each polling round.
        """
        while True:  # (9)
            self.query_changes()
            time.sleep(poll_interval)
def main(argv):
    """Parse command-line options and start polling for CDC events.

    Recognised options (each takes a value): ``-a/--address``,
    ``-d/--database``, ``-u/--username``, ``-p/--password`` and
    ``-f/--from`` (the change id to start from).

    Args:
        argv: Command-line arguments without the program name.

    Raises:
        SystemExit: With status 2 when an option is unknown or lacks its
            value.
    """
    # Default values
    address = 'neo4j://localhost:7687'
    database = 'neo4j'
    username = 'neo4j'
    password = 'passw0rd'
    cursor = None

    try:
        opts, _ = getopt.getopt(
            argv, 'a:d:u:p:f:',
            ['address=', 'database=', 'username=', 'password=', 'from='])
    except getopt.GetoptError as err:
        # Report bad options briefly instead of dumping a traceback.
        print(err, file=sys.stderr)
        print('options: [-a address] [-d database] [-u username] '
              '[-p password] [-f cursor]', file=sys.stderr)
        sys.exit(2)

    for opt, arg in opts:
        if opt in ('-a', '--address'):
            address = arg
        elif opt in ('-d', '--database'):
            database = arg
        elif opt in ('-u', '--username'):
            username = arg
        elif opt in ('-p', '--password'):
            password = arg
        elif opt in ('-f', '--from'):
            cursor = arg

    selectors = [  # (8)
        # {'select': 'n'}
    ]

    with GraphDatabase.driver(address, auth=(username, password)) as driver:
        cdc = CDCService(driver, database, cursor, selectors)
        # Poll on a daemon thread and block on it while the driver stays
        # open; the thread's loop has no exit condition of its own.
        cdc_thread = Thread(target=cdc.run, daemon=True)
        cdc_thread.start()
        cdc_thread.join()
if __name__ == '__main__':
    # Run as a script: hand main() the arguments after the program name.
    cli_args = sys.argv[1:]
    main(cli_args)
| 1 | This method is called once for each change event. It should be replaced depending on your use case. |
| 2 | This query fetches the changes from the database. |
| 3 | The cursor is moved forward to keep it up-to-date. This may not be necessary in your use case. See Cursor Management for details. |
| 4 | This method is called once for each change. |
| 5 | For example, session.execute_read may retry query_changes_query when there are network issues.
To avoid seeing the same change twice, update the cursor as each change is applied. |
| 6 | Use this function to get the earliest available change id. |
| 7 | Use this function to get the current change id. |
| 8 | The results may be filtered to return a subset of changes. The commented-out line would select only node changes and exclude all relationship changes. |
| 9 | Call query_changes repeatedly, using the cursor from the previous call. |