hw12-impl
There is no written deliverable this time; the original code plus a live demo of the Spark-based book statistics at the defense is sufficient.
Sample code for the statistics is shown below (dependency: pyspark):
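Two inputs are assumed (both inferred from the parsing code below): keywords.txt holds the keywords to count, one or more per line, separated by commas; infos/ is a folder of .txt files, one per book, whose lines have the form "field: value", and only the value part is matched against the keyword list.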
import re
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from typing import List, Tuple
if __name__ == "__main__":
    # Initialize the SparkSession
    spark = SparkSession.builder \
        .appName("Keyword Count") \
        .getOrCreate()
    # Paths to the input data
    input_path = "infos"
    keyword_path = "keywords.txt"

    # Load the keyword list: one or more comma-separated keywords per line
    keywords = []
    with open(keyword_path, "r") as f:
        keyword_lines = [l.strip() for l in f.readlines() if l.strip() != ""]
    for l in keyword_lines:
        words = [w.strip() for w in l.split(",") if w.strip() != ""]
        keywords.extend(words)
    print(f"keywords: {keywords}")
    # Parse a single book file: each non-empty line is expected to be of the
    # form "field: value"; only the value part is scanned for keywords.
    def input_transform(file_path: str) -> Tuple[str, List[str]]:
        with open(file_path, "r") as f:
            content = [l.strip() for l in f.readlines() if l.strip() != ""]
        results: Tuple[str, List[str]] = (file_path, [])
        for line in content:
            parts = line.split(":")
            # Lines without exactly one ":" are skipped
            if len(parts) == 2:
                # Split the value on non-word characters, keep only keywords
                words = [w for w in re.split(r"\W+", parts[1]) if w in keywords]
                results[1].extend(words)
        return results
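    # Example with hypothetical contents: in a file "infos/book1.txt", a line
    #   "tags: spark, hadoop"
    # contributes ["spark", "hadoop"] (given both are in the keyword list),
    # so the result is ("infos/book1.txt", ["spark", "hadoop"]).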
    # Read every .txt file under the input folder
    file_paths = [f"{input_path}/{f}" for f in os.listdir(input_path) if f.endswith(".txt")]
    data: List[Tuple[str, List[str]]] = [input_transform(f) for f in file_paths]
    # One row per book: (file path, list of matched keywords)
    schema = StructType([
        StructField("book", StringType()),
        StructField("words", ArrayType(StringType())),
    ])
    df = spark.createDataFrame(data=data, schema=schema)
    # Explode the array into one row per keyword occurrence
    df = df.withColumn("words", explode("words"))
    # Count each keyword and sort by frequency, descending
    keyword_counts = df.groupBy("words").count().orderBy("count", ascending=False)

    # Show the result
    keyword_counts.show()

    # Stop the SparkSession
    spark.stop()
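To run it, something like spark-submit keyword_count.py should work (the script name is hypothetical); with a pip-installed pyspark, launching the script directly with python should also spin up a local Spark. The output of show() is a two-column table, words and count, sorted by descending count.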