Example CDC usage in JavaScriptAuraDB Virtual Dedicated CloudEnterprise Edition
See Monitoring and addressing CDC client performance for throughput guidance when extending this example.
const neo4j = require('neo4j-driver');
const yargs = require('yargs/yargs');
const {hideBin} = require('yargs/helpers');
const argv = yargs(hideBin(process.argv)).argv;
const util = require('util');
function applyChange(change) { (1)
console.log(util.inspect(change.toObject(), false, null, true));
}
async function queryChanges(driver, database, cursor, selectors) {
const session = driver.session({database: database});
try {
const current = currentChangeId(driver, database);
await session.executeRead((tx) => {
return tx.run('CALL db.cdc.query($from, $selectors)',
{from: cursor, selectors: selectors}); (2)
})
.then((res) => {
if (res.records.length === 0) {
cursor = current; (3)
} else {
res.records.map((change) => {
applyChange(change); (4)
cursor = change.get('id'); (5)
});
}
});
} catch (e) {
console.log('Failed to apply change', e);
} finally {
session.close();
}
return cursor;
}
function queryChangesLoop(driver, database, cursor, selectors) {
setTimeout(async () => {
cursor = await queryChanges(driver, database, cursor, selectors);
queryChangesLoop(driver, database, cursor, selectors);
}, 500);
}
function earliestChangeId(driver, database) { (6)
return driver.executeQuery('CALL db.cdc.earliest', {}, {database: database})
.then((res) => {
return res.records[0].get('id');
});
}
function currentChangeId(driver, database) { (7)
return driver.executeQuery('CALL db.cdc.current', {}, {'database': database})
.then((res) => {
return res.records[0].get('id');
});
}
async function main() {
const uri = argv.address ?? argv.a ?? 'neo4j://localhost:7687';
const database = argv.database ?? argv.d ?? 'neo4j';
const user = argv.user ?? argv.u ?? 'neo4j';
const password = argv.password ?? argv.p ?? 'passw0rd';
let cursor = argv.from ?? argv.f;
const driver = neo4j.driver(uri, neo4j.auth.basic(user, password));
if (!cursor) {
cursor = await currentChangeId(driver, database);
}
const selectors = [ (8)
// {"select":"n"}
];
queryChangesLoop(driver, database, cursor, selectors); (9)
// await driver.close() (10)
}
main();
| 1 | This method is called once for each change. |
| 2 | This query fetches the changes from the database. |
| 3 | The cursor is moved forward to keep it up-to-date. This may not be necessary in your use case. See Cursor Management for details. |
| 4 | A function is called once for each change. |
| 5 | Note that the cursor defined outside the anonymous function is updated.
session.executeRead might re-try the query, re-running the inner anonymous function.
To avoid seeing the same change twice, update the cursor as the changes are applied. |
| 6 | Use this function to get the earliest available change id. |
| 7 | Use this function to get the current change id. |
| 8 | The results may be filtered to return a subset of changes. The out-commented line would select only node changes and exclude all relationship changes. |
| 9 | Call queryChanges repeatedly, using the cursor from the previous call. |
| 10 | Call driver.close() when the loop should be terminated. |