初级爬虫实战——伯克利新闻

avatar
作者
猴君
阅读量:5

文章目录

发现宝藏

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【宝藏入口】。

一、 目标

爬取https://news.berkeley.edu/的字段,包含标题、内容,作者,发布时间,链接地址,文章快照 (可能需要翻墙才能访问)

二、简单分析网页

1. 寻找所有新闻

在这里插入图片描述

2. 分析模块、版面和文章

我们可以按照新闻模块、版面、和文章对网页信息进行拆分,分别按照步骤进行爬取

在这里插入图片描述
在这里插入图片描述

三、爬取新闻

1. 爬取模块

由于该新闻只有一个模块,所以直接请求该模块地址即可获取该模块的所有信息,但是为了兼容多模块的新闻,我们还是定义一个数组存储模块地址

class MitnewsScraper:     def __init__(self, root_url, model_url, img_output_dir):         self.root_url = root_url         self.model_url = model_url         self.img_output_dir = img_output_dir         self.headers = {             'Referer': 'https://news.berkeley.edu/',             'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '                           'Chrome/122.0.0.0 Safari/537.36',             'Cookie': '替换成你自己的',         }          ...  def run():     # 根路径     root_url = 'https://news.berkeley.edu/'     # 模块地址数组     model_urls = ['https://news.berkeley.edu/news']     # 文章图片保存路径     output_dir = 'D://imgs//berkeley-news'      for model_url in model_urls:         scraper = MitnewsScraper(root_url, model_url, output_dir)         scraper.catalogue_all_pages()   if __name__ == "__main__":     run() 

多模块的新闻网站例子如下(4个模块)

在这里插入图片描述

2. 爬取版面

  • f12打开控制台,点击网络(network),通过切换页面观察接口的参数传递,发现只有一个page参数

在这里插入图片描述

  • 于是我们可以获取页面下面的页数(page x of xxxx), 然后进行遍历传参,也就遍历获取了所有版面

在这里插入图片描述

    # 获取一个模块有多少版面     def catalogue_all_pages(self):         response = requests.get(self.model_url, headers=self.headers)         soup = BeautifulSoup(response.text, 'html.parser')         try:             match = re.search(r'of (\d+)', soup.text)             num_pages = int(match.group(1))             print('模块一共有' + str(num_pages) + '页版面,')             for page in range(1, num_pages + 1):                 self.parse_catalogues(page)                 print(f"========Finished modeles page {page}========")         except:             return False 
  • F12打开控制台后按照如下步骤获取版面列表对应的dom结构

在这里插入图片描述

在这里插入图片描述

  catalogue_list = soup.find('div', 'filtered-items')   catalogues_list = catalogue_list.find_all('article') 

在这里插入图片描述

  • 遍历版面列表,获取版面标题

在这里插入图片描述

    for index, catalogue in enumerate(catalogues_list):         # 版面标题         catalogue_title = catalogue.find('div', 'news-item__description').find('a').get_text(strip=True)         print('第' + str(index + 1) + '个版面标题为:' + catalogue_title) 

在这里插入图片描述

  • 获取版面更新时间和当下的操作时间

在这里插入图片描述

    # 操作时间     date = datetime.now()     # 更新时间     publish_time = catalogue.find('div', 'news-item__description').find('time').get('datetime')     #  将日期字符串转换为datetime对象     updatetime = datetime.strptime(publish_time, '%Y-%m-%d') 

在这里插入图片描述

  • 保存版面url和版面id, 由于该新闻是一个版面对应一篇文章,所以版面url和文章url是一样的,而且文章没有明显的标识,我们把地址后缀作为文章id,版面id则是文章id后面加上个01, 为了避免标题重复也可以把日期前缀也加上去

在这里插入图片描述

在这里插入图片描述

     # 版面url     catalogue_href = catalogue.find('div', 'news-item__description').find('a').get('href')     catalogue_url = self.root_url + catalogue_href     # 版面id     catalogue_id = catalogue_href[1:]     print('第' + str(index + 1) + '个版面地址为:' + catalogue_url) 

在这里插入图片描述

  • 保存版面信息到mogodb数据库(由于每个版面只有一篇文章,所以版面文章数量cardsize的值赋为1)
	# 连接 MongoDB 数据库服务器 	client = MongoClient('mongodb://localhost:27017/') 	# 创建或选择数据库 	db = client['berkeley-news'] 	# 创建或选择集合 	catalogues_collection = db['catalogues'] 	# 插入示例数据到 catalogues 集合 	catalogue_data = { 		'id': catalogue_id + '01', 		'date': date, 		'title': catalogue_title, 		'url': catalogue_url, 		'cardSize': 1, 		'updatetime': updatetime 	} 

3. 爬取文章

  • 由于一个版面对应一篇文章,所以版面url 、更新时间、标题和文章是一样的,并且按照设计版面id和文章id的区别只是差了个01,所以可以传递版面url、版面id、更新时间和标题四个参数到解析文章的函数里面

  • 获取文章id,文章url,文章更新时间和当下操作时间

# 解析版面 def parse_catalogues(self, page): ...     self.parse_cards_list(catalogue_url, catalogue_id, updatetime, catalogue_title) ...  # 解析文章     def parse_cards_list(self, url, catalogue_id, updatetime, cardtitle):         card_response = requests.get(url, headers=self.headers)         soup = BeautifulSoup(card_response.text, 'html.parser')          

在这里插入图片描述

  • 获取文章作者

在这里插入图片描述

    # 文章作者     author = soup.find('a', href='/author/news').get_text() 

在这里插入图片描述

  • 获取文章原始htmldom结构,并删除无用的部分(以下仅是部分举例),用html_content字段保留原始dom结构

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

        # 原始htmldom结构         html_dom = soup.find('div', 'single-post cb-section cb-stretch')          # 标题上方的冗余         html_cut1 = html_dom.find('div', 'single-post__heading').find('strong')         # 链接冗余         html_cut2 = html_dom.find_all('a', 'a2a_dd share-link')                # 移除元素         if html_cut1:             html_cut1.extract()         if html_cut2:             for item in html_cut2:                 item.extract()         html_content = html_dom  

在这里插入图片描述

