测试代码如下:
package cn.xpleaf.bigdata.spark.scala.sql.p1
import java.util.Properties
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
/**
* SparkSQL关于加载数据和数据落地的各种实战操作
*/
object _03SparkSQLLoadAndSaveOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// readOps(sqlContext)
writeOps(sqlContext)
sc.stop()
}
/**
* 在write结果到目录中的时候需要留意相关异常
* org.apache.spark.sql.AnalysisException: path file:/D:/data/spark/sql/people-1.json already exists
* 如果还想使用该目录的话,就需要设置具体的保存模式SaveMode
* ErrorIfExist
* 默认的,目录存在,抛异常
* Append
* 追加
* Ingore
* 忽略,相当于不执行
* Overwrite
* 覆盖
*/
def writeOps(sqlContext:SQLContext): Unit = {
val df = sqlContext.read.json("D:/data/spark/sql/people.json")
df.registerTempTable("people")
val retDF = sqlContext.sql("select * from people where age > 20")
// retDF.show()
// 将结果落地
//retDF.coalesce(1).write.mode(SaveMode.Overwrite).json("D:/data/spark/sql/people-1.json")
// 落地到数据库
val url = "jdbc:mysql://localhost:3306/test"
val table = "people1" // 会重新创建一张新表
val properties = new Properties()
properties.put("user", "root")
properties.put("password", "root")
retDF.coalesce(1).write.jdbc(url, table, properties)
}
/*
// sparkSQL读数据
// java.lang.RuntimeException: file:/D:/data/spark/sql/people.json is not a Parquet file
sparkSQL使用read.load加载的默认文件格式为parquet(parquet.apache.org)
加载其它文件格式怎么办?
需要指定加载文件的格式.format("json")
*/
def readOps(sqlContext:SQLContext): Unit = {
// val df = sqlContext.read.load("D:/data/spark/sql/users.parquet")
// val df = sqlContext.read.format("json").load("D:/data/spark/sql/people.json")
// val df = sqlContext.read.json("D:/data/spark/sql/people.json")
val url = "jdbc:mysql://localhost:3306/test"
val table = "people"
val properties = new Properties()
properties.put("user", "root")
properties.put("password", "root")
val df = sqlContext.read.jdbc(url, table, properties)
df.show()
}
}
当执行读操作时,输出结果如下:
+---+----+---+------+
| id|name|age|height|
+---+----+---+------+
| 1| 小甜甜| 18| 168.0|
| 2| 小丹丹| 19| 167.0|
| 3| 大神| 25| 181.0|
| 4| 团长| 38| 158.0|
| 5| 记者| 22| 169.0|
+---+----+---+------+
当执行写操作时:
1.如果保存到json文件
注意有各种写模式,另外其保存的是一个目录,与HDFS兼容的目录格式
2.如果保存到jdbc
则会在数据库中创建一个DataFrame所包含列的表,注意该表不能存在
需要先启动Hive,然后再进行下面的操作。
代码编写
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.sql.p2
import cn.xpleaf.bigdata.spark.scala.sql.p1._01SparkSQLOps
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
/**
* 通过创建HiveContext来操作Hive中表的数据
* 数据源:
* teacher_info.txt
* name(String) height(double)
* zhangsan,175
* lisi,180
* wangwu,175
* zhaoliu,195
* zhouqi,165
* weiba,185
*
* create table teacher_info(
* name string,
* height double
* ) row format delimited
* fields terminated by ',';
*
* teacher_basic.txt
* name(String) age(int) married(boolean) children(int)
* zhangsan,23,false,0
* lisi,24,false,0
* wangwu,25,false,0
* zhaoliu,26,true,1
* zhouqi,27,true,2
* weiba,28,true,3
*
* create table teacher_basic(
* name string,
* age int,
* married boolean,
* children int
* ) row format delimited
* fields terminated by ',';
* *
* 需求:
*1.通过sparkSQL在hive中创建对应表,将数据加载到对应表
*2.执行sparkSQL作业,计算teacher_info和teacher_basic的关联信息,将结果存放在一张表teacher中
*
* 在集群中执行hive操作的时候,需要以下配置:
* 1、将hive-site.xml拷贝到spark/conf目录下,将mysql connector拷贝到spark/lib目录下
2、在$SPARK_HOME/conf/spark-env.sh中添加一条记录
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar
*/
object _01HiveContextOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
// .setMaster("local[2]")
.setAppName(_01SparkSQLOps.getClass.getSimpleName)
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
//创建teacher_info表
hiveContext.sql("CREATE TABLE teacher_info(" +
"name string, " +
"height double) " +
"ROW FORMAT DELIMITED " +
"FIELDS TERMINATED BY ','")
hiveContext.sql("CREATE TABLE teacher_basic(" +
"name string, " +
"age int, " +
" married boolean, " +
"children int) " +
"ROW FORMAT DELIMITED " +
"FIELDS TERMINATED BY ','")
// 向表中加载数据
hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_info.txt' INTO TABLE teacher_info")
hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_basic.txt' INTO TABLE teacher_basic")
//第二步操作 计算两张表的关联数据
val joinDF = hiveContext.sql("SELECT " +
"b.name, " +
"b.age, " +
"if(b.married, '已婚', '未婚') as married, " +
"b.children, " +
"i.height " +
"FROM teacher_info i " +
"INNER JOIN teacher_basic b ON i.name = b.name")
joinDF.collect().foreach(println)
joinDF.write.saveAsTable("teacher")
sc.stop()
}
}
打包、上传与配置
打包后上传到集群环境中,然后针对Spark做如下配置:
在集群中执行hive操作的时候,需要以下配置:
1、将hive-site.xml拷贝到spark/conf目录下,将mysql connector拷贝到spark/lib目录下
2、在$SPARK_HOME/conf/spark-env.sh中添加一条记录
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar
提交spark作业
使用的spark提交作业的脚本如下:
[uplooking@uplooking01 spark]$ cat spark-submit-standalone.sh
#export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop
/home/uplooking/app/spark/bin/spark-submit \
--class $2 \
--master spark://uplooking02:7077 \
--executor-memory 1G \
--num-executors 1 \
$1 \
执行如下命令:
./spark-submit-standalone.sh spark-hive.jar cn.xpleaf.bigdata.spark.scala.sql.p2._01HiveContextOps
验证
可以在作业执行的输出结果有看到我们期望的输出,也可以直接在Hive中操作来进行验证:
hive> show tables;
OK
hpeople
people
t1
teacher
teacher_basic
teacher_info
Time taken: 0.03 seconds, Fetched: 6 row(s)
hive> select * from teacher;
OK
zhangsan 23 未婚 0 175.0
lisi 24 未婚 0 180.0
wangwu 25 未婚 0 175.0
zhaoliu 26 已婚 1 195.0
zhouqi 27 已婚 2 165.0
weiba 28 已婚 3 185.0
Time taken: 0.369 seconds, Fetched: 6 row(s)
需要确保ElasticSearch环境已经搭建好。
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.sql.p2
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._
import org.elasticsearch.spark._
/**
* Spark和ES的集成操作
* 引入Spark和es的maven依赖
* elasticsearch-hadoop
* 2.3.0
* 将account.json加载到es的索引库spark/account
* 可以参考官方文档:https://www.elastic.co/guide/en/elasticsearch/hadoop/2.3/spark.html
*/
object _02SparkElasticSearchOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_02SparkElasticSearchOps.getClass().getSimpleName)
.setMaster("local[2]")
/**
* Spark和es的集成配置
*/
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "uplooking01")
conf.set("es.port", "9200")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// write2ES(sqlContext)
readFromES(sc)
sc.stop()
}
/**
* 从es中读数据
* (使用sparkContext进行操作)
*/
def readFromES(sc:SparkContext): Unit = {
val resources = "spark/account" // 索引库/类型
val jsonRDD = sc.esJsonRDD(resources)
jsonRDD.foreach(println)
}
/**
* 向es中写入数据
* (使用sqlContext进行操作)
*/
def write2ES(sqlContext:SQLContext): Unit = {
val jsonDF = sqlContext.read.json("D:/data/spark/sql/account.json")
val resources = "spark/account" // 索引库/类型
jsonDF.saveToEs(resources)
}
}
使用Spark SQL中的内置函数对数据进行分析,Spark SQL API不同的是,DataFrame中的内置函数操作的结果是返回一个Column对象,而DataFrame天生就是"A distributed collection of data organized into named columns.",这就为数据的复杂分析建立了坚实的基础并提供了极大的方便性,例如说,我们在操作DataFrame的方法中可以随时调用内置函数进行业务需要的处理,这之于我们构建附件的业务逻辑而言是可以极大的减少不必须的时间消耗(基于上就是实际模型的映射),让我们聚焦在数据分析上,这对于提高工程师的生产力而言是非常有价值的Spark 1.5.x开始提供了大量的内置函数,还有max、mean、min、sum、avg、explode、size、sort_array、day、to_date、abs、acos、asin、atan
总体上而言内置函数包含了五大基本类型:
1、聚合函数,例如countDistinct、sumDistinct等;
2、集合函数,例如sort_array、explode等
3、日期、时间函数,例如hour、quarter、next_day
4、数学函数,例如asin、atan、sqrt、tan、round等;
5、开窗函数,例如rowNumber等
6、字符串函数,concat、format_number、rexexp_extract
7、其它函数,isNaN、sha、randn、callUDF
以下为Hive中的知识内容,但是显然Spark SQL也有同样的概念
UDF
用户自定义函数:User Definded Function
一路输入,一路输出
a--->A
strlen("adbad")=5
UDAF
用户自定义聚合函数:User Definded Aggregation Function
多路输入,一路输出
sum(a, b, c, d)---->汇总的结果
表函数
UDTF:用户自定义表函数:User Definded Table Function
多路输入,多路输出
"hello you"
"hello me" ---->转换操作,----->split("")---->Array[]
["hello, "you"]--->
"hello"
"you"
---->行列转换
一个基本的案例如下:
package cn.xpleaf.bigdata.spark.scala.sql.p2
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
/**
* SparkSQL 内置函数操作
*/
object _03SparkSQLFunctionOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_03SparkSQLFunctionOps.getClass().getSimpleName)
.setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val pdf = sqlContext.read.json("D:/data/spark/sql/people.json")
pdf.show()
pdf.registerTempTable("people")
// 统计人数
sqlContext.sql("select count(1) from people").show()
// 统计最小年龄
sqlContext.sql("select age, " +
"max(age) as max_age, " +
"min(age) as min_age, " +
"avg(age) as avg_age, " +
"count(age) as count " +
"from people group by age order by age desc").show()
sc.stop()
}
}
输出结果如下:
+---+------+-------+
|age|height| name|
+---+------+-------+
| 10| 168.8|Michael|
| 30| 168.8| Andy|
| 19| 169.8| Justin|
| 32| 188.8| Jack|
| 10| 158.8| John|
| 19| 179.8| Domu|
| 13| 179.8| 袁帅|
| 30| 175.8| 殷杰|
| 19| 179.9| 孙瑞|
+---+------+-------+
18/05/09 17:53:23 INFO FileInputFormat: Total input paths to process : 1
+---+
|_c0|
+---+
| 9|
+---+
18/05/09 17:53:24 INFO FileInputFormat: Total input paths to process : 1
+---+-------+-------+-------+-----+
|age|max_age|min_age|avg_age|count|
+---+-------+-------+-------+-----+
| 32| 32| 32| 32.0| 1|
| 30| 30| 30| 30.0| 2|
| 19| 19| 19| 19.0| 3|
| 13| 13| 13| 13.0| 1|
| 10| 10| 10| 10.0| 2|
+---+-------+-------+-------+-----+
1、Spark 1.5.x版本以后,在Spark SQL和DataFrame中引入了开窗函数,比如最经典的就是我们的row_number(),可以让我们实现分组取topn的逻辑。
2、做一个案例进行topn的取值(利用Spark的开窗函数),不知道同学们是否还有印象,我们之前在最早的时候,做过topn的计算,当时是非常麻烦的。但是现在用了Spark SQL之后,非常方便。
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.sql.p2
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* SparkSQL 内置函数操作
*/
object _04SparkSQLFunctionOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_04SparkSQLFunctionOps.getClass().getSimpleName)
.setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
/**
* hive中的用户自定义函数UDF操作(即在SparkSQL中类比hive来进行操作,因为hive和SparkSQL都是交互式计算)
* 1.创建一个普通的函数
* 2.注册(在SqlContext中注册)
* 3.直接使用即可
*
* 案例:创建一个获取字符串长度的udf
*/
// 1.创建一个普通的函数
def strLen(str:String):Int = str.length
// 2.注册(在SqlContext中注册)
sqlContext.udf.register[Int, String]("myStrLen", strLen)
val list = List("Hello you", "Hello he", "Hello me")
// 将RDD转换为DataFrame
val rowRDD = sqlContext.sparkContext.parallelize(list).flatMap(_.split(" ")).map(word => {
Row(word)
})
val scheme = StructType(List(
StructField("word", DataTypes.StringType, false)
))
val df = sqlContext.createDataFrame(rowRDD, scheme)
df.registerTempTable("test")
// 3.直接使用即可
sqlContext.sql("select word, myStrLen(word) from test").show()
sc.stop()
}
}
输出结果如下:
+-----+---+
| word|_c1|
+-----+---+
|Hello| 5|
| you| 3|
|Hello| 5|
| he| 2|
|Hello| 5|
| me| 2|
+-----+---+
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.sql.p2
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
/**
* 这两部分都比较重要:
* 1.使用SparkSQL完成单词统计操作
* 2.开窗函数使用
*/
object _05SparkSQLFunctionOps2 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_05SparkSQLFunctionOps2.getClass().getSimpleName)
.setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val list = List("Hello you", "Hello he", "Hello me")
// 将RDD转换为DataFrame
val rowRDD = sqlContext.sparkContext.parallelize(list).map(line => {
Row(line)
})
val scheme = StructType(List(
StructField("line", DataTypes.StringType, false)
))
val df = sqlContext.createDataFrame(rowRDD, scheme)
df.registerTempTable("test")
df.show()
// 执行wordcount
val sql = "select t.word, count(1) as count " +
"from " +
"(select " +
"explode(split(line, ' ')) as word " +
"from test) as t " +
"group by t.word order by count desc"
sqlContext.sql(sql).show()
sc.stop()
}
}
输出结果如下:
+---------+
| line|
+---------+
|Hello you|
| Hello he|
| Hello me|
+---------+
+-----+-----+
| word|count|
+-----+-----+
|Hello| 3|
| me| 1|
| he| 1|
| you| 1|
+-----+-----+
文章浏览阅读2w次,点赞7次,收藏51次。四个步骤1.创建C++ Win32项目动态库dll 2.在Win32项目动态库中添加 外部依赖项 lib头文件和lib库3.导出C接口4.c#调用c++动态库开始你的表演...①创建一个空白的解决方案,在解决方案中添加 Visual C++ , Win32 项目空白解决方案的创建:添加Visual C++ , Win32 项目这......_c#调用lib
文章浏览阅读4.6k次。苹方字体是苹果系统上的黑体,挺好看的。注重颜值的网站都会使用,例如知乎:font-family: -apple-system, BlinkMacSystemFont, Helvetica Neue, PingFang SC, Microsoft YaHei, Source Han Sans SC, Noto Sans CJK SC, W..._ubuntu pingfang
文章浏览阅读159次。表单表单概述表单标签表单域按钮控件demo表单标签表单标签基本语法结构<form action="处理数据程序的url地址“ method=”get|post“ name="表单名称”></form><!--action,当提交表单时,向何处发送表单中的数据,地址可以是相对地址也可以是绝对地址--><!--method将表单中的数据传送给服务器处理,get方式直接显示在url地址中,数据可以被缓存,且长度有限制;而post方式数据隐藏传输,_html表单的处理程序有那些
文章浏览阅读1.2k次。使用说明:开启Google的登陆二步验证(即Google Authenticator服务)后用户登陆时需要输入额外由手机客户端生成的一次性密码。实现Google Authenticator功能需要服务器端和客户端的支持。服务器端负责密钥的生成、验证一次性密码是否正确。客户端记录密钥后生成一次性密码。下载谷歌验证类库文件放到项目合适位置(我这边放在项目Vender下面)https://github.com/PHPGangsta/GoogleAuthenticatorPHP代码示例://引入谷_php otp 验证器
文章浏览阅读4.3k次,点赞5次,收藏11次。matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距
文章浏览阅读2.2k次。①Storage driver 处理各镜像层及容器层的处理细节,实现了多层数据的堆叠,为用户 提供了多层数据合并后的统一视图②所有 Storage driver 都使用可堆叠图像层和写时复制(CoW)策略③docker info 命令可查看当系统上的 storage driver主要用于测试目的,不建议用于生成环境。_docker 保存容器
文章浏览阅读834次,点赞27次,收藏13次。网络拓扑结构是指计算机网络中各组件(如计算机、服务器、打印机、路由器、交换机等设备)及其连接线路在物理布局或逻辑构型上的排列形式。这种布局不仅描述了设备间的实际物理连接方式,也决定了数据在网络中流动的路径和方式。不同的网络拓扑结构影响着网络的性能、可靠性、可扩展性及管理维护的难易程度。_网络拓扑csdn
文章浏览阅读1.8k次,点赞5次,收藏8次。IOS系统Date的坑要创建一个指定时间的new Date对象时,通常的做法是:new Date("2020-09-21 11:11:00")这行代码在 PC 端和安卓端都是正常的,而在 iOS 端则会提示 Invalid Date 无效日期。在IOS年月日中间的横岗许换成斜杠,也就是new Date("2020/09/21 11:11:00")通常为了兼容IOS的这个坑,需要做一些额外的特殊处理,笔者在开发的时候经常会忘了兼容IOS系统。所以就想试着重写Date函数,一劳永逸,避免每次ne_date.prototype 将所有 ios
文章浏览阅读5.3k次。方法一:用PLSQL Developer工具。 1 在PLSQL Developer的sql window里输入select * from test for update; 2 按F8执行 3 打开锁, 再按一下加号. 鼠标点到第一列的列头,使全列成选中状态,然后粘贴,最后commit提交即可。(前提..._excel导入pl/sql
文章浏览阅读83次。Git常用命令速查手册1、初始化仓库git init2、将文件添加到仓库git add 文件名 # 将工作区的某个文件添加到暂存区 git add -u # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,不处理untracked的文件git add -A # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,包括untracked的文件...
文章浏览阅读202次。分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120
文章浏览阅读1.8k次。版权声明:转载请注明出处 http://blog.csdn.net/irean_lau。目录(?)[+]1、缺省构造函数。2、缺省拷贝构造函数。3、 缺省析构函数。4、缺省赋值运算符。5、缺省取址运算符。6、 缺省取址运算符 const。[cpp] view plain copy_空类默认产生哪些类成员函数