首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

PySpark和Kafka入门对比:快速理解大数据处理与实时消息传输

  • 25-04-17 12:20
  • 2754
  • 6693
juejin.cn

PySpark和Kafka是大数据领域常用的两种技术,但它们的定位和用途不同。理解它们的区别和适用场景,有助于选择合适的工具解决实际问题。下面用最简单的语言介绍它们的基础知识,并配上实用代码示例,帮助中国读者快速上手。

PySpark和Kafka的核心区别

方面PySparkKafka
定位大数据计算引擎,支持批处理和流处理分布式消息队列,专注于实时数据传输
数据处理方式微批处理(小批量数据处理)事件驱动,逐条消息实时处理
延迟毫秒到秒级,适合近实时和批量分析毫秒级,适合实时响应和事件驱动
适用场景复杂数据分析、机器学习、ETL实时日志收集、事件流传输、消息分发
生态系统支持多种数据源,支持Python、Scala等多语言主要做数据传输,支持Kafka Streams做简单流处理
部署复杂度需要Spark集群,资源管理较复杂部署简单,易扩展

适用业务场景

  • PySpark适合:

    • 需要对海量数据做复杂计算和分析,如批量ETL、机器学习训练。
    • 需要结合多种数据源进行联合分析。
    • 需要容错和高吞吐量的批处理和流处理。
  • Kafka适合:

    • 实时事件流传输,如网站点击流、传感器数据。
    • 构建实时数据管道,实现数据高效传输和分发。
    • 低延迟消息传递和事件驱动架构。
    • 日志聚合和实时监控。

PySpark基础示例:读取CSV文件,过滤和统计数据

python
代码解读
复制代码
from pyspark.sql import SparkSession # 创建SparkSession,所有Spark操作的入口 spark = SparkSession.builder.appName("PySparkExample").getOrCreate() # 读取CSV文件,自动推断数据类型,第一行为表头 df = spark.read.csv("example_data.csv", header=True, inferSchema=True) # 显示前5行数据,方便查看 df.show(5) # 过滤出Quantity(数量)大于10的记录 filtered_df = df.filter(df.Quantity > 10) # 按Country(国家)分组,统计订单数量 result = filtered_df.groupBy("Country").count() # 显示统计结果 result.show() # 关闭SparkSession,释放资源 spark.stop()
  • 这个示例展示了如何用PySpark读取数据、过滤和聚合,适合批处理和近实时分析
  • 适合处理百万级以上数据,Spark会自动分布式计算。

Kafka基础示例:创建主题,发送和接收消息(命令行操作)

  1. 启动Kafka和Zookeeper服务(Kafka依赖Zookeeper管理集群)
bash
代码解读
复制代码
# 启动Zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties # 启动Kafka服务器 bin/kafka-server-start.sh config/server.properties
  1. 创建Kafka主题
bash
代码解读
复制代码
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092
  1. 启动生产者,发送消息
bash
代码解读
复制代码
bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
  • 输入消息,按回车发送。
  1. 启动消费者,接收消息
bash
代码解读
复制代码
bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
  • 消费者会实时显示生产者发送的消息。

这个流程展示了Kafka作为消息队列的基本用法,适合实时事件传输和消息系统

PySpark与Kafka结合示例:用PySpark读取Kafka实时数据流

python
代码解读
复制代码
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("KafkaSparkExample") \ .getOrCreate() # 从Kafka读取数据,设置Kafka服务器地址和主题 df = spark.readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "test-topic") \ .option("startingOffsets", "earliest") \ .load() # Kafka数据的value是二进制,需要转换成字符串 from pyspark.sql.functions import col, expr df_string = df.selectExpr("CAST(value AS STRING)") # 简单打印输出流数据 query = df_string.writeStream \ .outputMode("append") \ .format("console") \ .start() query.awaitTermination()
  • 这个示例展示了如何用PySpark实时读取Kafka消息,适合实时数据分析和处理
  • 需要先启动Kafka服务和创建主题。

PySpark常用操作扩展示例

1. 词频统计(Word Count)

python
代码解读
复制代码
text_file = spark.read.text("large_text.txt") words = text_file.rdd.flatMap(lambda line: line.value.split()) word_counts = words.countByValue() for word, count in word_counts.items(): print(f"{word}: {count}")

2. 数据聚合统计

python
代码解读
复制代码
from pyspark.sql.functions import sum sales_data = spark.read.csv("sales.csv", header=True, inferSchema=True) sales_by_category = sales_data.groupBy("category").agg(sum("amount").alias("total_sales")) sales_by_category.show()

3. 机器学习示例:线性回归

python
代码解读
复制代码
from pyspark.ml.feature import VectorAssembler from pyspark.ml.regression import LinearRegression data = spark.read.csv("data.csv", header=True, inferSchema=True) assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features") data = assembler.transform(data).select("features", "label") train_data, test_data = data.randomSplit([0.7, 0.3]) lr = LinearRegression(featuresCol="features", labelCol="label") model = lr.fit(train_data) predictions = model.transform(test_data) predictions.show()

总结

  • PySpark是一个强大的大数据计算引擎,适合复杂数据分析、机器学习和批处理,支持多语言,适合处理海量数据。
  • Kafka是一个高吞吐量、低延迟的分布式消息队列,适合实时数据传输和事件驱动架构。
  • 两者常结合使用:Kafka负责实时数据传输,PySpark负责复杂数据处理,形成完整的数据处理生态。
注:本文转载自juejin.cn的程序员小jobleap的文章"https://juejin.cn/post/7493456655842066444"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2491) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

109
人工智能
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top