【Python】数据处理(mongodb、布隆过滤器、索引)

avatar
作者
筋斗云
阅读量:4

数据
在这里插入图片描述

数据预处理

df = pd.read_csv(file_path, encoding='ANSI')
csv的编码方式一定要用 ANSI。要不然会出现各种报错

import pandas as pd from datetime import datetime  # 读取CSV文件 file_path = 'book_douban.csv' df = pd.read_csv(file_path, encoding='ANSI')   # 定义一个函数来提取年份 def extract_year(publish_date):     publish_date= str(publish_date).strip()     try:         # 尝试不同的日期格式         for fmt in ('%Y/%m', '%Y/%m/%d', '%Y年%m月', '%Y'):             try:                 return int(datetime.strptime(publish_date, fmt).year)             except ValueError:                 pass     except TypeError:         return None   # 应用函数提取年份 df['出版时间'] = df['出版时间'].apply(extract_year)  # 清洗数据,删除评分空值 df = df[df['评分'] != 0] df = df[df['书名'] != '不存在未出版的错误条目'] df = df[df['书名'] != '点击上传封面图片']  # 左右两边去除空格 for column in df.columns:     if column == "书名" or column == "出版社" or column == "作者":         df[column] = df[column].str.strip()  # 去除重复行。除了第一列id不一样,其他均一样 (806和807行) # df = df.drop_duplicates(keep=False) df = df.drop_duplicates(subset=df.columns[1:], keep='first')  # 将清洗后的数据写入新的CSV文件 output_file_path = 'cleaned_books.csv' df.to_csv(output_file_path, index=False, encoding='ANSI')  print(f"数据清洗完成,并写入新文件:{output_file_path}") 

布隆过滤器

由于相同的书籍可能出于多个出版社,为避免书籍评分重复导入,实现BF过滤器,每当有一部书籍被存储后将其加入BF过滤器,并能够使用BF过滤器查询上述书籍是否已经被存储。

import hashlib import math import mmh3  # 使用 mmh3 库作为哈希函数 import random  class SimpleBloomFilter:     def __init__(self, items_count, fp_prob):         # 计算所需的位数组的大小和哈希函数的数量         self.size = self.get_size(items_count, fp_prob)         self.hash_count = self.get_hash_count(self.size, items_count)         self.bit_array = [0] * self.size      def add(self, item):         # 添加元素到布隆过滤器         digests = []         for i in range(self.hash_count):             # 使用不同的种子生成不同的哈希值             digest = mmh3.hash(item, i) % self.size             digests.append(digest)             self.bit_array[digest] = 1      def check(self, item):         # 检查元素是否可能存在于布隆过滤器         for i in range(self.hash_count):             digest = mmh3.hash(item, i) % self.size             if self.bit_array[digest] == 0:                 # 如果有一个位是0,则元素肯定不在集合中                 return False         return True      @classmethod     def get_size(self, n, p):         # 计算位数组的大小         m = -(n * math.log(p)) / (math.log(2) ** 2)         return int(m)      @classmethod     def get_hash_count(self, m, n):         # 计算哈希函数的数量         k = (m / n) * math.log(2)         return int(k) 

布隆过滤器原理

https://blog.csdn.net/qq_41125219/article/details/119982158
布隆过滤器实际上是一个很长的二进制向量和一系列随机映射函数。可以用于解决Redis缓存穿透问题

Redis中的布隆过滤器底层是一个大型位数组(二进制数组)+多个无偏hash函数。无偏hash函数就是能把元素的hash值计算的比较均匀的hash函数,能使得计算后的元素下标比较均匀的映射到位数组中。
如下就是一个简单的布隆过滤器示意图,其中k1、k2代表增加的元素,a、b、c即为无偏hash函数,最下层则为二进制数组。
在这里插入图片描述

在布隆过滤器增加元素之前,首先需要初始化布隆过滤器的空间,也就是上面说的二进制数组,除此之外还需要计算无偏hash函数的个数。布隆过滤器提供了两个参数,分别是预计加入元素的大小n,运行的错误率f。布隆过滤器中有算法根据这两个参数会计算出二进制数组的大小l,以及无偏hash函数的个数k。

添加元素
往布隆过滤器增加元素,添加的key需要根据k个无偏hash函数计算得到多个hash值,然后对数组长度进行取模得到数组下标的位置,然后将对应数组下标的位置的值置为1

  • 通过k个无偏hash函数计算得到k个hash值
  • 依次取模数组长度,得到数组索引
  • 将计算得到的数组索引下标位置数据修改为1
    在这里插入图片描述

