spark streaming将处理结果存入mysql中,使用c3p0连接池

1、c3p0相应的架包导入工程中

将以下四个架包导入工程,

  主要有三个架包:c3p0-0.9.5.2.jar

c3p0-oracle-thin-extras-0.9.5.2.jar

mchange-commons-java-0.9.5.2.jar

  记得一定要导入mysql-connector-java-5.1.26-bin.jar架包,这是连接mysql的驱动程序。

2、创建进程池

  主要有三个类:

   C3p0Utils创建连接以及关闭连接;

   DBUtils插入更新等执行语句;

   DBUtil_BO数据库连接对像;

   App测试;

 1 import com.mchange.v2.c3p0.ComboPooledDataSource;
 2 
 3 import java.sql.Connection;
 4 import java.sql.PreparedStatement;
 5 import java.sql.ResultSet;
 6 import java.sql.SQLException;
 7 
 8 /**
 9  * Created by lala on 2017/6/29.
10  */
11 
12 public class C3p0Utils {
13 
14     static org.apache.log4j.Logger logger=org.apache.log4j.Logger.getLogger(C3p0Utils.class.getName());
15 
16     //通过标识名来创建相应连接池
17     static ComboPooledDataSource dataSource=new ComboPooledDataSource("mysql");
18     //从连接池中取用一个连接
19     public static Connection getConnection(){
20         try {
21              String url="jdbc:mysql://localhost:3306/mysql";
22             //定义连接数据的用户名
23             String user="root";
24             //定义连接数据库的密码
25              String password="";
26             return dataSource.getConnection();
27 
28         } catch (Exception e) {
29             logger.error("Exception in C3p0Utils!", e);
30            System.out.println("数据库连接出错!"+e);
31             return null;
32         }
33     }
34     //释放连接回连接池
35     public static void close(Connection conn, PreparedStatement pst, ResultSet rs){
36         if(rs!=null){
37             try {
38                 rs.close();
39             } catch (SQLException e) {
40                 logger.error("Exception in C3p0Utils!", e);
41                 System.out.println("数据库连接关闭出错!"+e);
42             }
43         }
44         if(pst!=null){
45             try {
46                 pst.close();
47             } catch (SQLException e) {
48                 logger.error("Exception in C3p0Utils!", e);
49                 System.out.println("数据库连接关闭出错!"+ e);
50             }
51         }
52 
53         if(conn!=null){
54             try {
55                 conn.close();
56             } catch (SQLException e) {
57                 logger.error("Exception in C3p0Utils!", e);
58                 System.out.println("数据库连接关闭出错!"+ e);
59             }
60         }
61     }
62 
63 }
DBUtils插入更新等执行语句:
 1 public class DBUtils {
 2 
 3     static org.apache.log4j.Logger logger=org.apache.log4j.Logger.getLogger(DBUtils.class.getName());
 4 
 5 
 6     private static void realseSource(Connection _conn, PreparedStatement _st, ResultSet _rs){
 7         C3p0Utils.close(_conn,_st,_rs);
 8     }
 9 
10     public static void realseSource(DBUtil_BO _vo){
11         if(_vo!=null){
12             realseSource(_vo.conn, _vo.st, _vo.rs);
13         }
14     }
15     //注意:查询操作完成后,因为还需提取结果集中信息,所以仍保持连接,在结果集使用完后才通过DBUtils.realseSource()手动释放连接
16     public static void executeQuery(DBUtil_BO vo)
17     {
18         try{
19             vo.rs = vo.st.executeQuery();
20         }catch (SQLException e){
21             realseSource(vo);
22 
23             logger.error("SQL语法有误: ",e);
24            System.out.println("err.user.dao.jdbc"+e);
25         }
26     }
27 
28     //而update操作完成后就可以直接释放连接了,所以在方法末尾直接调用了realseSourse()
29     public static  void executeUpdate(DBUtil_BO vo)
30     {
31 
32         Connection conn = vo.conn;
33         PreparedStatement st = vo.st;
34         try {
35             st.executeUpdate();
36         } catch (SQLException e) {
37             realseSource(conn, st, null);
38             logger.error(" SQL语法有误: ",e);
39             System.out.println("err.user.dao.jdbc"+e);
40         }
41         realseSource(conn, st,null );
42     }
43 }
DBUtil_BO数据库连接对像:
 1 package org.test.maven;
 2 
 3 import java.sql.Connection;
 4 import java.sql.PreparedStatement;
 5 import java.sql.ResultSet;
 6 
 7 /**
 8  * Created by lala on 2017/6/29.
 9  */
10 public class DBUtil_BO {
11 
12     public Connection conn = null;
13     public PreparedStatement st = null;
14     public ResultSet rs = null;
15     public DBUtil_BO() {
16         super();
17     }
18 
19 }

Obj方便插入的某一对象(scala)

 1 package org.test.maven
 2 
 3 /**
 4   * Created by lala on 2017/7/1.
 5   */
 6 class Obj {
 7   private var usr = -1
 8   private var name = ""
 9 
10   def setUsr(usr:Int)={
11     this.usr=usr
12   }
13 
14   def  setName(name:String)={
15     this.name=name
16   }
17   def getUsr()=this.usr
18   def getName()=this.name
19 }

App测试:

 1 package org.test.maven;
 2 
 3 import java.sql.SQLException;
 4 import org.apache.log4j.Logger;
 5 
 6 /**
 7  * Hello world!
 8  *
 9  */
10 public class App 
11 {
12 
13     private static Logger logger = Logger.getLogger(App.class);
14 
15     public static void main( String[] args ) throws SQLException {
16         System.out.println( "Hello World!" );
17         DBUtil_BO dbBo = new DBUtil_BO();
18         dbBo.conn=C3p0Utils.getConnection();//取用一个连接
19         String sql = "select id from usr where usr_name = ? ";
20         try{
21             dbBo.st=dbBo.conn.prepareStatement(sql);//预处理sql语句
22         }catch (SQLException e){
23             logger.error("查询预处理出错!",e);
24             System.out.println("查询预处理出错!"+e);
25         }
26         Obj ob=new Obj();
27         ob.setName("bgngh");
28         System.out.println(ob.getName());
29         dbBo.st.setString(1,ob.getName());
30         DBUtils.executeQuery(dbBo);
31         String name = null;
32         //从dbBo类提取操作结果
33         int id = 0;
34         if (dbBo.rs.next()) {
35             try{
36 
37                  id = dbBo.rs.getInt("id");
38                  id = 5;
39             }catch(SQLException e){
40                 logger.error("查询返回结果出错!",e);
41             }
42 
43         }
44         System.out.println(id);
45 //结果集遍历完了,手动释放连接回连接池
46         DBUtils.realseSource(dbBo);
47     }
48 }