在这里插入图片描述

  • 进行文章清洗,保留文本,去除标签,用content保留清洗后的文本
 # 解析文章列表里的文章     def parse_cards_list(self, url, catalogue_id, cardupdatetime, cardtitle):     ...      # 增加保留html样式的源文本         origin_html = html_dom.prettify()  # String         # 转义网页中的图片标签         str_html = self.transcoding_tags(origin_html)         # 再包装成         temp_soup = BeautifulSoup(str_html, 'html.parser')         # 反转译文件中的插图         str_html = self.translate_tags(temp_soup.text)         # 绑定更新内容         content = self.clean_content(str_html)   # 工具 转义标签     def transcoding_tags(self, htmlstr):         re_img = re.compile(r'\s*<(img.*?)>\s*', re.M)         s = re_img.sub(r'\n @@##\1##@@ \n', htmlstr)  # IMG 转义         return s      # 工具 转义标签     def translate_tags(self, htmlstr):         re_img = re.compile(r'@@##(img.*?)##@@', re.M)         s = re_img.sub(r'<\1>', htmlstr)  # IMG 转义         return s      # 清洗文章     def clean_content(self, content):         if content is not None:             content = re.sub(r'\r', r'\n', content)             content = re.sub(r'\n{2,}', '', content)             content = re.sub(r' {6,}', '', content)             content = re.sub(r' {3,}\n', '', content)             content = re.sub(r'<img src="../../../image/zxbl.gif"/>', '', content)             content = content.replace(                 '<img border="0" src="****处理标记:[Article]时, 字段 [SnapUrl] 在数据源中没有找到! ****"/> ', '')             content = content.replace(                 ''' <!--/enpcontent<INPUT type=checkbox value=0 name=titlecheckbox sourceid="<Source>SourcePh " style="display:none">''',                 '') \                 .replace(' <!--enpcontent', '').replace('<TABLE>', '')             content = content.replace('<P>', '').replace('<\P>', '').replace('&nbsp;', ' ')         return content 
  • 下载保存图片
  def parse_cards_list(self, url, catalogue_id, cardupdatetime, cardtitle):  ... 	imgs = []     img_array = soup.find('figure', 'cb-image cb-float--none cb-float--none--md cb-float--none--lg cb-100w cb-100w--md cb-100w--lg new-figure').find_all('img')     for item in img_array:         img_url = item.get('src')         imgs.append(img_url)     if len(imgs) != 0:         # 下载图片         illustrations = self.download_images(imgs, card_id)    # 下载图片     def download_images(self, img_urls, card_id):         result = re.search(r'[^/]+$', card_id)         last_word = result.group(0)          # 根据card_id创建一个新的子目录         images_dir = os.path.join(self.img_output_dir, str(last_word))         if not os.path.exists(images_dir):             os.makedirs(images_dir)             downloaded_images = []             for index, img_url in enumerate(img_urls):                 try:                     response = requests.get(img_url, stream=True, headers=self.headers)                     if response.status_code == 200:                         # 从URL中提取图片文件名                         img_name_with_extension = img_url.split('/')[-1]                         pattern = r'^[^?]*'                         match = re.search(pattern, img_name_with_extension)                         img_name = match.group(0)                          # 保存图片                         with open(os.path.join(images_dir, img_name), 'wb') as f:                             f.write(response.content)                         downloaded_images.append([img_url, os.path.join(images_dir, img_name)])                 except requests.exceptions.RequestException as e:                     print(f'请求图片时发生错误:{e}')                 except Exception as e:                     print(f'保存图片时发生错误:{e}')             return downloaded_images         # 如果文件夹存在则跳过         else:             print(f'文章id为{card_id}的图片文件夹已经存在')             return [] 
  • 保存文章数据
 # 连接 MongoDB 数据库服务器         client = MongoClient('mongodb://localhost:27017/')         # 创建或选择数据库         db = client['berkeley-news']         # 创建或选择集合         cards_collection = db['cards']         # 插入示例数据到 catalogues 集合         card_data = {             'id': card_id,             'catalogueId': catalogue_id,             'type': 'berkeley-news',             'date': date,             'title': card_title,             'author': author,             'updatetime': updateTime,             'url': url,             'html_content': str(html_content),             'content': content,             'illustrations': illustrations,         }         cards_collection.insert_one(card_data)  

四、完整代码