查询元素

  • 通过k个无偏hash函数计算得到k个hash值
  • 依次取模数组长度,得到数组索引
  • 判断索引处的值是否全部为1,如果全部为1则存在(这种存在可能是误判),如果存在一个0则必定不存在

误报是由于hash冲突

布隆过滤器的缺点:

  • 有点一定的误判率,但是可以通过调整参数来降低
  • 无法获取元素本身
  • 很难删除元素

数据写入mongodb以及数据合并

mongodb下载
https://www.mongodb.com/try/download/community

mongodb客户端工具MongoDB Compass下载
https://www.mongodb.com/try/download/compass

import pymongo import pandas as pd from pybloom_live import BloomFilter import re  # 连接MongoDB from SimpleBloomFilter import SimpleBloomFilter  client = pymongo.MongoClient("mongodb://localhost:27017/") db = client["book_database"] collection = db["books"]  # 使用布隆过滤器 items_count = 60000  # 预计存储的元素数量 fp_prob = 0.001  # 可接受的错误率 bf = SimpleBloomFilter(items_count, fp_prob)  # 读取CSV文件 file_path = 'cleaned_books.csv' df = pd.read_csv(file_path, encoding='ANSI')  # 准备数据,准备存储到MongoDB data_to_store = df.to_dict(orient='records')   # 这个作者数据太脏了,是一个作者,但是名字写法不同 def is_same_author(name1, name2):     # 去除字符串中的括号和特殊字符      name1 = re.sub(r'\[.*?\]', '', name1)  # 去除类似 [法] 这样的内容     name2 = re.sub(r'\[.*?\]', '', name2)      name1 = re.sub(r'\(.*?\)', '', name1)     name2 = re.sub(r'\(.*?\)', '', name2)      name1 = re.sub(r'(.*?)', '', name1)  # 去除类似 [法] 这样的内容     name2 = re.sub(r'(.*?)', '', name2)      # 将字符串中的英文和数字转换为小写     name1 = re.sub(r'[A-Za-z0-9]+', lambda x: x.group().lower(), name1)     name2 = re.sub(r'[A-Za-z0-9]+', lambda x: x.group().lower(), name2)      # 对中文进行分词(这里简化处理,实际情况可能需要更复杂的分词算法)     # 假设我们只取中文名字的第一部分作为代表     name1_chinese = re.search(r'[\u4e00-\u9fa5]+', name1)     name2_chinese = re.search(r'[\u4e00-\u9fa5]+', name2)      # 如果两个名字都有中文部分,比较中文部分     if name1_chinese and name2_chinese:         name1_standard = name1_chinese.group()         name2_standard = name2_chinese.group()     else:         # 如果没有中文部分,比较整个字符串(已去除英文大小写和括号的影响)         name1_standard = name1         name2_standard = name2      # 比较两个标准化后的字符串     return name1_standard == name2_standard   for book in data_to_store:     # 检查布隆过滤器,确定这本书是否已经存储      # 以 '书名+作者' 为唯一标识。  有的书名一样,但是作者不一样。  比如  “系统神学”   (但是有的书作者为空,不好之前将书名和作者拼接放到布隆过滤器中去)     if bf.check(book['书名']):          query = {'书名': book['书名']}         search_result = collection.find(query)            # 打印查询结果         for _book in search_result:             if pd.isna(book['作者']) or pd.isna(_book['作者']) or is_same_author(book['作者'], _book['作者']):                  if isinstance(_book.get('出版社', ''), str) and pd.isna(_book.get('出版社', '')):                     existing_publishers = book['出版社']                 else:                     if isinstance(_book.get('出版社', ''), str) and _book.get('出版社', '') != book['出版社']:                         existing_publishers = [_book.get('出版社', ''), book['出版社']]                      elif isinstance(_book.get('出版社', ''), list):                         # 如果已经是列表,则直接使用                         existing_publishers = _book.get('出版社', [])                         if book['出版社'] not in existing_publishers:                             # print(_book.get('出版社', ''), book['书名'])                             existing_publishers.append(book['出版社'])                  existing_publish_time = _book.get('出版时间', '')                 if not pd.isna(book['出版时间']):                     if isinstance(_book.get('出版时间', ''), float) and _book.get('出版时间', '') != book['出版时间']:                         if not pd.isna(_book.get('出版时间', '')):                             existing_publish_time = [_book.get('出版时间', ''), book['出版时间']]                         else:                             existing_publish_time = book['出版时间']                     elif isinstance(_book.get('出版时间', ''), list):                         # 如果已经是列表,则直接使用                         existing_publish_time = _book.get('出版时间', [])                         if book['出版时间'] not in existing_publish_time:                             existing_publish_time.append(book['出版时间'])                  new_rating = (float(_book.get('评分', '')) * float(_book.get('评论数量', '')) + float(book['评分']) * float(                     book['评论数量'])) / (float(_book.get('评论数量', '')) + float(book['评论数量']))                  print(existing_publishers,existing_publish_time,book['书名'])                  # 如果书已存在,更新出版社列表、评分和评论数量                 update_result = collection.update_one(                     {'书名': book['书名']},                     {                         "$set": {                             "出版社": existing_publishers,                             "出版时间": existing_publish_time,                             "评分": new_rating  # 确保 new_rating 是计算后的新评分值                         },                         "$inc": {                             "评论数量": book['评论数量']                         }                     },                     upsert=False                 )                 if update_result.modified_count == 0:                     # 如果没有修改,说明publishers已经包含在数组中,可以进行去重操作                     pass                  # 只能有一个                 break     else:         # 如果书不存在,添加到数据库和布隆过滤器         collection.insert_one(book)         bf.add(book['书名'])  # 关闭MongoDB连接 client.close()  

