This article covers days 10-12 of my 75 Days Of Generative AI series. We are now very close to completing the first part of building our LLM Twin. In the previous article, we covered scraping our data from LinkedIn and Substack and loading it into MongoDB. That, though, was only the initial part of the data pipeline.
Today we cover the rest of the data pipeline. This includes:
Reading the loaded data from MongoDB
Cleaning the data using well-known NLP techniques
Converting the data into embeddings
Loading the embeddings into Qdrant (a vector DB)
Approach
We will go the object-oriented route for this part of the system. A quick overview of the class diagram for the project is given below.
The project has the following parts:
Connectors - These are responsible for maintaining connections to MongoDB and Qdrant.
Retriever and Extractor - The retriever, as the name suggests, retrieves our LinkedIn posts and Substack articles from MongoDB. The extractor then converts the data into a format that can be pre-processed.
TextPreprocessor - Processor utility to clean the data using various well-known techniques (explained below).
EmbeddingGenerator - Generates embeddings for the cleaned data, ready to be loaded into our vector DB.
RAGPipeline - Orchestration class to run each step.
Connectors
We have already covered MongoDBConnector in the previous article. If you want a refresher, check this out. We enhance the existing class by adding one more function to read the data:
async def fetch_batch(self, database: str, collection: str, batch_size: int, skip: int = 0):
    db = self.client[database]
    coll = db[collection]
    cursor = coll.find().skip(skip).limit(batch_size)
    return await cursor.to_list(length=batch_size)
Here we use asyncio. Async IO is a concurrency model built into Python that uses the async/await syntax for asynchronous calls.
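As a quick illustration (the database and collection names are placeholders), the new method could be called like this, assuming the MongoDBConnector from the previous article already has its async client set up:

import asyncio

async def main():
    mongo = MongoDBConnector()  # connector from the previous article
    # Fetch the first 100 documents from a hypothetical "posts" collection
    docs = await mongo.fetch_batch("llm_twin", "posts", batch_size=100)
    print(f"Fetched {len(docs)} documents")

asyncio.run(main())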
QdrantConnector
Qdrant is one of the most widely used open-source vector databases. A great part about it is that it provides a completely free 1 GB cluster on any cloud of your choice. It takes just 5 steps to set up your cluster! Check out the official website documentation for more.
The connector follows a similar principle to the MongoDB one. Here is the snippet for it:
from typing import Optional, Dict, Any
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
import os
from dotenv import load_dotenv


class QdrantConnector:
    _instance: Optional['QdrantConnector'] = None

    def __new__(cls) -> 'QdrantConnector':
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self) -> None:
        if self._initialized:
            return
        self._initialized = True
        self._client: Optional[QdrantClient] = None
        self._config: Dict[str, Any] = {}
        self._load_config()

    def _load_config(self) -> None:
        load_dotenv()
        self._config = {
            'url': os.getenv('QDRANT_URL', 'localhost'),
            'api_key': os.getenv('QDRANT_API_KEY'),
            'prefer_grpc': os.getenv('QDRANT_PREFER_GRPC', 'False').lower() == 'true'
        }

    def connect(self) -> None:
        if self._client is None:
            try:
                self._client = QdrantClient(
                    url=self._config['url'],
                    api_key=self._config['api_key'],
                    prefer_grpc=self._config['prefer_grpc']
                )
            except Exception as e:
                raise ConnectionError(f"Failed to connect to Qdrant: {str(e)}")

    def get_client(self) -> QdrantClient:
        if self._client is None:
            self.connect()
        return self._client

    def create_collection(self, collection_name: str, vector_size: int, distance: Distance = Distance.COSINE) -> None:
        client = self.get_client()
        try:
            client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(size=vector_size, distance=distance)
            )
        except Exception as e:
            raise RuntimeError(f"Failed to create collection: {str(e)}")

    def upload_batch(self, collection_name: str, ids: list[str], vectors: list[list[float]], payloads: list[dict[str, Any]]):
        client = self.get_client()
        # Pass the payloads along with the vectors so the metadata is stored in Qdrant
        client.upload_collection(
            collection_name=collection_name,
            ids=ids,
            vectors=vectors,
            payload=payloads
        )

    def close(self) -> None:
        if self._client:
            self._client.close()
            self._client = None

    def __del__(self) -> None:
        self.close()


if __name__ == "__main__":
    connector = QdrantConnector()
    connector.connect()
    print(connector.get_client())
The two main functions to explain here are:
create_collection - A one-time task to create a collection in the Qdrant DB.
upload_batch - Used to add embeddings of our data as vectors, along with their payloads, in batches.
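As a rough, illustrative sketch of how the connector could be used (the collection name and payloads here are placeholders): all-MiniLM-L6-v2, the embedding model we use later, produces 384-dimensional vectors, so the collection is created with that size. The credentials are picked up from the .env file via QDRANT_URL and QDRANT_API_KEY.

import uuid

connector = QdrantConnector()

# One-time setup: a collection sized for 384-dimensional embeddings
connector.create_collection(collection_name="llm_twin_posts", vector_size=384)

# Upload a tiny batch; Qdrant expects point ids to be unsigned integers or UUIDs
connector.upload_batch(
    collection_name="llm_twin_posts",
    ids=[str(uuid.uuid4()), str(uuid.uuid4())],
    vectors=[[0.01] * 384, [0.02] * 384],
    payloads=[{"author": "me"}, {"author": "me"}],
)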
TextPreprocessor
There is a common saying: "garbage in, garbage out". Hence, it is very important to clean our data to remove unnecessary characters, stop words, and, in the case of HTML-parsed data, the HTML tags.
This is done in two parts. The first is the TextPreprocessor class, which removes all the typical things you would not want in your text:
import re
from typing import List, Optional
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer, WordNetLemmatizer


class TextPreprocessor:
    def __init__(self,
                 lowercase: bool = True,
                 remove_punctuation: bool = True,
                 remove_numbers: bool = False,
                 remove_whitespace: bool = True,
                 remove_newlines: bool = True,
                 remove_stopwords: bool = False,
                 stemming: bool = False,
                 lemmatization: bool = False):
        self.lowercase = lowercase
        self.remove_punctuation = remove_punctuation
        self.remove_numbers = remove_numbers
        self.remove_whitespace = remove_whitespace
        self.remove_newlines = remove_newlines
        self.remove_stopwords = remove_stopwords
        self.stemming = stemming
        self.lemmatization = lemmatization
        # punkt is always needed because preprocess() always tokenizes
        nltk.download('punkt', quiet=True)
        if self.remove_stopwords:
            nltk.download('stopwords', quiet=True)
        if self.lemmatization:
            nltk.download('wordnet', quiet=True)
        self.stop_words = set(stopwords.words('english')) if self.remove_stopwords else None
        self.stemmer = PorterStemmer() if self.stemming else None
        self.lemmatizer = WordNetLemmatizer() if self.lemmatization else None

    def preprocess(self, text: str) -> str:
        if not text:
            return ""
        if self.lowercase:
            text = self._to_lowercase(text)
        if self.remove_newlines:
            text = self._remove_newlines(text)
        if self.remove_punctuation:
            text = self._remove_punctuation(text)
        if self.remove_numbers:
            text = self._remove_numbers(text)
        if self.remove_whitespace:
            text = self._remove_whitespace(text)
        tokens = self._tokenize(text)
        if self.remove_stopwords:
            tokens = self._remove_stopwords(tokens)
        if self.stemming:
            tokens = self._apply_stemming(tokens)
        if self.lemmatization:
            tokens = self._apply_lemmatization(tokens)
        return " ".join(tokens)

    def _to_lowercase(self, text: str) -> str:
        return text.lower()

    def _remove_newlines(self, text: str) -> str:
        return re.sub(r'\n+', ' ', text)

    def _remove_punctuation(self, text: str) -> str:
        return re.sub(r'[^\w\s]', '', text)

    def _remove_numbers(self, text: str) -> str:
        return re.sub(r'\d+', '', text)

    def _remove_whitespace(self, text: str) -> str:
        return " ".join(text.split())

    def _tokenize(self, text: str) -> List[str]:
        return word_tokenize(text)

    def _remove_stopwords(self, tokens: List[str]) -> List[str]:
        return [token for token in tokens if token not in self.stop_words]

    def _apply_stemming(self, tokens: List[str]) -> List[str]:
        return [self.stemmer.stem(token) for token in tokens]

    def _apply_lemmatization(self, tokens: List[str]) -> List[str]:
        return [self.lemmatizer.lemmatize(token) for token in tokens]

    def set_custom_stopwords(self, custom_stopwords: List[str]):
        if self.stop_words is None:
            self.stop_words = set()
        self.stop_words.update(custom_stopwords)

    def remove_custom_stopwords(self, custom_stopwords: List[str]):
        if self.stop_words is not None:
            self.stop_words.difference_update(custom_stopwords)
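To give a quick feel for it, here is a small, made-up example with stop-word removal and lemmatization switched on (the exact output may vary slightly depending on your NLTK data):

preprocessor = TextPreprocessor(remove_stopwords=True, lemmatization=True)
raw_text = "The cats are running quickly through the 2 gardens!"
print(preprocessor.preprocess(raw_text))
# roughly: "cat running quickly 2 garden"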
There is also a special case for Substack articles, where we scraped the complete HTML and stored it in MongoDB. For removing the HTML and keeping only the text, we use the following function:
from bs4 import BeautifulSoup


def clean_html(html_content: str) -> str:
    """
    Remove HTML tags from the given content and clean up the resulting text.

    Args:
        html_content (str): The HTML content to be cleaned.

    Returns:
        str: The cleaned text content.
    """
    # Parse the HTML content
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract text from the parsed HTML
    text = soup.get_text(separator=' ')
    return text
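For instance, a stripped-down, made-up Substack-style snippet can be cleaned and then passed through the preprocessor:

html = "<h1>My post</h1><p>Some <b>bold</b> thoughts on building an LLM Twin.</p>"
text = clean_html(html)
print(TextPreprocessor().preprocess(text))
# roughly: "my post some bold thoughts on building an llm twin"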
EmbeddingGenerator
Now that we have our data from LinkedIn and Substack loaded and cleaned, the next step is to convert it into a form suitable for a vector DB. If you want to read more about what a vector DB is and how it works, check this previous article.
We are using a library called sentence-transformers, which provides multilingual sentence, paragraph, and image embeddings using BERT & Co. Our wrapper is a simple class that does one thing: generate embeddings for the text provided. It needs a GPU to run fast enough!
from sentence_transformers import SentenceTransformer


class EmbeddingGenerator:
    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        self.model = SentenceTransformer(model_name)

    def generate(self, texts: list[str]) -> list[list[float]]:
        return self.model.encode(texts).tolist()
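As a small sanity check (the sentences here are placeholders), all-MiniLM-L6-v2 maps each text to a 384-dimensional vector:

generator = EmbeddingGenerator()
vectors = generator.generate(["first cleaned post", "second cleaned post"])
print(len(vectors), len(vectors[0]))  # 2 384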
Bringing everything together - RAGPipeline
The pipeline is where everything comes together and where we make sure each step runs in the right order.
As expected, it's nothing fancy, just a structured way of calling each step:
import uuid


class RAGPipeline:
    def __init__(self, mongo_connector: MongoDBConnector, text_preprocessor: TextPreprocessor,
                 embedding_generator: EmbeddingGenerator, qdrant_connector: QdrantConnector):
        self.mongo_connector = mongo_connector
        self.text_preprocessor = text_preprocessor
        self.embedding_generator = embedding_generator
        self.qdrant_connector = qdrant_connector

    async def process_batch(self, mongo_db: str, mongo_collection: str, qdrant_collection: str, batch_size: int, skip: int = 0):
        # Fetch data from MongoDB
        documents = await self.mongo_connector.fetch_batch(mongo_db, mongo_collection, batch_size, skip)

        # Preprocess text and prepare data for embedding
        preprocessed_texts = []
        ids = []
        payloads = []
        for doc in documents:
            preprocessed_text = self.text_preprocessor.preprocess(doc.get('content', ''))
            preprocessed_texts.append(preprocessed_text)
            # Qdrant point ids must be unsigned integers or UUIDs, so derive a deterministic
            # UUID from the MongoDB ObjectId and keep the original id in the payload
            ids.append(str(uuid.uuid5(uuid.NAMESPACE_OID, str(doc['_id']))))
            payloads.append({
                'original_id': str(doc['_id']),
                'author': doc.get('author', ''),
                'preprocessed_text': preprocessed_text
            })

        # Generate embeddings
        embeddings = self.embedding_generator.generate(preprocessed_texts)

        # Upload to Qdrant
        self.qdrant_connector.upload_batch(qdrant_collection, ids, embeddings, payloads)
        return len(documents)
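To tie it all together, here is a rough sketch of how the pipeline could be driven end to end. The database, collection, and batch settings are placeholders, and it assumes the Qdrant collection was already created with vector_size=384 as shown earlier.

import asyncio

async def main():
    pipeline = RAGPipeline(
        mongo_connector=MongoDBConnector(),
        text_preprocessor=TextPreprocessor(remove_stopwords=True),
        embedding_generator=EmbeddingGenerator(),
        qdrant_connector=QdrantConnector(),
    )
    skip, batch_size = 0, 100
    # Keep processing batches until MongoDB has nothing left to return
    while True:
        processed = await pipeline.process_batch(
            mongo_db="llm_twin",
            mongo_collection="posts",
            qdrant_collection="llm_twin_posts",
            batch_size=batch_size,
            skip=skip,
        )
        if processed == 0:
            break
        skip += processed

asyncio.run(main())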
Conclusion
Phew! That was a long one! Congratulations if you have reached this part of the article. However, the data pipeline is just as important as the actual model training and inference. Now that we have the data pipeline set up, our next step is to fine-tune the model. In upcoming articles, we will talk about two techniques:
PEFT
RAG
Stay tuned for the next part of building our digital twin.