Day 8,9 - Data Pipeline For Your LLM Twin
Fetching articles and posts from Substack and LinkedIn
This article covers days 8 and 9 of my 75 Days Of Generative AI series. In the previous article, we discussed our plan to create an end-to-end app that will act as your LLM Twin. If you want to know more about the high-level architecture, please read this. If you want a general introduction to LLMs, check the index here.
Days 8 and 9 focused on the first part of the app: ingesting my Substack articles and LinkedIn posts into a MongoDB database. The idea is to first get all the data, then clean it, add metadata to it, and push it to a vector DB for consumption in the training layer.
Substack Crawler
Here is an overview of the flow.
The full pipeline involves the following steps:
Visiting the Substack sitemap for my newsletter using BeautifulSoup.
Fetching the year-wise post links and parsing them again via BeautifulSoup.
Fetching the link for each post and then using requests to get the content of the articles.
Converting the data into JSON format and loading it into the Substack articles collection in MongoDB.
Beautiful Soup is a Python package for parsing HTML and XML documents, including those with malformed markup. It creates a parse tree for documents that can be used to extract data from HTML, which is useful for web scraping.
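As a quick illustration of the kind of parsing we rely on below, here is a minimal, self-contained snippet (the HTML string is made up just for this example):

from bs4 import BeautifulSoup

# A made-up HTML fragment, just to show how BeautifulSoup builds a parse tree
html = '<ul><li><a class="sitemap-link" href="/p/first-post">First</a></li></ul>'
soup = BeautifulSoup(html, "html.parser")

# find_all returns every matching tag; .get('href') pulls out the attribute
for a in soup.find_all("a", class_="sitemap-link"):
    print(a.get("href"))  # -> /p/first-post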
Here is the code for fetching and parsing data from the Substack sitemap route:
from bs4 import BeautifulSoup
import requests
import json


def extract_links(anchorTag_array):
    """Common utility to take in an array of anchor tags and return
    an array of the actual href URLs inside those tags."""
    href_array = []
    for a in anchorTag_array:
        href_array.append(a.get('href'))
    return href_array


def scrape_sitemap(username, sitemap_link="/sitemap"):
    """Scrapes the top-level sitemap page and gives back an
    array of links to the year-wise article pages.
    """
    url = f"https://{username}.substack.com{sitemap_link}"
    sitemap_response = requests.get(url)
    soup = BeautifulSoup(sitemap_response.content, "html.parser")
    sitemap_links = soup.find_all('a', class_='sitemap-link')
    return extract_links(sitemap_links)


def scrape_yearly_posts(username):
    """Scrapes the second-level sitemap pages (one per year)
    and gives back a flat array of links to individual articles.
    """
    sitemap_urls = scrape_sitemap(username)
    post_urls = []
    for url in sitemap_urls:
        yearly_url = f"https://{username}.substack.com{url}"
        response = requests.get(yearly_url)
        soup = BeautifulSoup(response.content, "html.parser")
        sitemap_links = soup.find_all('a', class_='sitemap-link')
        post_urls.append(extract_links(sitemap_links))
    # Flatten the per-year lists into a single list of post URLs
    return [item for row in post_urls for item in row]


def fetch_post(url):
    response = requests.get(url)
    # .text decodes the HTML body; str(response.content) would store a bytes repr
    return response.text
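Before wiring up the database, you can sanity-check the scraper on its own; something like this should work for any public Substack (the username below is a placeholder):

urls = scrape_yearly_posts("username")
print(f"Found {len(urls)} posts")
print(urls[:3])  # peek at the first few post URLs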
Then we put the data into MongoDB. The best part is that MongoDB provides a free cluster with 512 MB of storage for anyone, so we will be using the free cloud version of MongoDB.
Here is the code for triggering a scrape for a particular username and fetching the posts to bulk insert into a MongoDB collection:
from scrapers import substack_sitemap_scraper
from db import mongo_connector


def add_substack_data(username):
    post_urls = substack_sitemap_scraper.scrape_yearly_posts(username)
    url_html_pairs = {}
    for url in post_urls:
        url_html_pairs[url] = substack_sitemap_scraper.fetch_post(url)
    mongo_connector.add_substack_data(url_html_pairs)


if __name__ == "__main__":
    add_substack_data('username')
mongo_connector is a fairly simple Python script
from pymongo.mongo_client import MongoClient
from datetime import datetime

# Connection string for the free MongoDB Atlas cluster (credentials are placeholders)
uri = "mongodb+srv://username:password@mongourl/?retryWrites=true&w=majority&appName=LLMTwin"

# Create a new client and connect to the server
client = MongoClient(uri)

# Add DB name
db = client['llm']


def add_substack_data(url_html_pairs):
    collection = db['substack_articles']

    # Prepare documents for insertion
    documents = []
    for url, html in url_html_pairs.items():
        document = {
            "url": url,
            "html": html,
            "scraped_at": datetime.utcnow()
        }
        documents.append(document)

    if documents:
        result = collection.insert_many(documents)
        print(f"Inserted {len(result.inserted_ids)} documents.")
    else:
        print("No documents to insert.")

    # Close the connection
    client.close()
As you can see, it’s a fairly generic way of fetching data and pushing it into a MongoDB collection, but it works well. Here is a screenshot of my MongoDB collection.
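If you’d rather verify the inserts from code than from the Atlas UI, a quick check along these lines works (reusing the same placeholder connection string and collection names as above):

from pymongo.mongo_client import MongoClient

# Same placeholder connection string as in mongo_connector
uri = "mongodb+srv://username:password@mongourl/?retryWrites=true&w=majority&appName=LLMTwin"
client = MongoClient(uri)
collection = client['llm']['substack_articles']

print(collection.count_documents({}))        # how many articles got inserted
print(collection.find_one({}, {"url": 1}))   # peek at one document's URL
client.close()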
LinkedIn Crawler
Getting post data from LinkedIn turned out to be trickier than I originally thought. There is no clear API from which you can fetch the data, so the approach I followed was:
Use the Selenium web driver to open a Chrome instance and log in to LinkedIn.
Scroll through the profile, fetching all the post information.
There are methods to scrape other parts of the profile as well, like the about, experience, and education sections.
As before, we use the PyMongo client to add data to the MongoDB database.
For this pipeline, I decided to go the object-oriented route in Python. Hence we first build a BaseAbstractCrawler, which mainly covers the common aspects of setting up the Selenium web driver.
We are also using Pydantic, which is a data validation library for models in Python.
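In short, Pydantic gives us typed models that reject bad data at construction time. Here is a tiny standalone illustration (the Post model below is made up purely for this example, it is not the project’s model):

from pydantic import BaseModel, ValidationError


class Post(BaseModel):  # hypothetical model, just to show validation
    platform: str
    content: dict


try:
    Post(platform="linkedin", content="not a dict")  # wrong type on purpose
except ValidationError as e:
    print(e)  # Pydantic rejects the bad payload instead of silently storing it

With that context, here is the base crawler: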
import time
from abc import ABC, abstractmethod
from tempfile import mkdtemp

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

from db.documents import BaseDocument


class BaseCrawler(ABC):
    model: type[BaseDocument]

    @abstractmethod
    def extract(self, link: str, **kwargs) -> None: ...


class BaseAbstractCrawler(BaseCrawler, ABC):
    def __init__(self, scroll_limit: int = 5) -> None:
        options = webdriver.ChromeOptions()
        options.binary_location = "path/to/chrome"
        options.add_argument("--disable-3d-apis")
        options.add_argument("--no-sandbox")
        options.add_argument("--headless=new")
        options.add_argument("--single-process")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--disable-gpu")
        options.add_argument("--log-level=3")
        options.add_argument("--disable-popup-blocking")
        options.add_argument("--disable-notifications")
        options.add_argument("--disable-dev-tools")
        options.add_argument("--ignore-certificate-errors")
        options.add_argument("--no-zygote")
        options.add_argument(f"--user-data-dir={mkdtemp()}")
        options.add_argument(f"--data-path={mkdtemp()}")
        options.add_argument(f"--disk-cache-dir={mkdtemp()}")
        options.add_argument("--remote-debugging-port=9222")
        options.add_argument("disable-blink-features")
        options.add_argument("disable-blink-features=AutomationControlled")

        self.set_extra_driver_options(options)
        self.scroll_limit = scroll_limit
        self.driver = webdriver.Chrome(
            service=webdriver.ChromeService("path/to/chromedriver"),
            options=options,
        )

    def set_extra_driver_options(self, options: Options) -> None:
        pass

    def login(self) -> None:
        pass

    def scroll_page(self) -> None:
        """Scroll through the LinkedIn page based on the scroll limit."""
        current_scroll = 0
        last_height = self.driver.execute_script("return document.body.scrollHeight")
        while True:
            self.driver.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);"
            )
            time.sleep(5)
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            # Stop when the page stops growing or we hit the scroll limit
            if new_height == last_height or (
                self.scroll_limit and current_scroll >= self.scroll_limit
            ):
                break
            last_height = new_height
            current_scroll += 1
And here is the implementation for LinkedIn (this is a modified version of the code from the original LLM Twin series):
import time
from typing import Dict, List

from bs4 import BeautifulSoup
from bs4.element import Tag
from selenium.webdriver.common.by import By

from errors import ImproperlyConfigured
from db.documents import PostDocument
from crawlers.base import BaseAbstractCrawler
from config import settings


class LinkedInCrawler(BaseAbstractCrawler):
    model = PostDocument

    def set_extra_driver_options(self, options) -> None:
        options.add_experimental_option("detach", True)

    def extract(self, link: str, **kwargs):
        print(f"Starting scraping data for profile: {link}")
        self.login()
        print("Logged in successfully")

        soup = self._get_page_content(link)
        data = {
            "Name": self._scrape_section(soup, "h1", class_="text-heading-xlarge"),
            "About": self._scrape_section(soup, "div", class_="display-flex ph5 pv3"),
            "Main Page": self._scrape_section(soup, "div", {"id": "main-content"}),
            "Experience": self._scrape_experience(link),
            "Education": self._scrape_education(link),
        }

        self.driver.get(link)
        time.sleep(5)
        button = self.driver.find_element(
            By.CSS_SELECTOR,
            ".app-aware-link.profile-creator-shared-content-view__footer-action",
        )
        button.click()

        # Scrolling and scraping posts
        self.scroll_page()
        soup = BeautifulSoup(self.driver.page_source, "html.parser")
        post_elements = soup.find_all(
            "div",
            class_="update-components-text relative update-components-update-v2__commentary",
        )
        buttons = soup.find_all("button", class_="update-components-image__image-link")
        post_images = self._extract_image_urls(buttons)

        posts = self._extract_posts(post_elements, post_images)
        print(posts)
        print(f"Found {len(posts)} posts for profile: {link}")

        self.driver.close()
        print("Driver closed")

        post_documents = []
        for post_id, post_data in posts.items():
            try:
                post_doc = PostDocument(
                    platform="linkedin",
                    content=post_data,
                    author_id=kwargs.get("user"),
                )
                post_documents.append(post_doc)
            except Exception as e:
                print(f"Error processing {post_id}: {e}")

        try:
            self.model.bulk_insert(post_documents)
        except Exception as e:
            print(str(e))
            return {"statusCode": 500, "body": f"An error occurred: {str(e)}"}

        print(f"Finished scraping data for profile: {link}")

    def _scrape_section(self, soup: BeautifulSoup, *args, **kwargs) -> str:
        """Scrape a specific section of the LinkedIn profile."""
        # Example: Scrape the 'About' section
        parent_div = soup.find(*args, **kwargs)
        return parent_div.get_text(strip=True) if parent_div else ""

    def _extract_image_urls(self, buttons: List[Tag]) -> Dict[str, str]:
        """
        Extracts image URLs from button elements.

        Args:
            buttons (List[Tag]): A list of BeautifulSoup Tag objects representing buttons.

        Returns:
            Dict[str, str]: A dictionary mapping post indexes to image URLs.
        """
        post_images = {}
        for i, button in enumerate(buttons):
            img_tag = button.find("img")
            if img_tag and "src" in img_tag.attrs:
                post_images[f"Post_{i}"] = img_tag["src"]
            else:
                print("No image found in this button")
        return post_images

    def _get_page_content(self, url: str) -> BeautifulSoup:
        """Retrieve the page content of a given URL."""
        self.driver.get(url)
        time.sleep(5)
        return BeautifulSoup(self.driver.page_source, "html.parser")

    def _extract_posts(
        self, post_elements: List[Tag], post_images: Dict[str, str]
    ) -> Dict[str, Dict[str, str]]:
        """
        Extracts post texts and combines them with their respective images.

        Args:
            post_elements (List[Tag]): A list of BeautifulSoup Tag objects representing post elements.
            post_images (Dict[str, str]): A dictionary containing image URLs mapped by post index.

        Returns:
            Dict[str, Dict[str, str]]: A dictionary containing post data with text and optional image URL.
        """
        posts_data = {}
        try:
            for i, post_element in enumerate(post_elements):
                post_text = post_element.get_text(strip=True, separator="\n")
                post_data = {"text": post_text}
                posts_data[f"Post_{i}"] = post_data
        except Exception as e:
            print(str(e))
        return posts_data

    def _scrape_experience(self, profile_url: str) -> str:
        """Scrapes the Experience section of the LinkedIn profile."""
        self.driver.get(profile_url + "/details/experience/")
        time.sleep(5)
        soup = BeautifulSoup(self.driver.page_source, "html.parser")
        experience_content = soup.find("section", {"id": "experience-section"})
        return experience_content.get_text(strip=True) if experience_content else ""

    def _scrape_education(self, profile_url: str) -> str:
        """Scrapes the Education section of the LinkedIn profile."""
        self.driver.get(profile_url + "/details/education/")
        time.sleep(5)
        soup = BeautifulSoup(self.driver.page_source, "html.parser")
        education_content = soup.find("section", {"id": "education-section"})
        return education_content.get_text(strip=True) if education_content else ""

    def login(self):
        """Log in to LinkedIn."""
        self.driver.get("https://www.linkedin.com/login")
        if not settings.LINKEDIN_USERNAME and not settings.LINKEDIN_PASSWORD:
            raise ImproperlyConfigured(
                "LinkedIn scraper requires a valid account to perform extraction"
            )
        self.driver.find_element(By.ID, "username").send_keys(
            settings.LINKEDIN_USERNAME
        )
        self.driver.find_element(By.ID, "password").send_keys(
            settings.LINKEDIN_PASSWORD
        )
        self.driver.find_element(
            By.CSS_SELECTOR, ".login__form_action_container button"
        ).click()
There is a lot going on here. Let’s break it down:
The extract function is the entry point. It takes in the link to the user’s profile on LinkedIn.
It then calls the login method, which takes the username and password from a settings file.
The next step is to use a CSS selector to submit the login form and get into your profile.
After that, it’s some BeautifulSoup and Selenium magic to scrape the posts for the logged-in user.
The last step is to call bulk_insert on the PostDocument model we created. Here is the Pydantic model for a post:
from pydantic import Field


class PostDocument(BaseDocument):
    platform: str
    content: dict
    author_id: str = Field(alias="author_id")
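One note: BaseDocument itself is not shown in this post; it lives in db/documents.py and provides the bulk_insert used by the crawler. Purely as a sketch of what such a base model could look like (the fields and MongoDB wiring here are my assumptions, not the project’s actual implementation):

import uuid

from pydantic import BaseModel, Field
from pymongo import MongoClient

# Hypothetical shared client/db; the real project wires this up in its own connector
_db = MongoClient("mongodb+srv://username:password@mongourl/")["llm"]


class BaseDocument(BaseModel):
    # Hypothetical id field; the real BaseDocument may differ
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))

    @classmethod
    def bulk_insert(cls, documents: list["BaseDocument"]) -> None:
        # Dump each Pydantic model to a dict and insert into a per-model collection
        if documents:
            _db[cls.__name__.lower()].insert_many(
                [doc.model_dump() for doc in documents]
            )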
And now we have all LinkedIn posts in MongoDB!
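For reference, kicking off the whole crawl might look roughly like this (the module path, profile URL, and user value are placeholders; the settings file needs LINKEDIN_USERNAME and LINKEDIN_PASSWORD as used in login):

from crawlers.linkedin import LinkedInCrawler  # assumed module path

crawler = LinkedInCrawler(scroll_limit=10)
crawler.extract(
    "https://www.linkedin.com/in/some-profile",  # placeholder profile URL
    user="some-profile",  # stored as author_id on each PostDocument
)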
That was a lot to take in for data ingestion, but that’s how you usually deal with unstructured data on the web: not a lot of it is available via ready-to-use APIs.
Next Steps
The next step for tomorrow is to clean the data and add metadata to it. Then we will build an ETL pipeline to push our data to a vector DB, where it can be used for fine-tuning our LLM.
If you like the series feel free to subscribe to my newsletter or follow me on LinkedIn.