博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark读取myslq优化--单机版
阅读量:5936 次
发布时间:2019-06-19

本文共 2871 字,大约阅读时间需要 9 分钟。

hot3.png

1.依赖环境:

org.scala-lang
scala-library
2.10.4
org.apache.spark
spark-core_2.10
2.2.0
org.apache.spark
spark-sql_2.10
2.2.0
mysql
mysql-connector-java
5.1.37

2.实现方式:

val conf = new SparkConf().setMaster("local[*]").setAppName("msyql数据读取")  val spark = SparkSession.builder().config(conf).getOrCreate()  val url = "jdbc:mysql://localhost:3306/hisms_sn?user=root&password=root"  val prop = new Properties()  val properties=Map("url"->"jdbc:mysql://192.168.0.135:3306/disease-qy?useUnicode=true&characterEncoding=UTF-8",    "driver"->"com.mysql.jdbc.Driver",    "user"->"root",    "password"->"root")  //读取mysql的5中方式  //1.不指定查询条件---并行度为1  def method1(): Unit ={    val df = spark.read.jdbc(url,"t_kc21k1",prop)    println(df.count())    println(df.rdd.partitions.size)    df.show(5)  }  //2.指定数据库字段的范围--并行度为5/**  * 方式二:指定数据库字段的范围  * 通过lowerBound和upperBound 指定分区的范围  * 通过columnName 指定分区的列(只支持整形)  * 通过numPartitions 指定分区数量 (不宜过大)  **/def method2(): Unit ={  val lowerBound = 1  val upperBound = 100000  val numPartitions = 5    val df = spark.read.jdbc(url,"t_kc21k1","id",lowerBound,upperBound,numPartitions,prop)    println(df.count())    println(df.rdd.partitions.size)    df.show(5)  }  //3.根据任意字段进行分区--并行度为2  def method3(): Unit ={    //通过predicates将数据根据akc194分为2个区    val predicates = Array[String]("akc194 <= '2016-06-30'", "akc194 <= '2017-01-01' and akc194>'2016-06-30'")    val df = spark.read.jdbc(url,"t_kc21k1",predicates,prop)    println(df.count())    println(df.rdd.partitions.size)    df.show(5)  }  //4.通过load获取---与method1一样 并行度为1  def method4(): Unit ={    val df = spark.read.format("jdbc").options(Map("url"->url,"dbtable"->"t_kc21k1")).option("fetchSize",1000).load()    println(df.count())    println(df.rdd.partitions.size)    df.show(5)  }  //5.加载条件查询后的数据  def method5(): Unit ={    //通过predicates将数据根据akc194分为2个区    val query="SELECT id,aac003,id_drg,name_drg from t_kc21k1 where id>50000"    //定要用左右括号包起来,因为dbtable的value会被当成一张table作查询,mysql connector会自动dbtable后面加上where 1=1    val df = spark.read.format("jdbc").options(Map("url"->url,"dbtable"->s"($query)kc21k1")).load()    println(df.count())    println(df.rdd.partitions.size)    df.show(5)  }

通过增加分区读取数据,只是增加了并行度,但如果对单机版的spark,还是不能减少内存的使用,spark读取数据库的规则就是该数据提取至内存,再做内存计算。

问题:

    windows上使用单机版spark,不依赖hive环境,读取mysql数据表很大的时候,做join操作,sparksql容易发生内存溢出,

1.目前只能通过减少数据的读取方式方式内存爆炸----比如:根据结果只选取需要的字段。

2.可以同配置使用hive环境,sparksql将会借助hive环境,而不依赖本地内存做计算,防止内存溢出。

转载于:https://my.oschina.net/shea1992/blog/3058600

你可能感兴趣的文章
『003』索引-脚本
查看>>
CH5102 Mobile Service
查看>>
C++当中的virtual继承
查看>>
手机H5显示一像素的细线
查看>>
Menu 菜单栏
查看>>
分页(thinkphp5.0版本)
查看>>
OCP新题,2019题库出现大量新题,062-第22题
查看>>
int *ptr=(int *)(&a+1)问题的探讨
查看>>
git - ssh key
查看>>
帮助-阅读随笔
查看>>
关闭键盘
查看>>
Quartus 12的TimeQuest Timing Analyzer
查看>>
JavaScript: 代码简洁之道
查看>>
Integer跟int的区别(备份回忆)
查看>>
集合解析
查看>>
sourcetree合并分支
查看>>
详解分布式应用程序协调服务Zookeeper
查看>>
LeetCode 208: Implement Trie (Prefix Tree)
查看>>
zoc license code
查看>>
【转】PreparedStatement的用法
查看>>