Natural Language to Database Query with LangGraph

Natural language to database query is an integral part of Retrieval Augmented Generation (RAG).

It is key that the generated database query is both syntactically and semantically correct: the syntax must be valid, and the results of the query must answer the question.

LangChain’s Neo4j integration provides a GraphCypherQAChain for question answering based on a user’s input, but the challenge is that this is zero-shot: given a database schema, the chain will attempt to generate a Cypher statement, execute it and use the results to generate an answer. There are no guardrails in place to assert that the query is correct, and the chain will simply fail if the syntax is incorrect.

In the Build a Neo4j-backed Chatbot with TypeScript course, I attempted to address this problem by introducing a while loop. It did the job, but somehow, this felt clunky.

Enter LangGraph

Since publishing the course, LangChain have released LangGraph. While LCEL and chains allow for linear tasks to be executed, LangGraph allows developers to build cyclic workflows where steps can be skipped or revisited.

LangGraph workflows consist of Nodes and Edges (not to be confused with Nodes and Relationships in a Graph database).

A Node is a function that accepts the current state of the graph and returns a partial updated view of that state. Each node is defined with a unique identifier.

Edges are responsible for deciding which action should be taken next, and come in two forms: normal or conditional. Normal edges are used to specify a next step, while conditional edges are functions that calculate and return the next node based on the current state.
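
As a minimal sketch (the state, node and edge here are invented purely for illustration, not part of the example built later), a node is just a function of the state, and a conditional edge returns the identifier of the next node:

import { END } from "@langchain/langgraph";

// An invented state for illustration only
type CounterState = { count: number };

// A Node: receives the current state and returns a partial update
const increment = async (state: CounterState): Promise<Partial<CounterState>> => {
  return { count: state.count + 1 };
};

// A conditional Edge: inspects the state and returns the identifier
// of the node that should run next
const shouldContinue = (state: CounterState): "increment" | typeof END => {
  return state.count < 3 ? "increment" : END;
};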

NL2*QL

This functionality is ideal for a Natural Language to Database Query task, which requires the following steps:

Flow Chart containing steps of a LangChain application for Q&A on a Neo4j database.

  1. Use the database schema to instruct an LLM to generate a database query based on the user’s input.
  2. Use an LLM to validate the Cypher query to ensure that it:
    1. contains the correct syntax, and
    2. will result in data that answers the user’s question.
  3. If validation fails, pass this feedback to the LLM along with a prompt to correct the statement, then repeat step 2.
  4. Once the query has passed the validation stage, it should be run against the database.
    1. If the database returns an error, correct the Cypher once again.
  5. If there are no results, use an LLM to generate an “I don’t know” response to avoid hallucinations.
  6. If results were found in the database, create an LLM generated response.
  7. Once an answer has been generated, check it for hallucinations. If the answer does not relate to the data, generate another answer. If the data does not provide an answer, re-generate the Cypher. Otherwise, if the generated answer is good, end the workflow.

This task has the potential to jump back and forth between steps, exactly what LangGraph is designed for.

Installation

LangGraph exists as its own dependency under the @langchain namespace. In this example, I will also use OpenAI (@langchain/openai) as I have found that it works well when writing Cypher, along with the Neo4jGraph integration from @langchain/community.

npm install langchain @langchain/community @langchain/langgraph @langchain/openai neo4j-driver zod

Graph State

The first step is to define the state of the graph.

// src/modules/cypher-qa/state.ts
export type CypherQAState = {
  // Original user input
  input: string;

  // The database schema from Neo4jGraph in @langchain/community
  schema: string;

  // The last generated Cypher statement
  cypher: string | undefined;

  // For any errors either during generation or execution
  cypherErrors: string[];

  // The results of the Cypher Statement
  results: Record<string, any>[];

  // The element IDs of any nodes returned by the query
  elementIds: string[];

  // The final output
  output: string;

  // A log of steps for debugging purposes
  log: string[];

  // Guardrails to make sure the app doesn't go into an infinite loop
  cypherAttempts: number;
  maxCypherAttempts: number;
  answerAttempts: number;
  maxAnswerAttempts: number;
};

Next, an object needs to be defined with a set of reducers that instruct LangGraph on how to handle the updated state returned by each node. Each value can either be overwritten completely, or the values returned by a node can be appended to the existing state. I created two helper functions for these scenarios.

// src/modules/cypher-qa/state.ts
function append<T>(initialValue: T[] = []) {
  return {
    reducer: (a: T[], b: T[]) => a.concat(b),
    default: () => initialValue,
  };
}

function overwrite<T>(initialValue: T) {
  return {
    reducer: (a: T, b: T): T => b || a,
    default: () => initialValue,
  };
}

You can also specify the channel as null, in which case the value is simply overwritten. I find this useful for differentiating items that will only be set once during the workflow.

// src/modules/cypher-qa/state.ts
const channels: StateGraphArgs<CypherQAState>["channels"] = {
  input: null,
  cypher: null,
  schema: null,
  maxAnswerAttempts: overwrite<number>(5),
  maxCypherAttempts: overwrite<number>(5),
  cypherAttempts: overwrite<number>(0),
  cypherErrors: null,
  answerAttempts: overwrite<number>(0),
  results: null,
  output: null,
  elementIds: null,
  log: append<string>(),
};

Workflow

I found it easiest to start by defining the workflow and then build out the functionality. For each node’s identifier, I prefer to use constants exported from the files containing the node functionality, along the lines of the sketch below.
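
For reference, the node identifiers are just unique strings. The file path and values below are my own illustration — in the project they are exported from the individual node and edge files:

// src/modules/cypher-qa/constants.ts (illustrative values - they only need to be unique within the graph)
export const NODE_GENERATE_CYPHER = "generate_cypher";
export const NODE_VALIDATE_CYPHER = "validate_cypher";
export const NODE_CORRECT_CYPHER = "correct_cypher";
export const NODE_RUN_CYPHER = "run_cypher";
export const NODE_GENERATE_ANSWER = "generate_answer";
export const NODE_GENERATE_NO_ANSWER = "generate_no_answer";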

// src/modules/cypher-qa/index.ts
export default async function initCypherQA() {
  const workflow = new StateGraph<CypherQAState>({
    channels,
  })
    .addNode(NODE_GENERATE_CYPHER, generateCypher)
    .addNode(NODE_VALIDATE_CYPHER, validateCypher)
    .addNode(NODE_CORRECT_CYPHER, correctCypher)
    .addNode(NODE_RUN_CYPHER, runCypher)
    .addNode(NODE_GENERATE_ANSWER, generateAnswer)
    .addNode(NODE_GENERATE_NO_ANSWER, generateNoAnswer)

    // 1. Start by generating a statement
    .addEdge(START, NODE_GENERATE_CYPHER)

    // 2. Then validate the Cypher
    .addEdge(NODE_GENERATE_CYPHER, NODE_VALIDATE_CYPHER)

    // 3. Correct or 4. run cypher
    .addConditionalEdges(NODE_VALIDATE_CYPHER, shouldCorrectCypher)

    // After correction, validate Cypher
    .addEdge(NODE_CORRECT_CYPHER, NODE_VALIDATE_CYPHER)

    // 4. Generate answer or "I don't know"
    .addConditionalEdges(NODE_RUN_CYPHER, checkResultHasRows)

    // 5. Check answer for hallucination, if not then END
    .addConditionalEdges(NODE_GENERATE_ANSWER, detectHallucination)

    // 6. Return no answer
    .addEdge(NODE_GENERATE_NO_ANSWER, END);

  return workflow.compile();
}

Nodes

Each node is passed a copy of the state and returns a partial update to that state. Node implementations are regular or async functions. You could use them to call external APIs, but in this case each node triggers an individual LCEL (LangChain Expression Language) chain that takes a subset of the state, formats the values within a prompt, passes the prompt to the LLM, and then appends the result to the state.

Deep breath… let’s take these one by one.

generateCypher

Cypher generation requires the database schema. This is already available in LangChain by calling the getSchema() method on the Neo4jGraph class, so there is no need to reinvent the wheel.

// src/modules/graph.ts
import { Neo4jGraph } from "@langchain/community/graphs/neo4j_graph";

let graph: Neo4jGraph;

export async function initGraph(): Promise<Neo4jGraph> {
  if (!graph) {
    // Create singleton and wait for connection to be verified
    graph = await Neo4jGraph.initialize({
      url: process.env.NEO4J_URI as string,
      username: process.env.NEO4J_USERNAME as string,
      password: process.env.NEO4J_PASSWORD as string,
      database: process.env.NEO4J_DATABASE as string | undefined,
      enhancedSchema: true,
    });
  }

  return graph;
}

The initGraph function provides access to a singleton instance of the Neo4jGraph class that can be used across the application without creating unnecessary database connections.

This can be used inside the generateCypher() function to obtain the database schema as a string, in a format that the LLM will be able to parse.

// src/modules/cypher-qa/nodes/generate-cypher.ts
export const generateCypher = async (data: CypherQAState): Promise<Partial<CypherQAState>> => {
  const graph = await initGraph();
  const schema = await graph.getSchema();

  // ...
}

Next, the prompt. The LLM is given the task of generating a Cypher statement.

// src/modules/cypher-qa/nodes/generate-cypher.ts
const prompt = PromptTemplate.fromTemplate(`
Task: Generate Cypher statement to query a graph database.

Instructions:
* Use only the provided relationship types and properties in the schema.
* Do not use any other relationship types or properties that are not provided.
* Return a verbose set of properties for each node to provide as much information as possible.

Schema:
{schema}

Note: Do not include any explanations or apologies in your responses.

Do not respond to any questions that might ask anything else than for you to construct a Cypher statement.

Do not include any text except the generated Cypher statement.
Do not return any markdown.

The question is:
{input}

{format_instructions}
`);

You may have noticed the {format_instructions} placeholder in the prompt.

Depending on the model and version used, the LLM may return wildly different responses. The GraphCypherQAChain contains steps to extract any Cypher statement from a markdown response using a regular expression, but this can be inconsistent. I have found that specifying the output format with a StructuredOutputParser is far more consistent.

You can also use this approach to trigger chain-of-thought reasoning, but in this case I have only specified a single property on the object, cypher (a chain-of-thought variant is sketched after the code below).

// src/modules/cypher-qa/nodes/generate-cypher.ts
const parser = StructuredOutputParser.fromZodSchema(
  z.object({
    cypher: z.string().describe("The Cypher statement"),
  })
);
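
As an illustration of the chain-of-thought idea mentioned above, a hypothetical variant could ask the model to reason before committing to a statement; only the cypher field would be used downstream:

// A sketch only - the reasoning field exists purely to elicit chain-of-thought
const chainOfThoughtParser = StructuredOutputParser.fromZodSchema(
  z.object({
    reasoning: z
      .string()
      .describe("Step-by-step reasoning about which labels, relationship types and properties to use"),
    cypher: z.string().describe("The Cypher statement"),
  })
);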

GPT 3.5 Turbo does a good job of generating Cypher and has been tested extensively.

// src/modules/cypher-qa/nodes/generate-cypher.ts
const llm = new ChatOpenAI({ model: "gpt-3.5-turbo" });

Next, combine the steps in a chain.

// src/modules/cypher-qa/nodes/generate-cypher.ts
const chain = RunnableSequence.from([prompt, llm, parser]);

The parser will ensure the output of the chain is an object, from which the cypher value can be assigned using destructuring assignment.

// src/modules/cypher-qa/nodes/generate-cypher.ts
const { cypher } = await chain.invoke({
  input: data.input,
  schema,
  format_instructions: parser.getFormatInstructions(),
});

The generated Cypher statement can then be appended to the graph state. At this point, the number of Cypher generation attempts should be set to 1 and the errors should be an empty array.

// src/modules/cypher-qa/nodes/generate-cypher.ts
return {
  log: [`${NODE_GENERATE_CYPHER}: fresh Cypher: ${cypher}`],
  cypher,
  schema,
  cypherErrors: [],
  cypherAttempts: 1,
};
View the full generateCypher() method
// src/modules/cypher-qa/nodes/generate-cypher.ts
export const generateCypher = async (data: CypherQAState): Promise<Partial<CypherQAState>> => {
  const graph = await initGraph();
  const schema = await graph.getSchema();

  const prompt = PromptTemplate.fromTemplate(`
Task: Generate Cypher statement to query a graph database.

Instructions:
* Use only the provided relationship types and properties in the schema.
* Do not use any other relationship types or properties that are not provided.
* Return a verbose set of properties for each node to provide as much information as possible.

Schema:
{schema}

Note: Do not include any explanations or apologies in your responses.

Do not respond to any questions that might ask anything else than for you to construct a Cypher statement.

Do not include any text except the generated Cypher statement.
Do not return any markdown.

The question is:
{input}

{format_instructions}
  `);
  const llm = new ChatOpenAI({ model: "gpt-3.5-turbo" });
  const parser = StructuredOutputParser.fromZodSchema(
    z.object({
      cypher: z.string().describe("The Cypher statement"),
    })
  );

  const chain = RunnableSequence.from([prompt, llm, parser]);

  const { cypher } = await chain.invoke({
    input: data.input,
    schema,
    format_instructions: parser.getFormatInstructions(),
  });

  return {
    log: [`${NODE_GENERATE_CYPHER}: fresh Cypher: ${cypher}`],
    cypher,
    schema,
    cypherErrors: [],
    cypherAttempts: 1,
  };
};

validateCypher

From here, I will gloss over the implementation details as they all follow the same general steps of passing a prompt to an LLM, parsing the response and returning values to be appended to the state.

View the full validateCypher() method
// src/modules/cypher-qa/nodes/validate-cypher.ts
export const validateCypher = async (data: CypherQAState) => {
  const graph = await initGraph();
  const schema = await graph.getSchema();

  const prompt = PromptTemplate.fromTemplate(`
You are a Cypher expert reviewing a statement written by a junior developer.
You must check the following:
* Are there any syntax errors in the Cypher statement?
* Are there any missing or undefined variables in the Cypher statement?
* Are any node labels missing from the schema?
* Are any relationship types missing from the schema?
* Are any of the properties not included in the schema?
* Does the Cypher statement include enough information to answer the question?

Schema:
{schema}

{format_instructions}

The question is:
{input}

The Cypher statement is:
{cypher}

Examples of good errors:
* Label (:Foo) does not exist, did you mean (:Bar)?
* Property bar does not exist for label Foo, did you mean baz?
* Relationship FOO does not exist, did you mean FOO_BAR?
  `);

  const llm = new ChatOpenAI({ model: "gpt-3.5-turbo" });
  const parser = StructuredOutputParser.fromZodSchema(
    z.object({
      errors: z
        .string()
        .array()
        .optional()
        .describe(
          "Each individual error identified in the Cypher statement.  Always include the correct value where possible"
        ),
    })
  );

  const chain = RunnableSequence.from([prompt, llm, parser]);

  const { errors } = await chain.invoke({
    format_instructions: parser.getFormatInstructions(),
    schema,
    cypher: data.cypher as string,
    input: data.input,
  });

  return {
    log: [
      `${NODE_VALIDATE_CYPHER}: #${data.cypherAttempts} Validated Cypher, \`${
        data.cypher
      }\` got [${errors ? errors.join(", ") : "none"}]`,
    ],
    cypherErrors: errors ?? [],
  };
};

correctCypher

View the full correctCypher() method
// src/modules/cypher-qa/nodes/correct-cypher.ts
export const correctCypher = async (data: CypherQAState) => {

  const graph = await initGraph();
  const schema = await graph.getSchema();

  const prompt = PromptTemplate.fromTemplate(`
You are a Cypher expert reviewing a statement written by a junior developer.
Check for missing properties in the schema and return a corrected Cypher statement.

Schema:
{schema}

Note: Do not include any explanations or apologies in your responses.

Do not respond to any questions that might ask anything else than for you to construct a Cypher statement.

{format_instructions}

The question is:
{input}

The Cypher statement is:
{cypher}

${
  data.cypherErrors?.length
    ? "The statement has the following errors:\n{errors}"
    : ""
}
`);

  const llm = new ChatOpenAI({ model: "gpt-3.5-turbo" });
  const parser = StructuredOutputParser.fromZodSchema(
    z.object({
      cypher: z.string().describe("The corrected Cypher statement"),
    })
  );

  const chain = RunnableSequence.from([prompt, llm, parser]);

  const { cypher } = await chain.invoke({
    input: data.input,
    cypher: data.cypher as string,
    schema,
    format_instructions: parser.getFormatInstructions(),
    errors: "* " + data.cypherErrors.join("\n* "),
  });

  return {
    log: [`${NODE_CORRECT_CYPHER}: fresh Cypher: ${cypher}`],
    cypher,
    cypherErrors: [],
    cypherAttempts: data.cypherAttempts + 1,
  };
};

runCypher

The runCypher function wraps the query in a try/catch block. If a Neo4jError is thrown, for example in the case of a syntax error, the cypherAttempts value is increased and the error message is returned.

View the full runCypher() method
// src/modules/cypher-qa/nodes/run-cypher.ts
export const runCypher = async (
  data: CypherQAState
): Promise<Partial<CypherQAState>> => {
  const graph = await initGraph();

  try {
    const res = await graph.query(data.cypher as string);

    return {
      log: [
        `${NODE_RUN_CYPHER}: got ${res.length} rows: ${JSON.stringify(res)}`,
      ],
      cypherErrors: [],
      cypherAttempts: data.cypherAttempts ? data.cypherAttempts + 1 : 1,
      results: res,
      elementIds: res.map((row) => row._id).filter((n) => !!n),
    };
  } catch (e: any) {
    if (e instanceof Neo4jError) {
      // Try again in the case of a transient error
      if (e.code.startsWith("Neo.TransientError")) {
        return await runCypher(data);
      }

      return {
        log: [`${NODE_RUN_CYPHER}: ${e.code}: ${e.message}`],
        cypherAttempts: data.cypherAttempts ? data.cypherAttempts + 1 : 1,
        results: undefined,
        cypherErrors: [`${e.code}: ${e.message}`],
      };
    }

    return {
      log: [`${NODE_RUN_CYPHER}: never`],
      results: undefined,
      cypherAttempts: data.cypherAttempts ? data.cypherAttempts + 1 : 1,
      cypherErrors: ["never"],
    };
  }
};

The subsequent conditional edge will check to see if cypherErrors contains one or more errors, and if so the edge will ensure that the correctCypher node is re-run. But more on that later.

generateAnswer

View the full generateAnswer() method
// src/modules/cypher-qa/nodes/generate-answer.ts
export const generateAnswer = async (data: CypherQAState) => {
  const prompt = PromptTemplate.fromTemplate(`
    Use the following results retrieved from a database to provide
    a succinct, definitive answer to the user's question.
    If no results are provided say you don't know.

    Respond as if you are answering the question directly.

    Results: {results}
    Question: {input}
  `);
  const llm = new ChatOpenAI({
    model: "gpt-4o",
  });

  const chain = RunnableSequence.from([prompt, llm, new StringOutputParser()]);

  const output = await chain.invoke({
    input: data.input,
    results: JSON.stringify(data.results),
  });

  return {
    answerAttempts: data.answerAttempts + 1,
    output,
  };
};

generateNoAnswer

View the full generateNoAnswer() method

In this case, I have returned a hardcoded “I don’t know” response, but you could use an LLM to generate a friendlier response, as sketched after the code below.

// src/modules/cypher-qa/nodes/generate-answer.ts
export const generateNoAnswer = async (data: CypherQAState) => {
  // Save an LLM call and hardcode a response
  return {
    log: ["No answers in data, telling the user I dont know"],
    output: "I don't know",
  };
};
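
If you did want a friendlier, LLM-generated refusal instead, a minimal sketch (reusing the same building blocks as generateAnswer) might look like this:

// Sketch: generate a friendlier "I don't know" response with an LLM
export const generateNoAnswerWithLLM = async (data: CypherQAState) => {
  const prompt = PromptTemplate.fromTemplate(`
    The database returned no results for the user's question.
    Politely explain that you don't know the answer and do not
    attempt to answer the question yourself.

    Question: {input}
  `);
  const llm = new ChatOpenAI({ model: "gpt-3.5-turbo" });
  const chain = RunnableSequence.from([prompt, llm, new StringOutputParser()]);

  const output = await chain.invoke({ input: data.input });

  return {
    log: ["No answers in data, generated a polite refusal"],
    output,
  };
};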

Edges

If you take a look at the edges defined below, you’ll see a number of calls to the addEdge() function. These are normal edges, where the next step will always follow. For example:

  • After generating a Cypher statement, the next step will always be to validate it.
  • After correcting a Cypher statement, it should also be validated.
// src/modules/cypher-qa/index.ts
const workflow = new StateGraph<CypherQAState>({
    channels,
  })

    // ...

    // 1. Start by generating a statement
    .addEdge(START, NODE_GENERATE_CYPHER)

    // 2. Then validate the Cypher
    .addEdge(NODE_GENERATE_CYPHER, NODE_VALIDATE_CYPHER)

    // 3. Correct or 4. run cypher
    .addConditionalEdges(NODE_VALIDATE_CYPHER, shouldCorrectCypher)

    // After correction, validate Cypher
    .addEdge(NODE_CORRECT_CYPHER, NODE_VALIDATE_CYPHER)

    // 4. Generate answer or "I don't know"
    .addConditionalEdges(NODE_RUN_CYPHER, checkResultHasRows)

    // 5. Check answer for hallucination, if not then END
    .addConditionalEdges(NODE_GENERATE_ANSWER, detectHallucination)

    // 6. Return no answer
    .addEdge(NODE_GENERATE_NO_ANSWER, END);

Two special nodes are also referenced: START and END. START is where the workflow begins, and END is the point at which the final state is returned.
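
For completeness, START and END (along with StateGraph itself) are imported from the LangGraph package — the exact import below reflects my setup and may vary slightly between versions:

// src/modules/cypher-qa/index.ts
import { END, START, StateGraph } from "@langchain/langgraph";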

Conditional Edges

Conditional Edges return the unique identifier of the node that should be followed next. There are three conditional edges in this workflow.

shouldCorrectCypher

The shouldCorrectCypher edge checks the cypherErrors array generated in the validateCypher node. If the node returned one or more errors, the Cypher should be corrected. Otherwise, the workflow should continue to the next stage, running the Cypher statement.

// src/modules/cypher-qa/edges/should-correct-cypher.ts
export const shouldCorrectCypher = async (
  data: CypherQAState
): Promise<typeof NODE_CORRECT_CYPHER | typeof NODE_RUN_CYPHER> => {
  return data.cypherErrors.length > 0 ? NODE_CORRECT_CYPHER : NODE_RUN_CYPHER;
};

checkResultHasRows

Once the Cypher statement has been run, the checkResultHasRows edge checks for Cypher errors detected in the try/catch block to determine whether the Cypher needs to be corrected. This step also includes the guardrail to not exceed the maximum number of Cypher generation attempts.

If the Cypher is successfully run but no results are returned, an “I don’t know” answer should be generated. Otherwise, if the query returned results, an LLM should generate a human-readable result.

// src/modules/cypher-qa/edges/check-rows.ts
export const checkResultHasRows = async (
  data: CypherQAState
): Promise<
  | typeof NODE_CORRECT_CYPHER
  | typeof NODE_GENERATE_NO_ANSWER
  | typeof NODE_GENERATE_ANSWER
> => {
  // If syntax errors, correct the cypher
  if (data.cypherErrors.length) {
    // Failed to generate Cypher within the attempt limit,
    // just give up
    if (data.cypherAttempts >= data.maxCypherAttempts) {
      return NODE_GENERATE_NO_ANSWER;
    }

    return NODE_CORRECT_CYPHER;
  }

  return data.results && data.results.length > 0
    ? NODE_GENERATE_ANSWER
    : NODE_GENERATE_NO_ANSWER;
};

detectHallucination

Once the answer has been generated, an LLM should determine whether the generated response has been grounded with the data returned by the Cypher statement. This guards against hallucinations by the LLM.

// src/modules/cypher-qa/edges/detect-hallucination.ts
export const detectHallucination = async (
  data: CypherQAState
): Promise<
  typeof NODE_GENERATE_CYPHER | typeof NODE_GENERATE_ANSWER | typeof END
> => {
  // Detect Hallucinations
  const prompt = PromptTemplate.fromTemplate(`
    You are judging an LLM response generated by a rival LLM using Retrieval Augmented Generation
    based on the output of a database query.

    Follow the rules below to come to your conclusion:

    * If the LLM has generated a response that does not relate to the data, respond with "${NODE_GENERATE_ANSWER}".
    * If the data provided does not answer the question, respond with "${NODE_GENERATE_CYPHER}".
    * If the response answers the question, reply with "${END}".

    Question:
    {question}

    Database Results:
    {results}

    Generated Response:
    {output}

    {format_instructions}
  `);

  const llm = new ChatOpenAI({
    openAIApiKey: process.env.OPENAI_API_KEY,
  });
  const parser = StructuredOutputParser.fromZodSchema(
    z.enum([NODE_GENERATE_CYPHER, NODE_GENERATE_ANSWER, END])
  );

  const chain = prompt.pipe(llm).pipe(parser);

  return chain.invoke({
    question: data.input,
    results: JSON.stringify(data.results),
    output: data.output,
    format_instructions: parser.getFormatInstructions(),
  });
};

Compile & Invoke

The last step is to call the compile() method to create an object that can be invoked.

return workflow.compile();

As I have wrapped the workflow in an initCypherQA() function, I can call the invoke() method to execute the workflow and generate a result.

// src/index.ts
const qa = await initCypherQA();

const res = await qa.invoke(
  {
    input: "How many movies has Emil Eifrem acted in?",
    maxCypherAttempts: 5,
    maxAnswerAttempts: 3,
  },
); // Returns an object of type CypherQAState

console.log(res.output) // Emil Eifrem has acted in 1 movie.
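
Because every node appends to the log channel, the returned state also acts as a trace of the route the workflow took, which is useful when debugging:

// Print the steps the workflow took, in order
console.log(res.log.join("\n"));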

Conclusion

I have a tendency to jump straight into code, but I found that solidifying the logic in a flowchart first helped to clarify my thinking. The flowchart was a helpful reference whenever I found myself getting lost in the process.

Using unit tests for each node and edge also helped to fine-tune each step in the process and test for common use cases.
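
As an example, a minimal test for the shouldCorrectCypher edge might look like this (assuming Vitest, and that the node identifiers are exported from a shared module; the state objects are stubs containing only the fields the edge reads):

// src/modules/cypher-qa/edges/should-correct-cypher.test.ts (sketch)
import { describe, expect, it } from "vitest";
import { shouldCorrectCypher } from "./should-correct-cypher";
import { NODE_CORRECT_CYPHER, NODE_RUN_CYPHER } from "../constants";

describe("shouldCorrectCypher", () => {
  it("routes to the correction node when validation found errors", async () => {
    const state = { cypherErrors: ["Label (:Foo) does not exist"] } as any;
    expect(await shouldCorrectCypher(state)).toBe(NODE_CORRECT_CYPHER);
  });

  it("runs the Cypher when there are no errors", async () => {
    const state = { cypherErrors: [] } as any;
    expect(await shouldCorrectCypher(state)).toBe(NODE_RUN_CYPHER);
  });
});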

You can view the working code on GitHub.


To learn how GraphRAG improves LLM responses, check out the Generative AI learning path on GraphAcademy.