# spark-template

**Repository Path**: dufafei/spark-template

## Basic Information

- **Project Name**: spark-template
- **Description**: Spark development template
- **Primary Language**: Scala
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 17
- **Forks**: 5
- **Created**: 2020-05-01
- **Last Updated**: 2022-08-25

## Categories & Tags

**Categories**: distributed-service

**Tags**: template

## README

# spark-template #

Code templates for Spark development, drawn from day-to-day work.

## Using spark-session ##

```scala
// `name` is the application name, supplied by the caller.
val sparkConf = SparkConfBuilder().setAppName(name).setKryoSerializer().get()
val sparkSession = SparkSessionBuilder().setConf(sparkConf).get()
```

## Using spark-streaming ##

* Hides the details of obtaining the StreamingContext and managing offsets
* Users only need to write the business logic

```scala
import org.apache.spark.streaming.Seconds

val sparkConf = SparkConfBuilder()
  .setAppName(name)
  .setKryoSerializer()
  .setSqlShufflePartition(1000)
  .setStreamingStopGracefullyOnShutdown(true)
  .setStreamingBackpressure(true)
  .setStreamingKafkaMaxRatePerPartition(10000)
  .get()

// Fill in the empty values for your own Kafka cluster.
val kafkaParam: Map[String, Object] = Map[String, Object](
  "bootstrap.servers" -> "",
  "key.deserializer" -> "",
  "value.deserializer" -> "",
  "group.id" -> "",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

SparkStreamingBuilder()
  .setSparkConf(sparkConf)
  .setKafkaParam(kafkaParam)
  .setTopics(Array("test"))
  .setDuration(Seconds(5))
  .execute { input =>
    // Business logic: print the value of every record in each non-empty batch.
    input.map(_.value()).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        rdd.foreachPartition { partition =>
          partition.foreach(println(_))
        }
      }
    }
  }
```

## Utility wrappers ##

- Configuration file loading - com.typesafe.config

```scala
import java.io.File
import java.net.URLDecoder
import com.typesafe.config.{Config, ConfigFactory}

// Resolves the `conf` directory relative to where this class was loaded from.
def confPath: String = {
  val path = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
  var url = URLDecoder.decode(path, "utf-8")
  // When running from a jar, keep only the directory that contains the jar.
  if (url.endsWith(".jar")) {
    url = url.substring(0, url.lastIndexOf("/") + 1)
  }
  val file = new File(url)
  // Go up one level, then into `conf`.
  val parent = file.getParent
  parent + "/conf"
}

// Parses the named file from the `conf` directory.
def load(name: String): Config = {
  val conf = new File(confPath + "/" + name)
  ConfigFactory.parseFile(conf)
}
```

- JDBC connection pool - scalikejdbc

```scala
import java.sql.{Connection, DriverManager}
import javax.sql.DataSource
import scalikejdbc.{ConnectionPool, ConnectionPoolSettings, DataSourceConnectionPool, DB}

// Note: `info` and the `using(db)(...)` overload that takes a DB are not shown here.

// Registers a named pool from connection settings, unless it already exists.
def init(
  name: String,
  url: String,
  username: String,
  password: String,
  settings: ConnectionPoolSettings
): Unit = {
  if (!ConnectionPool.isInitialized(Symbol(name))) {
    ConnectionPool.add(Symbol(name), url, username, password, settings)
    info(s"successful initialization the database connection pool: $name")
  }
}

// Registers a named pool backed by an existing DataSource, unless it already exists.
def init(name: String, dataSource: DataSource): Unit = {
  if (!ConnectionPool.isInitialized(Symbol(name))) {
    ConnectionPool.add(Symbol(name), new DataSourceConnectionPool(dataSource))
    info(s"successful initialization the database connection pool: $name")
  }
}

// Opens a plain, unpooled JDBC connection.
def getConnection(
  driverClassName: String,
  url: String,
  username: String,
  password: String
): Connection = {
  Class.forName(driverClassName)
  DriverManager.getConnection(url, username, password)
}

// Runs `execute` against a DB that wraps the given connection.
def using[A](conn: Connection)(execute: DB => A): A = {
  val db = DB(conn)
  db.autoClose(false)
  using(db)(execute)
}

// Runs `execute` against a DB whose connection is borrowed from the named pool.
def using[A](name: String)(execute: DB => A): A = {
  val db = DB(ConnectionPool(Symbol(name)).borrow())
  db.autoClose(false)
  using(db)(execute)
}
```

- See the code for more details.
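
The directory lookup done by `confPath` can be easier to follow as a pure function. The sketch below is not part of the repository: `ConfPathSketch` and `confDirFor` are hypothetical names, and the code-source location is passed in as a parameter instead of being read from the class loader. The steps themselves mirror `confPath`.

```scala
import java.io.File
import java.net.URLDecoder

// Hypothetical helper (not in the repository): the same steps as confPath,
// with the code-source location passed in explicitly.
object ConfPathSketch {
  def confDirFor(codeSourcePath: String): String = {
    // Decode percent-encoded characters such as %20.
    var url = URLDecoder.decode(codeSourcePath, "utf-8")
    // For a jar, keep only the directory that contains it.
    if (url.endsWith(".jar")) {
      url = url.substring(0, url.lastIndexOf("/") + 1)
    }
    // Step up one level and append "/conf".
    new File(url).getParent + "/conf"
  }
}
```

On a Unix-like system, `ConfPathSketch.confDirFor("/opt/app/lib/job.jar")` yields `/opt/app/conf`, so this logic expects the `conf` directory to sit next to the directory that holds the jar, not inside it. When the code is loaded from a classes directory such as `/proj/target/classes/`, the result is `/proj/target/conf`.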