Spark的shell可以方便我们去学习API,也是一个强有力的数据分析交互式工具。在spark根目录下运行下面代码:
Spark的主要抽象是一种被称为Dataset的分布式集合。你可以从Hadoop InputFormats(如HDFS文件)创建Datasets,或者从其它Datasets转化(transform)。由于Python动态的本质,我们不需要Dataset是个强类型的。所以,在Python中,所有的Datasets都是Dataset[Row],我们称之为DataFrame,在Pandas和R中也这样称呼。现在,在打开的shell中,我们读取Spark根目录下的README文件来创建一个DataFrame。
>>> textFile = spark.read.text("README.md")
你可以从DataFrame直接获得数据,通过调用actions或者transform来获取一个新的DataFrame。更多细节,请参阅API doc。
>>> textFile.count() # Number of rows in this DataFrame
126
>>> textFile.first() # First row in this DataFrame
Row(value=u'# Apache Spark')
现在,我们通过transform函数来获取一个新的DataFrame,我们使用filter来过滤一些行。
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
也可以把transforms和actions操作合并成一个链式调用。
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"?
15
使用Dataset的action和transform来完成复杂计算,比如,找出整篇文档出现最多单词的一行的单词个数。
首先找出每一行有多少个单词,这是一个整数,我们把它取名为numWords,textFile.select返回一个DataFrame实例。接着调用DataFrame的agg方法,找出最大numWords行。select和agg的参数都是Column,我们可以用df.colName获取一列(df是一个DataFrame实例)。我们导入了pyspark.sql.functions,它提供了很多便捷函数,帮助我们更好的构建一个新的或者老的Column。
一种常见数据流模式是MapReduce,由Hadoop推广。Spark也可以很容易地实现MapReduce流:
这里,我们在select语句中使用explode函数,把一个行Dataset转化成词Dataset,然后将groupBy和count结合起来,将文件中的每个单词计数计算为一个包含两列的DataFrame:"word"和"count"。使用collect方法收集所有单词计数数据。
Spark还支持把数据集拉入集群范围内的内存缓存中。当一份数据被重复访问,比如查询一个小的热点数据集或者跑一个迭代算法(如PageRank),缓存将会非常有用。一个简单的例子,缓存linesWithSparkdataset。
使用Spark浏览和缓存一个100行文本看起来似乎很愚蠢。有趣的是,这些类似的函数可以用于非常大的数据集,即使它们跨了数十个或数百个节点。你也可以在shell里连接一个集群来做这些,更多请参阅RDD编程指南。