Spark 我们知道是可以连接数据库的,可以通过spark的API spark.read.jdbc 中可以读取oracle的数据。
但是很多时候我们读取数据库的时候不会全表读,需要加入查询条件,例如 创建时间。这个时候我们调用spark的jdbc的时候,需要指定查询的分区。
例如 查询 created_time 在 2019年1月~2019年2月之间 的数据,表是table1。
如果写
spark.read.jdbc(prop.getProperty("url"), "table1", prop)
.filter("created_time>=cast('xxxxx' as timestamp) and created_time<cast('xxx' as timestamp)")
那么,恭喜你,完了,如果table1 是一张大表,你这样查询 会直接让公司dba找你。
其次,你也无法有效利用spark的并发,这么做只会有一个task任务,这个任务就是执行 select * from ..... where ....
那么稍微明智点。每个task任务查询一天的数据。
spark.read.jdbc(prop.getProperty("url"), "table1", Array("created_time>=to_Date('xxxx','yyyymmdd') and created_time=to_Date('xxxx','yyyymmdd') and created_time<to_Date('xxxx','yyyymmdd')"......), coreProp)
这样,tasknum=3的话,起码每次会跑3个任务,也就是说 同时查3天的数据,稍微好点了。
但是 其实我们需要对数据库压力尽可能的小,如果table1 有created_time的单独索引还好,但是要是没有单独索引,而是有多个联合索引,那么万一选错了索引就会增大数据库的查询压力。
这个时候就需要用到hint了,数据库的hint可以尽可能的让本次查询按照我们自己指定的索引去查询。
可问题是 spark提供的API中 貌似没有可以加 hint的地方,那难道只能修改spark的源码了吗?
答案是
不。
其实,我们仔细看,spark 的jdbc中对 table 并没有强制的校验。可以把table变成一个查询,例如
table 变成 (select /*+index(A IDX_BILL_UPDATE ) */... from A where ....)t
这样就完成了 hint的引用了。
多想一下,就可以做到更多的事情了。
当然为了尽可能的利用CPU,还可以使用 ExecutorService 做 多线程。
在多线程中运行spark任务,这样运行的效率会更高,当然中间会有坑,例如 如何解决
办法肯定是有的,但是就不再这里一一赘述了。本次就 讲到spark中如何使用hint去查询数据。