《Phoenix使用总结》 二、Spark操作Phoenix

Spark 操作 Phoenix

由于之前的映射数据是存在 MongoDB 中,所以需要将 MongoDB 中的数据导入到 Phoenix 中。这里记录 Spark 对 Phoenix 的操作。

Maven 依赖

1
2
3
4
5
6
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>${phoenix.version}</version>
<scope>provided</scope>
</dependency>

从 Phoenix 读取数据

1
2
3
val rdd: RDD[Map[String, AnyRef]] = sparkContext.phoenixTableAsRDD(
"TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
)

写入 Phoenix

1
2
3
4
5
6
7
8
9
10
11
12
var matchIndexRDD = MongoSpark.load(sparkContext, ReadConfig(Map("uri" -> uri
, "database" -> "pubg"
, "collection" -> "match_index")))
.filter(m => (StringUtils.isNotBlank(m.getString("lowerNickName")) && m.getDate("startedAt") != null))
.map(m => (m.getString("lowerNickName"), m.getDate("startedAt"), m.getString("matchId")
, m.getString("mode"), m.getInteger("queueSize"), m.getInteger("totalRank"), m.getString("type")))

matchIndexRDD.saveToPhoenix(
"PUBG.MATCH_INDEX",
Seq("LOWERNICKNAME", "STARTEDAT", "MATCHID", "MODE", "QUEUESIZE", "TOTALRANK", "TYPE"),
zkUrl = Some("datanode03:2181")
)