Intelligent Semantic Search with Document Deduplication

This blog post introduces a robust document processing pipeline that implements semantic vector search with advanced duplicate prevention mechanisms. Built on the LangChain framework, the system efficiently processes multiple document formats (PDF, TXT, Markdown, DOCX), converts content into meaningful vector embeddings using Ollama’s embedding models, and stores them in a Milvus vector database. The implementation’s standout feature is its deduplication strategy, which uses SHA-256 file hashing to identify and skip already-processed documents at the binary level, regardless of filename.

Preventing duplicate entries in a vector database is important for several reasons:

  1. Resource Efficiency: Vector databases consume storage space and computational resources proportional to the number of entries. Duplicates waste these resources unnecessarily.
  2. Search Quality: Duplicates can skew search results by over-representing certain content, causing redundant information to dominate query responses.
  3. Processing Overhead: Re-processing documents that have already been embedded requires computational time and resources that could be better used elsewhere.
  4. Consistency: Without deduplication, if the same document is processed multiple times (perhaps with minor differences), the database will contain multiple versions, potentially leading to inconsistent search results.
  5. Cost Optimization: For cloud-based vector databases that charge based on storage and query volume, eliminating duplicates directly reduces operational costs.

Install the Necessary Libraries

!pip install langchain langchain-ollama langchain-milvus langchain-community pymupdf unstructured python-docx pymilvus markdown

This command installs multiple Python packages required for the document processing application. It includes langchain (the core framework for building LLM applications), langchain-ollama (for local embedding models), langchain-milvus and pymilvus (for vector database storage), langchain-community (with document loaders), pymupdf (for PDF processing), unstructured (for document parsing), python-docx (for Word document support), and markdown (for Markdown file handling). These packages together create a pipeline that can load various document types, split them into chunks, generate embeddings, and store them in a Milvus vector database for semantic search functionality, as demonstrated in the notebook.

Imports

import os
import hashlib
from langchain_ollama import OllamaEmbeddings
from langchain_milvus import Milvus
from langchain_community.document_loaders import (
    TextLoader,
    PyMuPDFLoader,
    UnstructuredMarkdownLoader,
    UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter

This code section imports the necessary libraries and modules for the document processing pipeline:

  1. Core Python Libraries:
    • os: Provides operating system functionality for file and directory operations
    • hashlib: Implements secure hash algorithms (used for file deduplication via SHA-256)
  2. LangChain Embedding Model:
    • OllamaEmbeddings: Allows using local Ollama models to generate vector embeddings for text
  3. Vector Database:
    • Milvus: Vector database interface for storing and retrieving document embeddings
  4. Document Loaders:
    • TextLoader: Handles plain text (.txt) files
    • PyMuPDFLoader: Processes PDF documents
    • UnstructuredMarkdownLoader: Parses Markdown (.md) files
    • UnstructuredWordDocumentLoader: Manages Word (.docx) documents
  5. Text Processing:
    • RecursiveCharacterTextSplitter: Splits documents into smaller chunks for more effective embedding and retrieval

Together, these imports form the foundation for a pipeline that can load various document types, split them into manageable chunks, convert them to vector embeddings, and store them in a Milvus vector database with deduplication support.

Configuration and Setup

# Configuration
FOLDER_PATH = "./documents"
COLLECTION_NAME = "semantic_search_collection"
URI = "./multi-file.db"

embeddings = OllamaEmbeddings(model="nomic-embed-text")

vector_store = Milvus(
    embedding_function=embeddings,
    connection_args={"uri": URI},
    collection_name=COLLECTION_NAME,
    auto_id=True,
    index_params={"metric_type": "COSINE"},
)

This code section sets up the foundational components for the document processing system:

  1. Configuration Variables:
    • FOLDER_PATH = "./documents": Defines the directory where the system will look for documents to process
    • COLLECTION_NAME = "semantic_search_collection": Names the collection within the Milvus vector database
    • URI = "./multi-file.db": Specifies the database file path (using a local file-based database)
  2. Embedding Model Setup:
    • embeddings = OllamaEmbeddings(model="nomic-embed-text"): Creates an embedding model instance using Ollama with the “nomic-embed-text” model, which will convert text into numerical vector representations
  3. Vector Database Initialization:
    • vector_store = Milvus(...): Creates and configures the Milvus vector database with:
      • embedding_function=embeddings: Uses the defined embedding model to convert documents to vectors
      • connection_args={"uri": URI}: Connects to the database at the specified location
      • collection_name=COLLECTION_NAME: Sets the collection name within the database
      • auto_id=True: Automatically generates IDs for stored vectors
      • index_params={"metric_type": "COSINE"}: Configures the system to use cosine similarity for measuring how closely vectors match

This setup prepares the system to read documents, convert them to vector embeddings, and store them in a searchable database.
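Before ingesting documents, it is worth confirming that Ollama is reachable and checking the embedding dimensionality. A minimal sanity check, assuming the Ollama server is running locally and the nomic-embed-text model has already been pulled:

# Sanity check: embed a short string and inspect the resulting vector
# (assumes `ollama pull nomic-embed-text` has been run and the server is up)
sample_vector = embeddings.embed_query("hello world")
print(f"Embedding dimension: {len(sample_vector)}")  # nomic-embed-text typically yields 768 dimensions
print(f"First values: {sample_vector[:3]}")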

File Hashing Function

The following hash_file function is a key component of the document deduplication system:

def hash_file(file_path: str) -> str:
    """Generate SHA-256 hash of file contents"""
    sha256 = hashlib.sha256()
    with open(file_path, "rb") as f:
        while chunk := f.read(4096):
            sha256.update(chunk)
    return sha256.hexdigest()

The function:

  1. Takes a file path as input and returns a string hash value
  2. Creates a SHA-256 hasher object using Python’s hashlib module
  3. Opens the file in binary mode ("rb") to process all file types consistently
  4. Reads the file in 4KB chunks (4096 bytes) rather than loading the entire file into memory at once, which is efficient for large files
  5. Updates the hash incrementally with each chunk using the walrus operator (:=) in a while loop
  6. Returns a hexadecimal digest of the SHA-256 hash, which is a unique 64-character string that effectively serves as a fingerprint for the file

This hash is later used in the document processing loop to check if a file with identical content has already been processed and stored in the vector database, allowing the system to skip duplicate files regardless of their filenames.
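A quick illustration of why hashing enables filename-independent deduplication: two files with byte-identical content produce the same digest even under different names. A small sketch using hypothetical file names:

# Hypothetical example: identical bytes, different filenames
with open("report.txt", "w") as f:
    f.write("same content")
with open("report_copy.txt", "w") as f:
    f.write("same content")

# The digests match because SHA-256 depends only on the file's bytes
print(hash_file("report.txt") == hash_file("report_copy.txt"))  # True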

Document Processing Loop

# Document processing loop

processed_files = 0
skipped_files = 0

for file_name in os.listdir(FOLDER_PATH):
    file_path = os.path.join(FOLDER_PATH, file_name)
    if not os.path.isfile(file_path):
        continue

    try:
        file_hash = hash_file(file_path)
        
        # Check if file already exists in database
        existing = vector_store.similarity_search(
            query="file check",  # Dummy query
            k=1,
            expr=f'file_hash == "{file_hash}"'
        )
        
        if existing:
            print(f"⏩ Skipping {file_name} (already in database)")
            skipped_files += 1
            continue

        # Load documents
        if file_name.endswith(".txt"):
            loader = TextLoader(file_path)
        elif file_name.endswith(".pdf"):
            loader = PyMuPDFLoader(file_path)
        elif file_name.endswith(".md"):
            loader = UnstructuredMarkdownLoader(file_path)
        elif file_name.endswith(".docx"):
            loader = UnstructuredWordDocumentLoader(file_path)
        else:
            print(f"⚠️ Unsupported file type: {file_name}")
            continue

        # Process and add documents
        docs = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000, 
            chunk_overlap=200
        )
        split_docs = text_splitter.split_documents(docs)
        
        # Add file hash to all chunks
        for doc in split_docs:
            doc.metadata["file_hash"] = file_hash
            doc.metadata["file_name"] = file_name
            
        vector_store.add_documents(split_docs)
        processed_files += 1
        print(f"✅ Processed {file_name} ({len(split_docs)} chunks)")

    except Exception as e:
        print(f"❌ Failed to process {file_name}: {str(e)}")

print(f"\nSummary:\n- Processed files: {processed_files}\n- Skipped duplicates: {skipped_files}")

This section contains the main processing logic that handles document ingestion with deduplication:

  1. Initialization:
    • Sets up counters to track processed and skipped files
  2. Directory Iteration:
    • Loops through each file in the specified documents directory
    • Skips any non-file entries (like subdirectories)
  3. Deduplication Check:
    • Calculates the SHA-256 hash of each file
    • Queries the vector database to check if any document chunks already have this hash
    • Uses a dummy query with a filter expression to match on the file hash metadata (a metadata-only alternative that avoids the embedding call is sketched after this list)
    • If a match is found, skips the file and increments the skipped counter
  4. Document Loading:
    • Selects the appropriate document loader based on file extension
    • Supports multiple file types: TXT, PDF, Markdown, and Word documents
    • Skips unsupported file types with a warning
  5. Document Processing:
    • Loads the document content using the selected loader
    • Splits the document into smaller chunks (1000 characters with 200 character overlap)
    • This chunking makes documents more suitable for embedding and retrieval
  6. Metadata Tagging:
    • Adds the file hash and filename to each document chunk’s metadata
    • This metadata enables future deduplication checks and source tracking
  7. Vector Database Storage:
    • Adds the document chunks to the Milvus vector store
    • The embedding model automatically converts text to vectors during this process
  8. Progress Tracking:
    • Tracks successful processing and displays chunk count
    • Catches and reports any errors during processing
    • Provides a final summary of processed and skipped files

This loop efficiently processes multiple file types while preventing duplicates, which optimizes storage and improves search quality.
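One refinement worth noting: the dummy similarity search spends an embedding call just to evaluate a metadata filter. The same existence check can be performed as a metadata-only scalar query through pymilvus, skipping the embedding step entirely. A sketch, assuming the collection and field names defined above:

from pymilvus import MilvusClient

client = MilvusClient(uri=URI)

def file_already_ingested(file_hash: str) -> bool:
    """Return True if any stored chunk carries this file hash (no embedding needed)."""
    if not client.has_collection(COLLECTION_NAME):
        return False  # first run: nothing has been ingested yet
    hits = client.query(
        collection_name=COLLECTION_NAME,
        filter=f'file_hash == "{file_hash}"',
        limit=1,
    )
    return len(hits) > 0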

The following code demonstrates semantic search in action:

  1. Sets “hobbit” as the search query to find semantically related content
  2. Performs a vector similarity search using the similarity_search_with_score method
  3. Retrieves the top 3 most similar chunks (specified by k=3)
  4. Prints each result with:
    • Its similarity score (higher means better match)
    • The actual text content from the document
    • A separator line between results

This showcases the primary function of the system: finding semantically relevant information across multiple document types based on meaning rather than just keyword matching.

# Perform similarity search
query = "hobbit"
results = vector_store.similarity_search_with_score(query, k=3)

# Print results
for res, score in results:
	print(f"* [SIM={score:.3f}] {res.page_content}")
	print('-----------')
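Because every chunk was tagged with file_hash and file_name metadata during ingestion, the same expr filtering used for deduplication can also scope a search to a single source document. A sketch using a hypothetical file name:

# Search only within chunks that came from one source file (hypothetical name)
filtered = vector_store.similarity_search_with_score(
    "hobbit",
    k=3,
    expr='file_name == "the_hobbit.txt"',
)
for res, score in filtered:
    print(f"* [SIM={score:.3f}] ({res.metadata['file_name']}) {res.page_content[:100]}")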

Conclusion

This semantic search implementation demonstrates the power of combining modern vector embedding techniques with intelligent deduplication strategies. By leveraging SHA-256 file hashing, the system efficiently prevents duplicate documents from cluttering the vector database, ensuring optimal search quality and resource utilization. The LangChain framework with Ollama embeddings and Milvus vector storage provides a flexible foundation that can be extended to handle various document formats and search requirements. As organizations continue to deal with growing volumes of unstructured data, this approach offers a scalable solution that balances performance, accuracy, and efficiency. The ability to find semantically relevant information across heterogeneous document collections—while maintaining a clean, duplicate-free database—makes this system particularly valuable for knowledge management, information retrieval, and document analysis applications.

Full Code


import os
import hashlib
from langchain_ollama import OllamaEmbeddings
from langchain_milvus import Milvus
from langchain_community.document_loaders import (
    TextLoader,
    PyMuPDFLoader,
    UnstructuredMarkdownLoader,
    UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Configuration
FOLDER_PATH = "./documents"
COLLECTION_NAME = "semantic_search_collection"
URI = "./multi-file.db"

embeddings = OllamaEmbeddings(model="nomic-embed-text")

vector_store = Milvus(
    embedding_function=embeddings,
    connection_args={"uri": URI},
    collection_name=COLLECTION_NAME,
    auto_id=True,
    index_params={"metric_type": "COSINE"},
)

def hash_file(file_path: str) -> str:
    """Generate SHA-256 hash of file contents"""
    sha256 = hashlib.sha256()
    with open(file_path, "rb") as f:
        while chunk := f.read(4096):
            sha256.update(chunk)
    return sha256.hexdigest()

# Document processing loop

processed_files = 0
skipped_files = 0

for file_name in os.listdir(FOLDER_PATH):
    file_path = os.path.join(FOLDER_PATH, file_name)
    if not os.path.isfile(file_path):
        continue

    try:
        file_hash = hash_file(file_path)
        
        # Check if file already exists in database
        existing = vector_store.similarity_search(
            query="file check",  # Dummy query
            k=1,
            expr=f'file_hash == "{file_hash}"'
        )
        
        if existing:
            print(f"⏩ Skipping {file_name} (already in database)")
            skipped_files += 1
            continue

        # Load documents
        if file_name.endswith(".txt"):
            loader = TextLoader(file_path)
        elif file_name.endswith(".pdf"):
            loader = PyMuPDFLoader(file_path)
        elif file_name.endswith(".md"):
            loader = UnstructuredMarkdownLoader(file_path)
        elif file_name.endswith(".docx"):
            loader = UnstructuredWordDocumentLoader(file_path)
        else:
            print(f"⚠️ Unsupported file type: {file_name}")
            continue

        # Process and add documents
        docs = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000, 
            chunk_overlap=200
        )
        split_docs = text_splitter.split_documents(docs)
        
        # Add file hash to all chunks
        for doc in split_docs:
            doc.metadata["file_hash"] = file_hash
            doc.metadata["file_name"] = file_name
            
        vector_store.add_documents(split_docs)
        processed_files += 1
        print(f"✅ Processed {file_name} ({len(split_docs)} chunks)")

    except Exception as e:
        print(f"❌ Failed to process {file_name}: {str(e)}")

print(f"\nSummary:\n- Processed files: {processed_files}\n- Skipped duplicates: {skipped_files}")

# Perform similarity search
query = "hobbit"
results = vector_store.similarity_search_with_score(query, k=3)

# Print results
for res, score in results:
	print(f"* [SIM={score:.3f}] {res.page_content}")
	print('-----------')
