Headless Web Scraping with Python

Some websites require you to execute JavaScript before any content appears on the page, and sometimes you simply want to emulate a real user. To handle these cases we'll learn how to use pyppeteer, a library for controlling a headless Chrome browser with Python.

Introduction

We'll cover how to use headless Chrome to scrape Google Places. Google Places does not strictly require JavaScript, because Google serves a different response when JavaScript is disabled. For better user emulation while browsing and scraping Google Places, though, a browser is recommended.

Headless Chrome is essentially the Chrome browser running without a head (no graphical user interface). The benefit is that you can run it in a server environment that has no graphical interface attached and is normally reached through shell access. Running headless can also be faster and use fewer system resources.

Puppeteer

Controlling a browser

We need a way to control the browser with code. This can be done through the Chrome DevTools Protocol, or CDP. CDP is essentially a websocket server running in the browser that exchanges JSON-RPC style messages. Instead of working with CDP directly we'll use a library called pyppeteer, a Python library that speaks CDP for us and provides an easier-to-use abstraction. It's inspired by puppeteer, the Node version of the same library.
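To make the idea of "JSON messages over a websocket" concrete, here is a minimal sketch of what a single CDP command looks like before it is sent. The helper name build_cdp_command is made up for illustration; pyppeteer builds and sends these messages internally, so you never need to write this yourself.

```python
import json


def build_cdp_command(message_id, method, params=None):
    """Serialize one CDP command as the JSON text sent over the websocket.

    Hypothetical helper for illustration only; not part of pyppeteer.
    """
    return json.dumps({
        "id": message_id,        # lets the client match a response to this request
        "method": method,        # "Domain.command", for example Page.navigate
        "params": params or {},  # arguments for the command
    })


# Ask the browser to navigate the current page to a URL.
navigate = build_cdp_command(1, "Page.navigate", {"url": "https://google.com"})
print(navigate)
```

The browser replies with a JSON message carrying the same id, which is how a client pairs responses with the commands it sent.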

Setting up

As usual with any of my Python projects, I recommend working in a virtual environment, which lets us manage dependencies and versions separately for each application or project. Let's create a virtual environment in our home directory and install the dependencies we need.

Make sure you are running at least Python 3.6.1; Python 3.5 has reached end of support. The pyppeteer library will not work with Python 3.6.0 because the websockets library it depends on does not support that version.
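If you want the scraper to fail early on an unsupported interpreter, a small guard like the sketch below can help. The names MINIMUM_PYTHON and is_supported are my own for this example, and the minimum version is simply the one stated above.

```python
import sys

# Minimum version taken from the requirement described in this guide.
MINIMUM_PYTHON = (3, 6, 1)


def is_supported(version_info=None):
    """Return True when the given (or current) interpreter version is new enough."""
    if version_info is None:
        version_info = sys.version_info
    # Compare only major, minor and micro so release-level fields are ignored.
    return tuple(version_info[:3]) >= MINIMUM_PYTHON


if not is_supported():
    raise SystemExit("Python 3.6.1 or newer is required")
```

The check avoids f-strings on purpose so that the error message, rather than a syntax error, is what an older interpreter reports.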

mkdir ~/headless-web-scraping
cd ~/headless-web-scraping
virtualenv env
. env/bin/activate # activate the environment which populates the shell's PATH
pip install pyppeteer

Let's create the following folders and files.

├── env # auto generated by virtualenv
├── core
│   ├── browser.py
│   └── utils.py
└── google-places
    └── __main__.py

We created a __main__.py file, which lets us run the Google Places scraper with the following command (nothing should happen right now):

python -m google-places

Launching a headless browser

We need to launch a Chrome browser. By default, pyppeteer downloads its own copy of Chromium the first time it is used. It's also possible to use Chrome instead, as long as it is installed on your system. The library uses async/await for concurrency, so we import Python's asyncio package.

To launch Chrome instead of Chromium, add the executablePath option to the launch function. Below, we launch the browser, navigate to Google and take a screenshot. The screenshot will be saved in the folder you run the scraper from.

import asyncio
import pyppeteer

async def main():
    browser = await pyppeteer.launch({
        # change to your local system chrome path
        # or remove to launch with the chromium that pyppeteer installs
        'executablePath': '/usr/bin/google-chrome',
    })
    page = await browser.newPage()
    await page.goto('https://google.com')
    await page.screenshot({'path': 'google.png'})
    await browser.close()

# block until main function completes
asyncio.get_event_loop().run_until_complete(main())

Digging in

Let's create some functions in core/browser.py to simplify working with a browser and the page. We'll use what I believe is an awesome Python feature for simplifying resource management: the context manager. Specifically, we will use an async context manager.

An asynchronous context manager is a context manager that is able to suspend execution in its enter and exit methods.
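Here is a minimal sketch of that definition with no browser involved. The Session class and the log list are made up for this example, and asyncio.sleep(0) stands in for slow calls such as launching or closing a browser.

```python
import asyncio


class Session:
    """Toy async context manager used only to show the order of events."""

    def __init__(self, log):
        self.log = log

    async def __aenter__(self):
        # Suspension point: real code might await a browser launch here.
        await asyncio.sleep(0)
        self.log.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Suspension point: real code might await closing the browser here.
        await asyncio.sleep(0)
        self.log.append("exit")


async def demo():
    log = []
    async with Session(log):
        log.append("body")
    return log


# A dedicated event loop keeps the example independent of any global loop.
loop = asyncio.new_event_loop()
events = loop.run_until_complete(demo())
loop.close()
print(events)
```

The enter step runs before the body and the exit step runs after it, even though both steps are coroutines that can yield control while they wait.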

This feature lets us write code like the line below, which handles opening and closing a browser in a single statement.

async with browser.PageSession("https://google.com/") as page_session:

Let's add the PageSession async context manager in the file core/browser.py.

import pyppeteer

class PageSession:
    def __init__(self, url):
        self.url = url

    # define what happens when the context starts
    async def __aenter__(self):
        self.browser = await pyppeteer.launch({
            # change to your local system chrome path
            'executablePath': '/usr/bin/google-chrome',
        })
        self.page = await self.browser.newPage()
        await self.page.goto(self.url)
        return self

    # define what happens when the context is exiting
    async def __aexit__(self, exc_type, exc, tb):
        await self.browser.close()

In our google-places/__main__.py file let's make use of our new PageSession and print the HTML of the final rendered page, after its JavaScript has executed.

import asyncio
from core import browser

async def main():
    async with browser.PageSession("https://google.com/") as page_session:
        # get reference to current page (tab)
        page = page_session.page
        final_html = await page.content()
        print(final_html)

asyncio.get_event_loop().run_until_complete(main())

Run the google-places module in your terminal with the same command we used earlier.

python -m google-places
<!DOCTYPE html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta charset="UTF-8"><meta content="Search the world's information, including webpages, images, videos and more. Google has many special features to help you find exactly what you're looking for.
...more html

With the code above we can now launch a browser, open a page (a tab in Chrome), navigate to a website, wait for the page to load and its JavaScript to run, and then close the browser.

Next let's do the following:

  • Visit google.com
  • Enter a search query for pediatrician near 94118
  • Click through to Google Places to see more results
  • Scrape results from the page
  • Save results to a CSV file

Navigating pages

We want to navigate through the following pages so we can reach the data we need.

Google Navigations

Let's start by breaking up our code in google-places/__main__.py so we can first search and then navigate to Google Places. We also want to move string literals such as the Google URL into constants.

import asyncio
from core import browser

GOOGLE_URL = "https://google.com"
GOOGLE_QUERY = "pediatrician near 94118"

async def search_google(page_session, query):
    page = page_session.page
    view_all_selector = "//a[contains(., 'View all')]"
    google_places_map = "//div[@id='lu_pinned_rhs']"
    search_bar_selector = "//input[@title='Search']"
    # there are two matches, we want the second match (you can verify in your browser manually)
    search_button_selector = "(//input[@value='Google Search'])[2]"
    search_bar = await page.xpath(search_bar_selector)
    search_button = await page.xpath(search_button_selector)
    # only one search bar, even though page.xpath returns a list by default
    await search_bar[0].type(query)
    # click on the search button
    await search_button[0].click()
    # we want to wait until the navigation finishes to the main results page of google
    await page.waitForXPath(view_all_selector, {
        'visible': True
    })
    # select and click view all to go to google places
    view_all_button = await page.xpath(view_all_selector)
    await view_all_button[0].click()
    # wait for google places with map to appear
    await page.waitForXPath(google_places_map, {
        'visible': True
    })

async def main():
    async with browser.PageSession(GOOGLE_URL) as page_session:
        # crawl
        await search_google(page_session, GOOGLE_QUERY)
        page = page_session.page
        final_html = await page.content()
        print(final_html)

asyncio.get_event_loop().run_until_complete(main())

The new code is the search_google function and its call in main. We use XPath to find the search bar, the search button and the "View all" button that takes us to Google Places.

  1. Type in the search bar
     await search_bar[0].type(query)
  2. Click the search button
     await search_button[0].click()
  3. Wait for the view all button to appear
     await page.waitForXPath(view_all_selector, {
         'visible': True
     })
  4. Click view all button to take us to google places
     await view_all_button[0].click()
  5. Wait for an element on the new page to appear
     await page.waitForXPath(google_places_map, {
         'visible': True
     })
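The steps above index straight into the list that page.xpath returns, so an XPath that matches nothing surfaces as a bare IndexError. One possible way to get a clearer failure is a small guard like the sketch below. Both ElementNotFound and require_first are hypothetical names for this example and are not part of pyppeteer.

```python
class ElementNotFound(Exception):
    """Raised when a selector matched no elements (hypothetical helper type)."""


def require_first(matches, selector):
    """Return the first match, or raise an error that names the selector."""
    if not matches:
        raise ElementNotFound("no element matched: " + selector)
    return matches[0]


# Plain lists stand in for the lists of element handles returned by page.xpath.
print(require_first(["search_bar_handle"], "//input[@title='Search']"))
```

In the scraper this could wrap a lookup, for example require_first(await page.xpath(search_bar_selector), search_bar_selector), so that a changed page layout is reported with the selector that stopped matching.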

Scraping the data with Pyppeteer

At this point we should be on the google places page and we can pull the data we want. The navigation flow we followed before is important for emulating a user.

Google Places

Let's define the data we want to pull from the page.

  • Name
  • Location
  • Phone
  • Rating
  • Website Link

In core/browser.py let's add two methods to our PageSession to help us grab the text and an attribute (the website link for the doctor).

import pyppeteer

class PageSession:
    def __init__(self, url):
        self.url = url

    async def get_text(self, element):
        return await self.page.evaluate('''(element) => {
            return element.textContent;
        }''', element)

    async def get_link(self, element):
        return await self.page.evaluate('''(element) => {
            return element.href;
        }''', element)

    # define what happens when the context starts
    async def __aenter__(self):
        self.browser = await pyppeteer.launch({
            # change to your local system chrome path
            'executablePath': '/usr/bin/google-chrome',
        })
        self.page = await self.browser.newPage()
        await self.page.goto(self.url)
        return self

    # define what happens when the context is exiting
    async def __aexit__(self, exc_type, exc, tb):
        await self.browser.close()

So we added get_text and get_link. These two methods evaluate JavaScript in the browser, the same way as if you had typed it into the Chrome console. They simply use the DOM to read the element's text or its href attribute.

In google-places/__main__.py we will add a few functions that will grab the content that we care about from the page.

# Note: the first argument is our PageSession (it provides get_text and get_link),
# not the raw pyppeteer page.
async def get_doctor_name(page_session, element):
    name_selector = "*//div[@role='heading']"
    name = await element.xpath(name_selector)
    return await page_session.get_text(name[0])

async def get_doctor_location(page_session, element):
    location_selector = "*//div[@role='heading']/following-sibling::span/div[2]"
    location = await element.xpath(location_selector)
    return await page_session.get_text(location[0])

async def get_doctor_phone(page_session, element):
    # can return two matches due to display of hours, if we have more than one we grab the second
    phone_selector = "*//div[@role='heading']/following-sibling::span/div[3]/span"
    phone = await element.xpath(phone_selector)
    if len(phone) == 1:
        return await page_session.get_text(phone[0])
    elif len(phone) > 1:
        return await page_session.get_text(phone[1])
    return None

async def get_doctor_rating(page_session, element):
    rating_selector = "*//div[@role='heading']/following-sibling::span/div[1]/span[@aria-hidden]"
    rating = await element.xpath(rating_selector)
    if len(rating) > 0:
        return await page_session.get_text(rating[0])
    return None

async def get_doctor_link(page_session, element):
    link_selector = "*//a[contains(., 'Website')]"
    link = await element.xpath(link_selector)
    if len(link) > 0:
        return await page_session.get_link(link[0])
    return None

We use XPath to grab the elements. You can practice XPath in Chrome by pressing F12, or by right-clicking and choosing Inspect, to open the developer tools. Why do I use XPath? It makes complex selectors easier to write, because XPath has built-in functions for things like finding elements that contain some text or traversing the tree in various ways.
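If you would rather experiment from Python than from the browser, the standard library's ElementTree can evaluate simple paths. Keep in mind that this is only a rough practice aid: ElementTree supports a limited subset of XPath (no contains(), for instance), and the markup below is invented for the example rather than taken from Google.

```python
import xml.etree.ElementTree as ET

# Made-up markup that loosely mimics a list of result cards.
SAMPLE = """
<div id="results">
  <div class="card">
    <div role="heading">First Clinic</div>
    <a href="http://example.com/first">Website</a>
  </div>
  <div class="card">
    <div role="heading">Second Clinic</div>
  </div>
</div>
"""

root = ET.fromstring(SAMPLE)

listings = []
for card in root.findall("./div[@class='card']"):
    # Paths that start with "." are evaluated relative to this card only.
    heading = card.find(".//div[@role='heading']")
    link = card.find(".//a[@href]")
    # Compare with None explicitly: an element without children is falsy.
    href = link.get("href") if link is not None else None
    listings.append((heading.text, href))

print(listings)
```

The pattern is the same one the scraper relies on: find each container first, then run relative queries inside it and tolerate fields that are missing.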

For the phone, rating and link fields we return None when nothing is found and later substitute 'N/A', because not all doctors have a phone number, a rating or a link listed. All of them seem to have a location and a name.

Because there are many doctors listed on the page we want to find the parent element and loop over each match, then evaluate the XPath we defined above. To do this let's add two more functions to tie it all together.

async def get_doctor_details(page_session, container_element):
    name = await get_doctor_name(page_session, container_element)
    location = await get_doctor_location(page_session, container_element)
    phone = await get_doctor_phone(page_session, container_element)
    rating = await get_doctor_rating(page_session, container_element)
    link = await get_doctor_link(page_session, container_element)
    # we default to 'N/A' for the fields that can return None
    return {
        'name': name,
        'location': location,
        'phone': phone or 'N/A',
        'rating': rating or 'N/A',
        'link': link or 'N/A'
    }

async def scrape_doctors(page_session):
    page = page_session.page
    container_selector = "//div[@role='heading']/ancestor::div[@data-jsdata][@data-hveid]"
    containers = await page.xpath(container_selector)
    if len(containers) > 0:
        tasks = []
        for container_element in containers:
            tasks.append(get_doctor_details(page_session, container_element))
        # wait for all async tasks to finish
        return await asyncio.gather(*tasks)
    else:
        return []

The entry point here is scrape_doctors which evaluates get_doctor_details on each container element.

In the code below, we loop over each container element that matched our XPath and call get_doctor_details. Because we don't use the await keyword, the call does not run the function yet; it returns a coroutine object. asyncio.gather then schedules every coroutine in the tasks list and runs them concurrently.

tasks = []
for container_element in containers:
    tasks.append(get_doctor_details(page_session, container_element))
# wait for all async tasks to finish
return await asyncio.gather(*tasks)

This line runs all of the async calls concurrently and waits for every one of them to finish.

return await asyncio.gather(*tasks)
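To see this behaviour in isolation, here is a small sketch that does not need a browser. The fetch_detail coroutine and its delays are invented for the example; asyncio.sleep stands in for the time spent talking to the page.

```python
import asyncio


async def fetch_detail(name, delay):
    """Pretend to scrape one listing; the sleep simulates waiting on the browser."""
    await asyncio.sleep(delay)
    return name.upper()


async def demo():
    # Calling an async function without await only creates a coroutine object.
    coroutines = [
        fetch_detail("first", 0.02),
        fetch_detail("second", 0.01),
    ]
    # gather runs them concurrently and returns results in the order passed in,
    # even though "second" finishes earlier.
    return await asyncio.gather(*coroutines)


# A dedicated event loop keeps the example independent of any global loop.
loop = asyncio.new_event_loop()
results = loop.run_until_complete(demo())
loop.close()
print(results)
```

Because the results keep the order of the inputs, the list of doctors returned by the scraper lines up with the order of the containers on the page.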

Let's put this together in our main function. First we search and crawl to the right page, then we scrape with scrape_doctors.

async def main():
    async with browser.PageSession(GOOGLE_URL) as page_session:
        # crawl
        await search_google(page_session, GOOGLE_QUERY)
        # scrape
        doctors = await scrape_doctors(page_session)
        print(doctors)

asyncio.get_event_loop().run_until_complete(main())
[{'name': 'SF Bay Pediatrics', 'location': '525 Spruce St #1', 'phone': '(415) 668-8900', 'rating': '4.1', 'link': 'http://www.sfbaypeds.com/'}, {'name': "UCSF Benioff Children's Physicians Pediatric After Hours Clinics & Advice Service", 'location': '3490 California St STE 200', 'phone': '(415) 387-9293', 'rating': '1.6', 'link': 'http://www.afterhours.ubcp.org/'}, {'name': 'Robert H. Langston, MD', 'location': '3838 California St #815', 'phone': '(415) 221-6476', 'rating': '4.1', 'link': 'N/A'}, {'name': 'Dr. Michelle M. Pepitone, MD', 'location': '525 Spruce St', 'phone': '(415) 668-8900', 'rating': 'N/A', 'link': 'http://www.sfbaypeds.com/'}]
...more doctors

Saving the output

In core/utils.py we'll add two functions to help us save our scraped output to a local CSV file.

def dict_to_csv_lines(data_rows):
    lines = []
    for row in data_rows:
        column_values = []
        for key in row.keys():
            # convert to text so non-string values don't break the checks below
            column_value = str(row[key])
            # quote values containing commas, quotes or newlines,
            # doubling any embedded quotes as CSV expects
            if any(char in column_value for char in ',"\n'):
                escaped = column_value.replace('"', '""')
                column_value = f'"{escaped}"'
            column_values.append(column_value)
        lines.append(','.join(column_values))
    return lines

def write_to_csv(file_name, data_rows):
    # nothing to write if the scrape returned no rows
    if not data_rows:
        return
    column_headers = data_rows[0].keys()
    lines = dict_to_csv_lines(data_rows)
    # utf-8 keeps non-ASCII names intact regardless of the system default
    with open(f"{file_name}.csv", 'w', encoding='utf-8') as file:
        # first write headers
        headers = ','.join(column_headers)
        file.write(f"{headers}\n")
        for csv_line in lines:
            file.write(f"{csv_line}\n")
Let's import it in google-places/__main__.py and save the output of scrape_doctors from our main function.

from core import browser, utils

async def main():
    async with browser.PageSession(GOOGLE_URL) as page_session:
        # crawl
        await search_google(page_session, GOOGLE_QUERY)
        # scrape
        doctors = await scrape_doctors(page_session)
        # save output
        utils.write_to_csv("pediatricians", doctors)

# block until main function completes
asyncio.get_event_loop().run_until_complete(main())

We should now have a file called pediatricians.csv which contains our output.

├── core
│   ├── browser.py
│   └── utils.py
├── google-places
│   └── __main__.py
└── pediatricians.csv
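For comparison, Python's standard csv module can take care of quoting for us. The sketch below is an alternative to the hand-written helpers rather than a drop-in replacement: the function name rows_to_csv_text and the sample row are made up for this example, and it writes to an in-memory buffer so it can be tried without touching the disk.

```python
import csv
import io


def rows_to_csv_text(data_rows):
    """Render a list of dicts as CSV text, using the first row's keys as headers."""
    if not data_rows:
        return ""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer,
        fieldnames=list(data_rows[0].keys()),
        lineterminator="\n",  # the csv module defaults to "\r\n"
    )
    writer.writeheader()
    writer.writerows(data_rows)
    return buffer.getvalue()


# Invented sample row with a comma in the name to trigger quoting.
sample = [{"name": "Example Clinic, Inc.", "phone": "N/A"}]
print(rows_to_csv_text(sample))
```

To write a real file instead, the same DictWriter can be pointed at a file opened with newline='' as the csv documentation recommends.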

Wrapping up

In this guide we learned how to use a headless browser to crawl and scrape Google Places while emulating a real user. There's a lot more you can do with headless browsers, such as generating PDFs, taking screenshots and other automation tasks.

Hopefully this guide helped you get started executing javascript and scraping with a headless browser. Till next time!
