阅读量:5
文章目录
发现宝藏
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【宝藏入口】。
一、 目标
爬取https://news.berkeley.edu/
的字段,包含标题、内容,作者,发布时间,链接地址,文章快照 (可能需要翻墙才能访问)
二、简单分析网页
1. 寻找所有新闻
2. 分析模块、版面和文章
我们可以按照新闻模块、版面、和文章对网页信息进行拆分,分别按照步骤进行爬取
三、爬取新闻
1. 爬取模块
由于该新闻只有一个模块,所以直接请求该模块地址即可获取该模块的所有信息,但是为了兼容多模块的新闻,我们还是定义一个数组存储模块地址
class MitnewsScraper: def __init__(self, root_url, model_url, img_output_dir): self.root_url = root_url self.model_url = model_url self.img_output_dir = img_output_dir self.headers = { 'Referer': 'https://news.berkeley.edu/', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/122.0.0.0 Safari/537.36', 'Cookie': '替换成你自己的', } ... def run(): # 根路径 root_url = 'https://news.berkeley.edu/' # 模块地址数组 model_urls = ['https://news.berkeley.edu/news'] # 文章图片保存路径 output_dir = 'D://imgs//berkeley-news' for model_url in model_urls: scraper = MitnewsScraper(root_url, model_url, output_dir) scraper.catalogue_all_pages() if __name__ == "__main__": run()
多模块的新闻网站例子如下(4个模块)
2. 爬取版面
- f12打开控制台,点击网络(network),通过切换页面观察接口的参数传递,发现只有一个page参数
- 于是我们可以获取页面下面的页数(page x of xxxx), 然后进行遍历传参,也就遍历获取了所有版面
# 获取一个模块有多少版面 def catalogue_all_pages(self): response = requests.get(self.model_url, headers=self.headers) soup = BeautifulSoup(response.text, 'html.parser') try: match = re.search(r'of (\d+)', soup.text) num_pages = int(match.group(1)) print('模块一共有' + str(num_pages) + '页版面,') for page in range(1, num_pages + 1): self.parse_catalogues(page) print(f"========Finished modeles page {page}========") except: return False
- F12打开控制台后按照如下步骤获取版面列表对应的dom结构
catalogue_list = soup.find('div', 'filtered-items') catalogues_list = catalogue_list.find_all('article')
- 遍历版面列表,获取版面标题
for index, catalogue in enumerate(catalogues_list): # 版面标题 catalogue_title = catalogue.find('div', 'news-item__description').find('a').get_text(strip=True) print('第' + str(index + 1) + '个版面标题为:' + catalogue_title)
- 获取版面更新时间和当下的操作时间
# 操作时间 date = datetime.now() # 更新时间 publish_time = catalogue.find('div', 'news-item__description').find('time').get('datetime') # 将日期字符串转换为datetime对象 updatetime = datetime.strptime(publish_time, '%Y-%m-%d')
- 保存版面url和版面id, 由于该新闻是一个版面对应一篇文章,所以版面url和文章url是一样的,而且文章没有明显的标识,我们把地址后缀作为文章id,版面id则是文章id后面加上个01, 为了避免标题重复也可以把日期前缀也加上去
# 版面url catalogue_href = catalogue.find('div', 'news-item__description').find('a').get('href') catalogue_url = self.root_url + catalogue_href # 版面id catalogue_id = catalogue_href[1:] print('第' + str(index + 1) + '个版面地址为:' + catalogue_url)
- 保存版面信息到mogodb数据库(由于每个版面只有一篇文章,所以版面文章数量cardsize的值赋为1)
# 连接 MongoDB 数据库服务器 client = MongoClient('mongodb://localhost:27017/') # 创建或选择数据库 db = client['berkeley-news'] # 创建或选择集合 catalogues_collection = db['catalogues'] # 插入示例数据到 catalogues 集合 catalogue_data = { 'id': catalogue_id + '01', 'date': date, 'title': catalogue_title, 'url': catalogue_url, 'cardSize': 1, 'updatetime': updatetime }
3. 爬取文章
由于一个版面对应一篇文章,所以版面url 、更新时间、标题和文章是一样的,并且按照设计版面id和文章id的区别只是差了个01,所以可以传递版面url、版面id、更新时间和标题四个参数到解析文章的函数里面
获取文章id,文章url,文章更新时间和当下操作时间
# 解析版面 def parse_catalogues(self, page): ... self.parse_cards_list(catalogue_url, catalogue_id, updatetime, catalogue_title) ... # 解析文章 def parse_cards_list(self, url, catalogue_id, updatetime, cardtitle): card_response = requests.get(url, headers=self.headers) soup = BeautifulSoup(card_response.text, 'html.parser')
- 获取文章作者
# 文章作者 author = soup.find('a', href='/author/news').get_text()
- 获取文章原始htmldom结构,并删除无用的部分(以下仅是部分举例),用html_content字段保留原始dom结构
# 原始htmldom结构 html_dom = soup.find('div', 'single-post cb-section cb-stretch') # 标题上方的冗余 html_cut1 = html_dom.find('div', 'single-post__heading').find('strong') # 链接冗余 html_cut2 = html_dom.find_all('a', 'a2a_dd share-link') # 移除元素 if html_cut1: html_cut1.extract() if html_cut2: for item in html_cut2: item.extract() html_content = html_dom
- 进行文章清洗,保留文本,去除标签,用content保留清洗后的文本
# 解析文章列表里的文章 def parse_cards_list(self, url, catalogue_id, cardupdatetime, cardtitle): ... # 增加保留html样式的源文本 origin_html = html_dom.prettify() # String # 转义网页中的图片标签 str_html = self.transcoding_tags(origin_html) # 再包装成 temp_soup = BeautifulSoup(str_html, 'html.parser') # 反转译文件中的插图 str_html = self.translate_tags(temp_soup.text) # 绑定更新内容 content = self.clean_content(str_html) # 工具 转义标签 def transcoding_tags(self, htmlstr): re_img = re.compile(r'\s*<(img.*?)>\s*', re.M) s = re_img.sub(r'\n @@##\1##@@ \n', htmlstr) # IMG 转义 return s # 工具 转义标签 def translate_tags(self, htmlstr): re_img = re.compile(r'@@##(img.*?)##@@', re.M) s = re_img.sub(r'<\1>', htmlstr) # IMG 转义 return s # 清洗文章 def clean_content(self, content): if content is not None: content = re.sub(r'\r', r'\n', content) content = re.sub(r'\n{2,}', '', content) content = re.sub(r' {6,}', '', content) content = re.sub(r' {3,}\n', '', content) content = re.sub(r'<img src="../../../image/zxbl.gif"/>', '', content) content = content.replace( '<img border="0" src="****处理标记:[Article]时, 字段 [SnapUrl] 在数据源中没有找到! ****"/> ', '') content = content.replace( ''' <!--/enpcontent<INPUT type=checkbox value=0 name=titlecheckbox sourceid="<Source>SourcePh " style="display:none">''', '') \ .replace(' <!--enpcontent', '').replace('<TABLE>', '') content = content.replace('<P>', '').replace('<\P>', '').replace(' ', ' ') return content
- 下载保存图片
def parse_cards_list(self, url, catalogue_id, cardupdatetime, cardtitle): ... imgs = [] img_array = soup.find('figure', 'cb-image cb-float--none cb-float--none--md cb-float--none--lg cb-100w cb-100w--md cb-100w--lg new-figure').find_all('img') for item in img_array: img_url = item.get('src') imgs.append(img_url) if len(imgs) != 0: # 下载图片 illustrations = self.download_images(imgs, card_id) # 下载图片 def download_images(self, img_urls, card_id): result = re.search(r'[^/]+$', card_id) last_word = result.group(0) # 根据card_id创建一个新的子目录 images_dir = os.path.join(self.img_output_dir, str(last_word)) if not os.path.exists(images_dir): os.makedirs(images_dir) downloaded_images = [] for index, img_url in enumerate(img_urls): try: response = requests.get(img_url, stream=True, headers=self.headers) if response.status_code == 200: # 从URL中提取图片文件名 img_name_with_extension = img_url.split('/')[-1] pattern = r'^[^?]*' match = re.search(pattern, img_name_with_extension) img_name = match.group(0) # 保存图片 with open(os.path.join(images_dir, img_name), 'wb') as f: f.write(response.content) downloaded_images.append([img_url, os.path.join(images_dir, img_name)]) except requests.exceptions.RequestException as e: print(f'请求图片时发生错误:{e}') except Exception as e: print(f'保存图片时发生错误:{e}') return downloaded_images # 如果文件夹存在则跳过 else: print(f'文章id为{card_id}的图片文件夹已经存在') return []
- 保存文章数据
# 连接 MongoDB 数据库服务器 client = MongoClient('mongodb://localhost:27017/') # 创建或选择数据库 db = client['berkeley-news'] # 创建或选择集合 cards_collection = db['cards'] # 插入示例数据到 catalogues 集合 card_data = { 'id': card_id, 'catalogueId': catalogue_id, 'type': 'berkeley-news', 'date': date, 'title': card_title, 'author': author, 'updatetime': updateTime, 'url': url, 'html_content': str(html_content), 'content': content, 'illustrations': illustrations, } cards_collection.insert_one(card_data)
四、完整代码
import os from datetime import datetime import requests from bs4 import BeautifulSoup from pymongo import MongoClient import re import traceback class MitnewsScraper: def __init__(self, root_url, model_url, img_output_dir): self.root_url = root_url self.model_url = model_url self.img_output_dir = img_output_dir self.headers = { 'Referer': 'https://news.berkeley.edu/', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/122.0.0.0 Safari/537.36', 'Cookie': '替换成你自己的', } # 获取一个模块有多少版面 def catalogue_all_pages(self): response = requests.get(self.model_url, headers=self.headers) soup = BeautifulSoup(response.text, 'html.parser') try: match = re.search(r'of (\d+)', soup.text) num_pages = int(match.group(1)) print('模块一共有' + str(num_pages) + '页版面') for page in range(1, num_pages + 1): print(f"========start catalogues page {page}" + "/" + str(num_pages) + "========") self.parse_catalogues(page) print(f"========Finished catalogues page {page}" + "/" + str(num_pages) + "========") except Exception as e: print(f'Error: {e}') traceback.print_exc() # 解析版面列表里的版面 def parse_catalogues(self, page): params = {'page': page} response = requests.get(self.model_url, params=params, headers=self.headers) if response.status_code == 200: soup = BeautifulSoup(response.text, 'html.parser') catalogue_list = soup.find('div', 'filtered-items') catalogues_list = catalogue_list.find_all('article') for index, catalogue in enumerate(catalogues_list): print(f"========start catalogue {index+1}" + "/" + "10========") # 版面标题 catalogue_title = catalogue.find('div', 'news-item__description').find('a').get_text(strip=True) # 操作时间 date = datetime.now() # 更新时间 publish_time = catalogue.find('div', 'news-item__description').find('time').get('datetime') # 将日期字符串转换为datetime对象 updatetime = datetime.strptime(publish_time, '%Y-%m-%d') # 版面url catalogue_href = catalogue.find('div', 'news-item__description').find('a').get('href') catalogue_url = self.root_url + catalogue_href # 版面id catalogue_id = catalogue_href[1:] self.parse_cards_list(catalogue_url, catalogue_id, updatetime, catalogue_title) # 连接 MongoDB 数据库服务器 client = MongoClient('mongodb://localhost:27017/') # 创建或选择数据库 db = client['berkeley-news'] # 创建或选择集合 catalogues_collection = db['catalogues'] # 插入示例数据到 catalogues 集合 catalogue_data = { 'id': catalogue_id, 'date': date, 'title': catalogue_title, 'url': catalogue_url, 'cardSize': 1, 'updatetime': updatetime } # 在插入前检查是否存在相同id的文档 existing_document = catalogues_collection.find_one({'id': catalogue_id}) # 如果不存在相同id的文档,则插入新文档 if existing_document is None: catalogues_collection.insert_one(catalogue_data) print("[爬取版面]版面 " + catalogue_url + " 已成功插入!") else: print("[爬取版面]版面 " + catalogue_url + " 已存在!") print(f"========finsh catalogue {index+1}" + "/" + "10========") return True else: raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}") # 解析文章列表里的文章 def parse_cards_list(self, url, catalogue_id, cardupdatetime, cardtitle): url = 'https://news.berkeley.edu/2024/03/05/meet-our-new-faculty-antoine-levy-economics' card_response = requests.get(url, headers=self.headers) soup = BeautifulSoup(card_response.text, 'html.parser') # 对应的版面id card_id = catalogue_id # 文章标题 card_title = cardtitle # 文章更新时间 updateTime = cardupdatetime # 操作时间 date = datetime.now() # 文章作者 try: author = soup.find('a', href='/author/news').get_text() except: author = soup.find('div', 'single-post__heading').find('p').find('a').get_text() # 原始htmldom结构 html_dom = soup.find('div', 'single-post cb-section cb-stretch') # 标题上方的冗余 html_cut1 = html_dom.find('div', 'single-post__heading').find('strong') # 链接冗余 html_cut2 = html_dom.find_all('a', 'a2a_dd share-link') # 移除元素 if html_cut1: html_cut1.extract() if html_cut2: for item in html_cut2: item.extract() html_content = html_dom # 增加保留html样式的源文本 origin_html = html_dom.prettify() # String # 转义网页中的图片标签 str_html = self.transcoding_tags(origin_html) # 再包装成 temp_soup = BeautifulSoup(str_html, 'html.parser') # 反转译文件中的插图 str_html = self.translate_tags(temp_soup.text) # 绑定更新内容 content = self.clean_content(str_html) # 下载图片 imgs = [] try: img_array = soup.find('figure', 'cb-image cb-float--none cb-float--none--md cb-float--none--lg cb-100w cb-100w--md cb-100w--lg new-figure').find_all('img') except: img_array = soup.find('div', 'container container--lg cb-container').find_all('img') if len(img_array) is not None: for item in img_array: img_url = item.get('src') if img_url is None: img_url = item.get('data-src') imgs.append(img_url) if len(imgs) != 0: # 下载图片 illustrations = self.download_images(imgs, card_id) # 连接 MongoDB 数据库服务器 client = MongoClient('mongodb://localhost:27017/') # 创建或选择数据库 db = client['berkeley-news'] # 创建或选择集合 cards_collection = db['cards'] # 插入示例数据到 cards 集合 card_data = { 'id': card_id, 'catalogueId': catalogue_id, 'type': 'berkeley-news', 'date': date, 'title': card_title, 'author': author, 'updatetime': updateTime, 'url': url, 'html_content': str(html_content), 'content': content, 'illustrations': illustrations, } # 在插入前检查是否存在相同id的文档 existing_document = cards_collection.find_one({'id': card_id}) # 如果不存在相同id的文档,则插入新文档 if existing_document is None: cards_collection.insert_one(card_data) print("[爬取文章]文章 " + url + " 已成功插入!") else: print("[爬取文章]文章 " + url + " 已存在!") # 下载图片 def download_images(self, img_urls, card_id): result = re.search(r'[^/]+$', card_id) last_word = result.group(0) # 根据card_id创建一个新的子目录 images_dir = os.path.join(self.img_output_dir, str(last_word)) if not os.path.exists(images_dir): os.makedirs(images_dir) downloaded_images = [] for index, img_url in enumerate(img_urls): try: response = requests.get(img_url, stream=True, headers=self.headers) if response.status_code == 200: # 从URL中提取图片文件名 img_name_with_extension = img_url.split('/')[-1] pattern = r'^[^?]*' match = re.search(pattern, img_name_with_extension) img_name = match.group(0) # 保存图片 with open(os.path.join(images_dir, img_name), 'wb') as f: f.write(response.content) downloaded_images.append([img_url, os.path.join(images_dir, img_name)]) print(f'[爬取文章图片]文章id为{card_id}的图片已保存到本地') except requests.exceptions.RequestException as e: print(f'请求图片时发生错误:{e}') except Exception as e: print(f'保存图片时发生错误:{e}') return downloaded_images # 如果文件夹存在则跳过 else: print(f'[爬取文章图片]文章id为{card_id}的图片文件夹已经存在') return [] # 工具 转义标签 def transcoding_tags(self, htmlstr): re_img = re.compile(r'\s*<(img.*?)>\s*', re.M) s = re_img.sub(r'\n @@##\1##@@ \n', htmlstr) # IMG 转义 return s # 工具 转义标签 def translate_tags(self, htmlstr): re_img = re.compile(r'@@##(img.*?)##@@', re.M) s = re_img.sub(r'<\1>', htmlstr) # IMG 转义 return s # 清洗文章 def clean_content(self, content): if content is not None: content = re.sub(r'\r', r'\n', content) content = re.sub(r'\n{2,}', '', content) content = re.sub(r' {6,}', '', content) content = re.sub(r' {3,}\n', '', content) content = re.sub(r'<img src="../../../image/zxbl.gif"/>', '', content) content = content.replace( '<img border="0" src="****处理标记:[Article]时, 字段 [SnapUrl] 在数据源中没有找到! ****"/> ', '') content = content.replace( ''' <!--/enpcontent<INPUT type=checkbox value=0 name=titlecheckbox sourceid="<Source>SourcePh " style="display:none">''', '') \ .replace(' <!--enpcontent', '').replace('<TABLE>', '') content = content.replace('<P>', '').replace('<\P>', '').replace(' ', ' ') return content def run(): # 根路径 root_url = 'https://news.berkeley.edu/' # 模块地址数组 model_urls = ['https://news.berkeley.edu/news'] # 文章图片保存路径 output_dir = 'D://imgs//berkeley-news' for model_url in model_urls: scraper = MitnewsScraper(root_url, model_url, output_dir) scraper.catalogue_all_pages() if __name__ == "__main__": run()