Implementing DRIFT Search with Neo4j and LlamaIndex

Microsoft's GraphRAG implementation was one of the first of its kind and introduces many new ideas. It includes both an indexing stage, where entities, relationships, and hierarchical communities are extracted and summarized, and query-time capabilities. This approach enables systems to answer broad, comprehensive questions by drawing on pre-computed entity, relationship, and community summaries, going beyond the limitations of the chunk-based retrieval used in standard RAG systems.
I've covered the indexing stage and the global and local search methods in previous blog posts (here and here), so we'll skip those details in this discussion. However, we haven't yet explored DRIFT search, which will be the focus of this blog post. DRIFT is a newer method that combines features of both global and local search. It starts with community-level information retrieved via vector search to establish a broad starting point for the query, then uses that information to refine the original query into follow-up queries. This allows drilling down into the knowledge graph to find specific details about entities, relationships, and other local information, balancing computational efficiency with answer quality.

The implementation uses a LlamaIndex workflow to orchestrate a search process that moves through several key steps. It starts with HyDE generation: constructing a hypothetical answer based on a sample community report to improve the representation of the question.
The community search step then uses vector similarity to identify relevant community reports, providing broader context for the query. The system analyzes these results to generate an initial intermediate answer and a set of follow-up questions for deeper investigation.
The follow-up questions are executed in parallel in the local search step, retrieving targeted information such as text chunks, entities, relationships, and additional community reports from the knowledge graph. This process can recurse, with each cycle potentially spawning new follow-up questions.
Finally, the final answer step synthesizes all the intermediate responses collected throughout the process, combining broad community-level understanding with detailed local findings to produce a complete answer. The approach balances breadth and depth, starting broadly with community summaries and progressively becoming more detailed.
This is my implementation of DRIFT search, adapted to a LlamaIndex workflow and Neo4j. I reverse-engineered it from Microsoft's GraphRAG code, so there may be differences from the original implementation.
The code is available on GitHub.
Dataset
In this blog tutorial, we will use Alice's Adventures in Wonderland by Lewis Carroll, a classic text freely available from Project Gutenberg. This rich narrative, with its interconnected characters, places, and events, makes it an excellent choice for showcasing the power of GraphRAG.
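The loading code itself isn't shown in this post, but as a minimal sketch (assuming the requests library and the Project Gutenberg plain-text URL for ebook #11, which is not part of the repository code), fetching the text could look like this:

import requests

# Assumption: Project Gutenberg ebook #11 is the plain-text edition of the book
GUTENBERG_URL = "https://www.gutenberg.org/cache/epub/11/pg11.txt"

# Download the raw text that will later be passed into the ingestion workflow
book_text = requests.get(GUTENBERG_URL, timeout=30).text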
Ingestion
For the import process, we reuse the Microsoft GraphRAG ingestion implementation covered in the previous blog post, converted to a LlamaIndex workflow.

The import pipeline follows the standard GraphRAG approach in three main stages:
class MSGraphRAGIngestion(Workflow):
    @step
    async def entity_extraction(self, ev: StartEvent) -> EntitySummarization:
        chunks = splitter.split_text(ev.text)
        await ms_graph.extract_nodes_and_rels(chunks, ev.allowed_entities)
        return EntitySummarization()

    @step
    async def entity_summarization(
        self, ev: EntitySummarization
    ) -> CommunitySummarization:
        await ms_graph.summarize_nodes_and_rels()
        return CommunitySummarization()

    @step
    async def community_summarization(
        self, ev: CommunitySummarization
    ) -> CommunityEmbeddings:
        await ms_graph.summarize_communities()
        return CommunityEmbeddings()
The workflow extracts entities and relationships from the text documents, creates summaries of both, and produces hierarchical community summaries.
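As a rough usage sketch (assuming the workflow above is completed with the embedding steps shown next, and that the StartEvent carries text and allowed_entities as in the snippet), kicking off ingestion might look like this; the timeout and entity types are illustrative:

# Hypothetical invocation of the ingestion workflow on the book text loaded earlier.
# Run inside an async context (e.g. a notebook or asyncio.run).
ingestion = MSGraphRAGIngestion(timeout=3600)
await ingestion.run(
    text=book_text,
    allowed_entities=["PERSON", "LOCATION", "EVENT"],
)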
After summarization, we generate vector embeddings for both communities and entities to enable similarity search. Here is the community embedding step:
@step
async def community_embeddings(self, ev: CommunityEmbeddings) -> EntityEmbeddings:
    # Fetch all communities from the graph database
    communities = ms_graph.query(
        """
        MATCH (c:__Community__)
        WHERE c.summary IS NOT NULL AND c.rating > $min_community_rating
        RETURN coalesce(c.title, "") + " " + c.summary AS community_description, c.id AS community_id
        """,
        params={"min_community_rating": MIN_COMMUNITY_RATING},
    )
    if communities:
        # Generate vector embeddings from community descriptions
        response = await client.embeddings.create(
            input=[c["community_description"] for c in communities],
            model=TEXT_EMBEDDING_MODEL,
        )
        # Store embeddings in the graph and create vector index
        embeds = [
            {
                "community_id": community["community_id"],
                "embedding": embedding.embedding,
            }
            for community, embedding in zip(communities, response.data)
        ]
        ms_graph.query(
            """UNWIND $data as row
            MATCH (c:__Community__ {id: row.community_id})
            CALL db.create.setNodeVectorProperty(c, 'embedding', row.embedding)""",
            params={"data": embeds},
        )
        ms_graph.query(
            "CREATE VECTOR INDEX community IF NOT EXISTS FOR (c:__Community__) ON c.embedding"
        )
    return EntityEmbeddings()
The same process is repeated for entity embeddings, creating the vector indexes needed for entity-based retrieval.
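The entity embedding step isn't shown here, but a minimal sketch could look like the following. It mirrors the community step above; the __Entity__ label, the name and summary properties, and the closing StopEvent are assumptions on my part (the 'entity' vector index itself is used later by local search):

@step
async def entity_embeddings(self, ev: EntityEmbeddings) -> StopEvent:
    # Sketch only: fetch entity descriptions, embed them, store the vectors,
    # and create the 'entity' vector index used later by local search
    entities = ms_graph.query(
        """
        MATCH (e:__Entity__)
        WHERE e.summary IS NOT NULL
        RETURN coalesce(e.name, "") + " " + e.summary AS entity_description, e.name AS name
        """
    )
    response = await client.embeddings.create(
        input=[e["entity_description"] for e in entities],
        model=TEXT_EMBEDDING_MODEL,
    )
    embeds = [
        {"name": e["name"], "embedding": emb.embedding}
        for e, emb in zip(entities, response.data)
    ]
    ms_graph.query(
        """UNWIND $data AS row
        MATCH (e:__Entity__ {name: row.name})
        CALL db.create.setNodeVectorProperty(e, 'embedding', row.embedding)""",
        params={"data": embeds},
    )
    ms_graph.query(
        "CREATE VECTOR INDEX entity IF NOT EXISTS FOR (e:__Entity__) ON e.embedding"
    )
    return StopEvent(result="Ingestion finished")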
DRIFT Search
DRIFT search takes an intuitive approach to information retrieval: start by understanding the big picture, then drill down where needed. Instead of immediately searching for exact matches at the document or entity level, DRIFT first targets community summaries, high-level views that capture the key themes and topics within the knowledge graph.
Once DRIFT identifies relevant high-level information, it generates follow-up queries to find specific details about particular entities, relationships, and source documents. This two-stage approach mirrors how people search for information: we start with a general overview, then ask targeted questions to fill in the details. By combining the comprehensive coverage of global search with the precision of local search, retrieval achieves both breadth and depth without the expense of processing every report or document.
Let's go through each stage of the implementation.
The code is available on GitHub.
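The steps communicate through custom workflow events. Here is a minimal sketch of those event classes, with fields inferred from the snippets below (the exact definitions in the repository may differ):

from llama_index.core.workflow import Event

class CommunitySearch(Event):
    query: str
    hyde_query: str

class LocalSearch(Event):
    query: str
    local_query: str

class LocalSearchResults(Event):
    query: str
    results: dict

class FinalAnswer(Event):
    query: str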
Community Search
DRIFT uses HyDE (Hypothetical Document Embeddings). Instead of embedding the user's question directly, HyDE generates a hypothetical answer first and then uses that for the search. This works because hypothetical answers sit closer in embedding space to the actual community summaries than the raw questions do.
@step
async def hyde_generation(self, ev: StartEvent) -> CommunitySearch:
    # Fetch a random community report to use as a template for HyDE generation
    random_community_report = driver.execute_query(
        """
        MATCH (c:__Community__)
        WHERE c.summary IS NOT NULL
        RETURN coalesce(c.title, "") + " " + c.summary AS community_description""",
        result_transformer_=lambda r: r.data(),
    )
    # Generate a hypothetical answer to improve query representation
    hyde = HYDE_PROMPT.format(
        query=ev.query, template=random_community_report[0]["community_description"]
    )
    hyde_response = await client.responses.create(
        model="gpt-5-mini",
        input=[{"role": "user", "content": hyde}],
        reasoning={"effort": "low"},
    )
    return CommunitySearch(query=ev.query, hyde_query=hyde_response.output_text)
Next, we embed the HyDE query and retrieve the top 5 most relevant community reports by vector similarity. The LLM is then prompted to generate an intermediate answer based on these reports and to identify follow-up questions for deeper investigation. The intermediate answer is saved, and all follow-up queries are dispatched to the local search step.
@step
async def community_search(self, ctx: Context, ev: CommunitySearch) -> LocalSearch:
    # Create embedding from the HyDE-enhanced query
    embedding_response = await client.embeddings.create(
        input=ev.hyde_query, model=TEXT_EMBEDDING_MODEL
    )
    embedding = embedding_response.data[0].embedding
    # Find top 5 most relevant community reports via vector similarity
    community_reports = driver.execute_query(
        """
        CALL db.index.vector.queryNodes('community', 5, $embedding) YIELD node, score
        RETURN 'community-' + node.id AS source_id, node.summary AS community_summary
        """,
        result_transformer_=lambda r: r.data(),
        embedding=embedding,
    )
    # Generate initial answer and identify what additional info is needed
    initial_prompt = DRIFT_PRIMER_PROMPT.format(
        query=ev.query, community_reports=community_reports
    )
    initial_response = await client.responses.create(
        model="gpt-5-mini",
        input=[{"role": "user", "content": initial_prompt}],
        reasoning={"effort": "low"},
    )
    response_json = json_repair.loads(initial_response.output_text)
    print(f"Initial intermediate response: {response_json['intermediate_answer']}")
    # Store the initial answer and prepare for parallel local searches
    async with ctx.store.edit_state() as ctx_state:
        ctx_state["intermediate_answers"] = [
            {
                "intermediate_answer": response_json["intermediate_answer"],
                "score": response_json["score"],
            }
        ]
        ctx_state["local_search_num"] = len(response_json["follow_up_queries"])
    # Dispatch follow-up queries to run in parallel
    for local_query in response_json["follow_up_queries"]:
        ctx.send_event(LocalSearch(query=ev.query, local_query=local_query))
    return None
This establishes the core pattern of DRIFT: start broadly with a HyDE-enhanced community search, then drill down with targeted follow-up questions.
Local Search
The local search step picks up the follow-up queries and drills down into specific details. Each query retrieves targeted context via entity-based vector search and then produces an intermediate answer along with further follow-up queries.
@step(num_workers=5)
async def local_search(self, ev: LocalSearch) -> LocalSearchResults:
    print(f"Running local query: {ev.local_query}")
    # Create embedding for the local query
    response = await client.embeddings.create(
        input=ev.local_query, model=TEXT_EMBEDDING_MODEL
    )
    embedding = response.data[0].embedding
    # Retrieve relevant entities and gather their associated context:
    # - Text chunks where entities are mentioned
    # - Community reports the entities belong to
    # - Relationships between the retrieved entities
    # - Entity descriptions
    local_reports = driver.execute_query(
        """
        CALL db.index.vector.queryNodes('entity', 5, $embedding) YIELD node, score
        WITH collect(node) AS nodes
        WITH
        collect {
            UNWIND nodes as n
            MATCH (n)<-[:MENTIONS]->(c:__Chunk__)
            WITH c, count(distinct n) as freq
            RETURN {chunkText: c.text, source_id: 'chunk-' + c.id}
            ORDER BY freq DESC
            LIMIT 3
        } AS text_mapping,
        collect {
            UNWIND nodes as n
            MATCH (n)-[:IN_COMMUNITY*]->(c:__Community__)
            WHERE c.summary IS NOT NULL
            WITH c, c.rating as rank
            RETURN {summary: c.summary, source_id: 'community-' + c.id}
            ORDER BY rank DESC
            LIMIT 3
        } AS report_mapping,
        collect {
            UNWIND nodes as n
            MATCH (n)-[r:SUMMARIZED_RELATIONSHIP]-(m)
            WHERE m IN nodes
            RETURN {descriptionText: r.summary, source_id: 'relationship-' + n.name + '-' + m.name}
            LIMIT 3
        } as insideRels,
        collect {
            UNWIND nodes as n
            RETURN {descriptionText: n.summary, source_id: 'node-' + n.name}
        } as entities
        RETURN {Chunks: text_mapping, Reports: report_mapping,
                Relationships: insideRels,
                Entities: entities} AS output
        """,
        result_transformer_=lambda r: r.data(),
        embedding=embedding,
    )
    # Generate answer based on the retrieved context
    local_prompt = DRIFT_LOCAL_SYSTEM_PROMPT.format(
        response_type=DEFAULT_RESPONSE_TYPE,
        context_data=local_reports,
        global_query=ev.query,
    )
    local_response = await client.responses.create(
        model="gpt-5-mini",
        input=[{"role": "user", "content": local_prompt}],
        reasoning={"effort": "low"},
    )
    response_json = json_repair.loads(local_response.output_text)
    # Limit follow-up queries to prevent exponential growth
    response_json["follow_up_queries"] = response_json["follow_up_queries"][:LOCAL_TOP_K]
    return LocalSearchResults(results=response_json, query=ev.query)
The next step handles the iterative refinement process. It waits for all parallel searches to complete using collect_events, then decides whether to keep drilling down. If the current depth hasn't reached the maximum (we use a max depth of 2), it collects the follow-up questions from all the results, stores the intermediate answers, and dispatches the next round of local searches.
@step
async def local_search_results(
    self, ctx: Context, ev: LocalSearchResults
) -> LocalSearch | FinalAnswer:
    local_search_num = await ctx.store.get("local_search_num")
    # Wait for all parallel searches to complete
    results = ctx.collect_events(ev, [LocalSearchResults] * local_search_num)
    if results is None:
        return None
    intermediate_results = [
        {
            "intermediate_answer": event.results["response"],
            "score": event.results["score"],
        }
        for event in results
    ]
    current_depth = await ctx.store.get("local_search_depth", default=1)
    query = [ev.query for ev in results][0]
    # Continue drilling down if we haven't reached max depth
    if current_depth < MAX_LOCAL_SEARCH_DEPTH:
        await ctx.store.set("local_search_depth", current_depth + 1)
        follow_up_queries = [
            query
            for event in results
            for query in event.results["follow_up_queries"]
        ]
        # Store intermediate answers and dispatch next round of searches
        async with ctx.store.edit_state() as ctx_state:
            ctx_state["intermediate_answers"].extend(intermediate_results)
            ctx_state["local_search_num"] = len(follow_up_queries)
        for local_query in follow_up_queries:
            ctx.send_event(LocalSearch(query=query, local_query=local_query))
        return None
    else:
        return FinalAnswer(query=query)
This creates an iterative refinement loop where each level builds on previous findings. Once the max depth is reached, the final answer generation is triggered.
Final Answer
The final step combines all the intermediate answers collected throughout the DRIFT search process into a complete answer. This includes the initial response from the community search and all responses generated during local search.
@step
async def final_answer_generation(self, ctx: Context, ev: FinalAnswer) -> StopEvent:
    # Retrieve all intermediate answers collected throughout the search process
    intermediate_answers = await ctx.store.get("intermediate_answers")
    # Synthesize all findings into a comprehensive final response
    answer_prompt = DRIFT_REDUCE_PROMPT.format(
        response_type=DEFAULT_RESPONSE_TYPE,
        context_data=intermediate_answers,
        global_query=ev.query,
    )
    answer_response = await client.responses.create(
        model="gpt-5-mini",
        input=[
            {"role": "developer", "content": answer_prompt},
            {"role": "user", "content": ev.query},
        ],
        reasoning={"effort": "low"},
    )
    return StopEvent(result=answer_response.output_text)
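Assuming the steps above are assembled into a single workflow class (hypothetically named DriftSearch here, since the post shows only the individual steps), running a query might look like this:

# Hypothetical assembly of the steps above into one workflow class named DriftSearch.
# Run inside an async context (e.g. a notebook or asyncio.run).
drift = DriftSearch(timeout=300)
final_answer = await drift.run(query="Who does Alice meet at the Mad Tea-Party?")
print(final_answer)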
Summary
DRIFT search offers an interesting strategy for balancing the breadth of global search with the precision of local search. By starting with community-level context and refining with follow-up questions, it avoids the cost of processing all community reports while still maintaining comprehensive coverage.
However, there is room for several improvements. The current implementation treats all intermediate responses equally, but filtering them based on their confidence scores could improve final answer quality and reduce noise. Similarly, follow-up queries could be ranked by relevance or expected information gain before execution, ensuring the most promising ones are pursued first.
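For example, a simple score-based filter could be applied to the collected answers before the reduce step; this is a sketch only, and the threshold value is an arbitrary illustration rather than part of the original implementation:

# Sketch: keep only intermediate answers whose LLM-assigned score clears a threshold
MIN_ANSWER_SCORE = 5  # illustrative value, not from the original implementation
filtered_answers = [
    a for a in intermediate_answers if a.get("score", 0) >= MIN_ANSWER_SCORE
]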
Another promising direction would be a query refinement step that uses an LLM to analyze all generated follow-up queries, deduplicating similar ones to avoid redundant searches and filtering out irrelevant ones. This could significantly reduce the number of local searches while maintaining answer quality.
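A rough sketch of such a refinement step, reusing the same OpenAI client and json_repair helpers as the rest of the workflow (the prompt wording and placement are my own and not part of the repository code):

# Sketch: ask the LLM to merge near-duplicate follow-up queries and drop irrelevant ones
# before dispatching the next round of local searches
dedup_prompt = (
    f"Original question: {query}\n"
    "Candidate follow-up queries:\n" + "\n".join(follow_up_queries) + "\n"
    "Merge semantic duplicates, drop queries irrelevant to the original question, "
    "and return the remaining queries as a JSON list of strings."
)
dedup_response = await client.responses.create(
    model="gpt-5-mini",
    input=[{"role": "user", "content": dedup_prompt}],
    reasoning={"effort": "low"},
)
follow_up_queries = json_repair.loads(dedup_response.output_text)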
The full implementation is available on GitHub for anyone interested in experimenting with these enhancements or adapting the approach to their own use cases.