import os from datetime import datetime import requests from bs4 import BeautifulSoup from pymongo import MongoClient import re import traceback   class MitnewsScraper:     def __init__(self, root_url, model_url, img_output_dir):         self.root_url = root_url         self.model_url = model_url         self.img_output_dir = img_output_dir         self.headers = {             'Referer': 'https://news.berkeley.edu/',             'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '                           'Chrome/122.0.0.0 Safari/537.36',             'Cookie': '替换成你自己的',         }      # 获取一个模块有多少版面     def catalogue_all_pages(self):         response = requests.get(self.model_url, headers=self.headers)         soup = BeautifulSoup(response.text, 'html.parser')         try:             match = re.search(r'of (\d+)', soup.text)             num_pages = int(match.group(1))             print('模块一共有' + str(num_pages) + '页版面')             for page in range(1, num_pages + 1):                 print(f"========start catalogues page {page}" + "/" + str(num_pages) + "========")                 self.parse_catalogues(page)                 print(f"========Finished catalogues page {page}" + "/" + str(num_pages) + "========")         except Exception as e:             print(f'Error: {e}')             traceback.print_exc()      # 解析版面列表里的版面     def parse_catalogues(self, page):         params = {'page': page}         response = requests.get(self.model_url, params=params, headers=self.headers)         if response.status_code == 200:             soup = BeautifulSoup(response.text, 'html.parser')             catalogue_list = soup.find('div', 'filtered-items')             catalogues_list = catalogue_list.find_all('article')             for index, catalogue in enumerate(catalogues_list):                 print(f"========start catalogue {index+1}" + "/" + "10========")                 # 版面标题                 catalogue_title = catalogue.find('div', 'news-item__description').find('a').get_text(strip=True)                  # 操作时间                 date = datetime.now()                 # 更新时间                 publish_time = catalogue.find('div', 'news-item__description').find('time').get('datetime')                 #  将日期字符串转换为datetime对象                 updatetime = datetime.strptime(publish_time, '%Y-%m-%d')                  # 版面url                 catalogue_href = catalogue.find('div', 'news-item__description').find('a').get('href')                 catalogue_url = self.root_url + catalogue_href                 # 版面id                 catalogue_id = catalogue_href[1:]                  self.parse_cards_list(catalogue_url, catalogue_id, updatetime, catalogue_title)                  # 连接 MongoDB 数据库服务器                 client = MongoClient('mongodb://localhost:27017/')                 # 创建或选择数据库                 db = client['berkeley-news']                 # 创建或选择集合                 catalogues_collection = db['catalogues']                 # 插入示例数据到 catalogues 集合                 catalogue_data = {                     'id': catalogue_id,                     'date': date,                     'title': catalogue_title,                     'url': catalogue_url,                     'cardSize': 1,                     'updatetime': updatetime                 }                 # 在插入前检查是否存在相同id的文档                 existing_document = catalogues_collection.find_one({'id': catalogue_id})                  # 如果不存在相同id的文档,则插入新文档                 if existing_document is None:                     catalogues_collection.insert_one(catalogue_data)                     print("[爬取版面]版面 " + catalogue_url + " 已成功插入!")                 else:                     print("[爬取版面]版面 " + catalogue_url + " 已存在!")                 print(f"========finsh catalogue {index+1}" + "/" + "10========")             return True         else:             raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}")      # 解析文章列表里的文章     def parse_cards_list(self, url, catalogue_id, cardupdatetime, cardtitle):         url = 'https://news.berkeley.edu/2024/03/05/meet-our-new-faculty-antoine-levy-economics'         card_response = requests.get(url, headers=self.headers)         soup = BeautifulSoup(card_response.text, 'html.parser')         # 对应的版面id         card_id = catalogue_id         # 文章标题         card_title = cardtitle         # 文章更新时间         updateTime = cardupdatetime         # 操作时间         date = datetime.now()          # 文章作者         try:             author = soup.find('a', href='/author/news').get_text()         except:             author = soup.find('div', 'single-post__heading').find('p').find('a').get_text()          # 原始htmldom结构         html_dom = soup.find('div', 'single-post cb-section cb-stretch')          # 标题上方的冗余         html_cut1 = html_dom.find('div', 'single-post__heading').find('strong')         # 链接冗余         html_cut2 = html_dom.find_all('a', 'a2a_dd share-link')          # 移除元素         if html_cut1:             html_cut1.extract()         if html_cut2:             for item in html_cut2:                 item.extract()          html_content = html_dom          # 增加保留html样式的源文本         origin_html = html_dom.prettify()  # String         # 转义网页中的图片标签         str_html = self.transcoding_tags(origin_html)         # 再包装成         temp_soup = BeautifulSoup(str_html, 'html.parser')         # 反转译文件中的插图         str_html = self.translate_tags(temp_soup.text)         # 绑定更新内容         content = self.clean_content(str_html)         # 下载图片         imgs = []         try:             img_array = soup.find('figure', 'cb-image cb-float--none cb-float--none--md cb-float--none--lg cb-100w cb-100w--md cb-100w--lg new-figure').find_all('img')         except:             img_array = soup.find('div', 'container container--lg cb-container').find_all('img')         if len(img_array) is not None:             for item in img_array:                 img_url = item.get('src')                 if img_url is None:                     img_url = item.get('data-src')                 imgs.append(img_url)         if len(imgs) != 0:             # 下载图片             illustrations = self.download_images(imgs, card_id)         # 连接 MongoDB 数据库服务器         client = MongoClient('mongodb://localhost:27017/')         # 创建或选择数据库         db = client['berkeley-news']         # 创建或选择集合         cards_collection = db['cards']         # 插入示例数据到 cards 集合         card_data = {             'id': card_id,             'catalogueId': catalogue_id,             'type': 'berkeley-news',             'date': date,             'title': card_title,             'author': author,             'updatetime': updateTime,             'url': url,             'html_content': str(html_content),             'content': content,             'illustrations': illustrations,         }         # 在插入前检查是否存在相同id的文档         existing_document = cards_collection.find_one({'id': card_id})          # 如果不存在相同id的文档,则插入新文档         if existing_document is None:             cards_collection.insert_one(card_data)             print("[爬取文章]文章 " + url + " 已成功插入!")         else:             print("[爬取文章]文章 " + url + " 已存在!")       # 下载图片     def download_images(self, img_urls, card_id):         result = re.search(r'[^/]+$', card_id)         last_word = result.group(0)          # 根据card_id创建一个新的子目录         images_dir = os.path.join(self.img_output_dir, str(last_word))         if not os.path.exists(images_dir):             os.makedirs(images_dir)             downloaded_images = []             for index, img_url in enumerate(img_urls):                 try:                     response = requests.get(img_url, stream=True, headers=self.headers)                     if response.status_code == 200:                         # 从URL中提取图片文件名                         img_name_with_extension = img_url.split('/')[-1]                         pattern = r'^[^?]*'                         match = re.search(pattern, img_name_with_extension)                         img_name = match.group(0)                          # 保存图片                         with open(os.path.join(images_dir, img_name), 'wb') as f:                             f.write(response.content)                         downloaded_images.append([img_url, os.path.join(images_dir, img_name)])                         print(f'[爬取文章图片]文章id为{card_id}的图片已保存到本地')                 except requests.exceptions.RequestException as e:                     print(f'请求图片时发生错误:{e}')                 except Exception as e:                     print(f'保存图片时发生错误:{e}')             return downloaded_images         # 如果文件夹存在则跳过         else:             print(f'[爬取文章图片]文章id为{card_id}的图片文件夹已经存在')             return []      # 工具 转义标签     def transcoding_tags(self, htmlstr):         re_img = re.compile(r'\s*<(img.*?)>\s*', re.M)         s = re_img.sub(r'\n @@##\1##@@ \n', htmlstr)  # IMG 转义         return s      # 工具 转义标签     def translate_tags(self, htmlstr):         re_img = re.compile(r'@@##(img.*?)##@@', re.M)         s = re_img.sub(r'<\1>', htmlstr)  # IMG 转义         return s      # 清洗文章     def clean_content(self, content):         if content is not None:             content = re.sub(r'\r', r'\n', content)             content = re.sub(r'\n{2,}', '', content)             content = re.sub(r' {6,}', '', content)             content = re.sub(r' {3,}\n', '', content)             content = re.sub(r'<img src="../../../image/zxbl.gif"/>', '', content)             content = content.replace(                 '<img border="0" src="****处理标记:[Article]时, 字段 [SnapUrl] 在数据源中没有找到! ****"/> ', '')             content = content.replace(                 ''' <!--/enpcontent<INPUT type=checkbox value=0 name=titlecheckbox sourceid="<Source>SourcePh " style="display:none">''',                 '') \                 .replace(' <!--enpcontent', '').replace('<TABLE>', '')             content = content.replace('<P>', '').replace('<\P>', '').replace('&nbsp;', ' ')         return content   def run():     # 根路径     root_url = 'https://news.berkeley.edu/'     # 模块地址数组     model_urls = ['https://news.berkeley.edu/news']     # 文章图片保存路径     output_dir = 'D://imgs//berkeley-news'      for model_url in model_urls:         scraper = MitnewsScraper(root_url, model_url, output_dir)         scraper.catalogue_all_pages()   if __name__ == "__main__":     run()  

五、效果展示

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!