1、maven依赖。

2、测试代码。
package spark;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
public class DBApp {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("MySQLSpark");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sparkContext);
// 写入mysql数据
writeMySQL(sparkContext, sqlContext);
// 读取mysql数据
//readMySQL(sqlContext);
// 停止SparkContext
sparkContext.stop();
}
/**
* 读取数据
* @param sqlContext
*/
private static void readMySQL(SQLContext sqlContext) {
String url = "jdbc:mysql://localhost:3306/world";
// 查找的表名
String table = "user";
// 数据库连接
Properties connProperties = new Properties();
connProperties.put("user", "root");
connProperties.put("password", "123456");
connProperties.put("driver", "com.mysql.cj.jdbc.Driver");
// SparkJdbc读取MySQL的user表内容
System.out.println("读取world数据库中的user表内容");
// 读取表中所有数据
Dataset<Row> dsUser = sqlContext.read().jdbc(url, table, connProperties).select("*");
// 显示数据
dsUser.show();
}
/**
* 写入数据
* @param sparkContext
* @param sqlContext
*/
private static void writeMySQL(JavaSparkContext sparkContext, SQLContext sqlContext) {
String url = "jdbc:mysql://localhost:3306/world";
// 查找的表名
String table = "user";
// 数据库连接
Properties connProperties = new Properties();
connProperties.put("user", "root");
connProperties.put("password", "123456");
connProperties.put("driver", "com.mysql.cj.jdbc.Driver");
// 写入的数据内容
JavaRDD<String> sourceRDD = sparkContext.parallelize(Arrays.asList("tom a", "jack b", "alex c"));
// 第一步:在RDD的基础上创建类型为Row的RDD。
// 将RDD变成以Row为类型的RDD。Row可以简单理解为Table的一行数据。
JavaRDD<Row> userRDD = sourceRDD.map(new Function<String, Row>() {
public Row call(String line) throws Exception {
String[] splited = line.split(" ");
return RowFactory.create(splited[0], splited[1]);
}
});
// 第二步:动态构造DataFrame的元数据。
List structFields = new ArrayList();
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
structFields.add(DataTypes.createStructField("desc", DataTypes.StringType, true));
// 构建StructType,用于最后DataFrame元数据的描述。
StructType structType = DataTypes.createStructType(structFields);
// 第三步:基于已有的元数据以及RDD<Row>来构造DataFrame。
Dataset dsDateset = sqlContext.createDataFrame(userRDD, structType);
// 第四步:将数据写入到user表中。
dsDateset.write().mode("append").jdbc(url, table, connProperties);
}
}