数据添加索引

import csv  import pandas as pd   class BTreeNode:     def __init__(self, is_leaf=False):         self.keys = []         self.children = []         self.is_leaf = is_leaf  class BTree:     def __init__(self, t):         self.root = BTreeNode(is_leaf=True)         self.t = t  # Minimum degree      def insert(self, publication_time, rating_count, book_info):         root = self.root         if len(root.keys) == (2 * self.t) - 1:             new_root = BTreeNode()             self.root = new_root             new_root.children.append(root)             self.split_child(new_root, 0)         self.insert_non_full(self.root, publication_time, rating_count, book_info)      def insert_non_full(self, node, publication_time, rating_count, book_info):         i = len(node.keys) - 1         if node.is_leaf:             node.keys.append((publication_time, rating_count, book_info))             while i >= 0 and publication_time < node.keys[i][0]:                 node.keys[i + 1] = node.keys[i]                 i -= 1             node.keys[i + 1] = (publication_time, rating_count, book_info)         else:             while i >= 0 and publication_time < node.keys[i][0]:                 i -= 1             i += 1             if len(node.children[i].keys) == (2 * self.t) - 1:                 self.split_child(node, i)                 if publication_time > node.keys[i][0]:                     i += 1             self.insert_non_full(node.children[i], publication_time, rating_count, book_info)      def split_child(self, parent, index):         child = parent.children[index]         new_child = BTreeNode(is_leaf=child.is_leaf)         parent.keys.insert(index, child.keys[self.t - 1])         parent.children.insert(index + 1, new_child)         new_child.keys = child.keys[self.t:]         child.keys = child.keys[:self.t - 1]         if not child.is_leaf:             new_child.children = child.children[self.t:]             child.children = child.children[:self.t]      def range_query(self, start_time, end_time, min_rating_count):         result = []         self.range_query_helper(self.root, start_time, end_time, min_rating_count, result)         return result      def range_query_helper(self, node, start_time, end_time, min_rating_count, result):         if node.is_leaf:             for key in node.keys:                 if start_time <= key[0] <= end_time and key[1] > min_rating_count:                     result.append(key)             return         for i in range(len(node.keys)):             if start_time <= node.keys[i][0]:                 self.range_query_helper(node.children[i], start_time, end_time, min_rating_count, result)         self.range_query_helper(node.children[-1], start_time, end_time, min_rating_count, result)  def read_csv_and_build_index(file_path, btree, t):     with open(file_path, 'r',encoding='ANSI') as file:         csv_reader = csv.reader(file)         next(csv_reader)  # 跳过表头         for row in csv_reader:             if row[4] and row[6] and not pd.isnull(row[4]) and not pd.isna(row[6]):                 publication_time = int(float(row[4]))                 rating_count = float(row[6])                 book_info = row  # 保存整行信息                 btree.insert(publication_time, rating_count, book_info)  # 测试代码 btree = BTree(2)  # 设置 t 的值  file_path = 'cleaned_books.csv'  # 替换为实际的文件路径 read_csv_and_build_index(file_path, btree, 2)  # start_time = 2000 # end_time = 2015 # min_rating_count = 50  start_time = int(input("请输入出版起始时间:")) end_time = int(input("请输入出版终止时间:")) min_rating_count = int(input("请输入最低评论数:"))  query_result = btree.range_query(start_time, end_time, min_rating_count) for item in query_result:     print(" ".join(item[2]))   

生成环境配置文件requirements.txt

导出整个环境的安装包

pip freeze > requirements.txt 

导出单个项目安装包

pip install pipreqs pipreqs . 

安装依赖

pip install -r requirements.txt -i https://pypi.douban.com/simple 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!