output操作概览
Output | Meaning |
| 打印每个batch中的前10个元素,主要用于测试,或者是不需要执行什么output操作时,用于简单触发一下job。 |
saveAsTextFile(prefix, [suffix]) | 将每个batch的数据保存到文件中。每个batch的文件的命名格式为:prefix-TIME_IN_MS[.suffix] |
saveAsObjectFile | 同上,但是将每个batch的数据以序列化对象的方式,保存到SequenceFile中。 |
saveAsHadoopFile | 同上,将数据保存到Hadoop文件中 |
foreachRDD | 最常用的output操作,遍历DStream中的每个产生的RDD,进行处理。可以将每个RDD中的数据写入外部存储,比如文件、数据库、缓存等。通常在其中,是针对RDD执行action操作的,比如foreach。 |
output操作
DStream中的所有计算,都是由output操作触发的,比如print()。如果没有任何output操作,那么,压根儿就不会执行定义的计算逻辑。
此外,即使你使用了foreachRDDoutput操作,也必须在里面对RDD执行action操作,才能触发对每一个batch的计算逻辑。否则,光有foreachRDD output操作,在里面没有对RDD执行action操作,也不会触发任何逻辑。
foreachRDD详解
通常在foreachRDD中,都会创建一个Connection,比如JDBC Connection,然后通过Connection将数据写入外部存储。
误区一:在RDD的foreach操作外部,创建Connection
这种方式是错误的,因为它会导致Connection对象被序列化后传输到每个Task中。而这种Connection对象,实际上一般是不支持序列化的,也就无法被传输。
- dstream.foreachRDD {rdd =>
- val connection = createNewConnection()
- rdd.foreach { record =>connection.send(record)
- }
- }
误区二:在RDD的foreach操作内部,创建Connection
这种方式是可以的,但是效率低下。因为它会导致对于RDD中的每一条数据,都创建一个Connection对象。而通常来说,Connection的创建,是很消耗性能的。
- dstream.foreachRDD {rdd =>
- rdd.foreach { record =>
- val connection = createNewConnection()
- connection.send(record)
- connection.close()
- }
- }
合理方式一:使用RDD的foreachPartition操作,并且在该操作内部,创建Connection对象,这样就相当于是,为RDD的每个partition创建一个Connection对象,节省资源的多了。
- dstream.foreachRDD {rdd =>
- rdd.foreachPartition { partitionOfRecords=>
- val connection = createNewConnection()
- partitionOfRecords.foreach(record =>connection.send(record))
- connection.close()
- }
- }
合理方式二:自己手动封装一个静态连接池,使用RDD的foreachPartition操作,并且在该操作内部,从静态连接池中,通过静态方法,获取到一个连接,使用之后再还回去。这样的话,甚至在多个RDD的partition之间,也可以复用连接了。而且可以让连接池采取懒创建的策略,并且空闲一段时间后,将其释放掉。
- dstream.foreachRDD {rdd =>
- rdd.foreachPartition { partitionOfRecords=>
- val connection = ConnectionPool.getConnection()
- partitionOfRecords.foreach(record =>connection.send(record))
- ConnectionPool.returnConnection(connection)
- }
- }
foreachRDD实战
案例:改写UpdateStateByKeyWordCount,将每次统计出来的全局的单词计数,写入一份,到MySQL数据库中。
建表语句
- create table wordcount(
- id integer auto_increment primary key,
- updated_time timestamp NOT NULL defaultCURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
- word varchar(255),
- count integer
- );
java版本代码:
- package cn.spark.study.sql;
-
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.Statement;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.PairFunction;
- import org.apache.spark.api.java.function.VoidFunction;
- import org.apache.spark.sql.DataFrame;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.RowFactory;
- import org.apache.spark.sql.SQLContext;
- import org.apache.spark.sql.types.DataTypes;
- import org.apache.spark.sql.types.StructField;
- import org.apache.spark.sql.types.StructType;
- import scala.Tuple2;
-
- /**
- * JDBC数据源
- * @author leizq120310
- *
- */
-
- public class JDBCDataSource {
-
- public static void main(String[] args) {
- SparkConf conf = new SparkConf()
- .setAppName("JDBCDataSource");
- JavaSparkContext sc = new JavaSparkContext(conf);
- SQLContext sqlContext = new SQLContext(sc);
-
- // 总结一下
- // jdbc数据源
- // 首先,是通过SQLContext的read系列方法,将mysql中的数据加载为DataFrame
- // 然后可以将DataFrame转换为RDD,使用Spark Core提供的各种算子进行操作
- // 最后可以将得到的数据结果,通过foreach()算子,写入mysql、hbase、redis等等db/ cache中
-
- // 分别将mysql中两张表的数据加载为DataFrame
- Map
options = new HashMap(); - options.put("url", "jdbc:mysql://spark1:3306/testdb");
- options.put("dbtable", "student_infos");
- DataFrame studentInfosDF = sqlContext.read().format("jdbc").options(options).load();
-
- options.clear();
- options.put("url", "jdbc:mysql://spark1:3306/testdb");
- options.put("dbtable", "student_scores");
- DataFrame studentScoresDF = sqlContext.read().format("jdbc").options(options).load();
-
- // 将两个DataFrame转换为JavaPairRDD,执行join操作
- JavaPairRDD
> studentsRDD = studentInfosDF.javaRDD().mapToPair(new PairFunction() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2
call(Row row) throws Exception { - return new Tuple2
- (row.getString(0), Integer.valueOf(String.valueOf(row.get(1))));
- }
- }).join(studentScoresDF.javaRDD().mapToPair(new PairFunction
() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2
call(Row row) throws Exception { - return new Tuple2
- (row.getString(0), Integer.valueOf(String.valueOf(row.get(1))));
- }
- }));
-
- // 将JavaPairRDD转换为JavaRDD
|
- JavaRDD
studentRowsRDD = studentsRDD.map(new Function>, Row>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Row call(Tuple2
> tuple) throws Exception { - return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
- }
- });
-
- // 过滤出分数大于80分的数据
- JavaRDD
filteredStudentRowsRDD = studentRowsRDD.filter(new Function() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Boolean call(Row row) throws Exception {
- if (row.getInt(2) > 80) {
- return true;
- }
- return false;
- }
- });
-
- // 转换为DataFrame
- List
structFields = new ArrayList(); - structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
- structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
- structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
- StructType structType = DataTypes.createStructType(structFields);
-
- DataFrame studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType);
-
- Row[] rows = studentsDF.collect();
- for (Row row : rows)
- {
- System.out.println(row);
- }
-
- // 将DataFrame中的数据保存到mysql表中
- // 这种方式是在企业里很常用的,有可能是插入mysql、有可能是插入hbase,还有可能是插入redis缓存
- studentsDF.javaRDD().foreach(new VoidFunction
() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void call(Row row) throws Exception {
- String sql = "insert into good_student_infos values("
- + "'" + String.valueOf(row.getString(0)) + "',"
- + Integer.valueOf(String.valueOf(row.get(1))) + ","
- + Integer.valueOf(String.valueOf(row.get(2))) + ")";
- Class.forName("com.mysql.jdbc.Driver");
-
- Connection conn = null;
- Statement stmt = null;
- try{
- conn = DriverManager.getConnection(
- "jdbc:mysql://spark1:3306/testdb", "", "");
- stmt = conn.createStatement();
- stmt.executeUpdate(sql);
- } catch (Exception e) {
- e.printStackTrace();
- } finally{
- if (stmt != null){
- stmt.close();
- }
- if (conn != null){
- conn.close();
- }
- }
- }
- });
- sc.close();
- }
- }
操作步骤:
1.进入linux系统平台下的mysql操作中,创建wordcount表
文章最后,给大家推荐一些受欢迎的技术博客链接:
- Hadoop相关技术博客链接
- Spark 核心技术链接
- 超全干货--Flink思维导图,花了3周左右编写、校对
- 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
- 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
- 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂
欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!
评论记录:
回复评论: