mybatis源码分析:DataSource组件

Scroll Down

1、概述

在数据持久层中,数据源是一个非常重要的组件,其性能直接关系到整个数据持久层的性能。在实践中比较常见的第三方数据源组件有Apache Common DBCP、C3P0、Proxool等,Mybatis不仅可以集成第三方数据源组件,还提供了自己的数据源实现。

2、DataSource

常见的数据源组件都实现了javax.sql.DataSource接口,Mybatis自身实现的数据源实现也不例外。Mybatis提供了两个javax.sql.DataSource接口实现,分别是PooledDataSource和UnpooledDataSource。Mybatis使用不同的DataSourceFactory接口实现创建不同类型的DataSource。
image.png
在解析配置文件时会创建DataSource。
创建DataSource的代码如下:

private void environmentsElement(XNode context) throws Exception {
    if (context != null) {
      if (environment == null) {
        environment = context.getStringAttribute("default");
      }
      for (XNode child : context.getChildren()) {
        // 解析environment节点的id属性
        String id = child.getStringAttribute("id");
        if (isSpecifiedEnvironment(id)) {
          // 解析事务工厂类
          TransactionFactory txFactory = transactionManagerElement(child.evalNode("transactionManager"));
          // 解析DataSourceFactory
          DataSourceFactory dsFactory = dataSourceElement(child.evalNode("dataSource"));
          // 获取数据源
          DataSource dataSource = dsFactory.getDataSource();
          Environment.Builder environmentBuilder = new Environment.Builder(id)
              .transactionFactory(txFactory)
              .dataSource(dataSource);
          configuration.setEnvironment(environmentBuilder.build());
        }
      }
    }
}

数据库连接池在初始化时,一般会创建一定数量的数据库连接并添加到连接池中备用。当程序需要使用数据库连接时,从池中请求连接。当程序不再使用该连接时,会将其返回到池中缓存,等待下次使用,而不是直接关闭。当然,数据库连接池会控制连接总数的上限以及空闲连接数上限,如果连接池创建的总连接数已达到上限,且都已被占用,则后续请求连接的线程会进入阻塞队列等待,知道有线程释放出可用的连接。如果连接池中空闲连接数较多,达到其上限,则后续返回的空闲连接不会放入池中,而是直接关闭,这样可以减少系统维护多余数据库连接的开销。
如果将总连接数的上限设置的过大,可能因连接数过多导致数据库僵死,系统整体性能下降,如果总连接数上限过小,则无法完全发挥数据库的性能,浪费数据库资源,如果将空闲连接的上限设置的过大,则会浪费系统资源来维护这些空闲连接;如果空闲连接上限过小,当出现瞬间的峰值请求时,系统的快速响应能力就比较弱,所以在设置数据库连接池的这两个值时,需要进行性能测试。
PooledDataSource实现了简单的数据库连接的功能,PooledDataSource创建数据库连接的功能是依赖其中封装的UnpooledDataSource对象实现的。
image.png
PooledDataSource并不会直接管理java.sql.Connection对象,而是管理PooledConnection对象。在PooledConnection中封装了真正的数据库连接对象以及其代理对象,这里的代理是通过JDK动态代理产生的。PooledConnection继承了InvocationHandler接口。
PooledConnection类的核心字段如下:

private final int hashCode;
// 记录当前PooledConnection对象所在的PooledDataSource对象。
// 该PooledConnection是从该PooledDataSource中获取的;
// 当调用close()方法时会将该PooledConnection方法该PooledDataSource中
private final PooledDataSource dataSource;
// 真正的数据库连接
private final Connection realConnection;
// 数据库连接的代理对象
private final Connection proxyConnection;
// 从连接池中取出该连接的时间戳
private long checkoutTimestamp;
// 该连接创建的时间戳
private long createdTimestamp;
// 最后一次使用该连接的时间戳
private long lastUsedTimestamp;
private int connectionTypeCode;
private boolean valid;

invoke()方法如下:

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
      // 如果是close方法,则将连接放回到连接池中
      dataSource.pushConnection(this);
      return null;
    } else {
      try {
        if (!Object.class.equals(method.getDeclaringClass())) {
          // issue #579 toString() should never fail
          // throw an SQLException instead of a Runtime
          // 检查连接的可用性
          checkConnection();
        }
        // 调用Connection的对应的方法
        return method.invoke(realConnection, args);
      } catch (Throwable t) {
        throw ExceptionUtil.unwrapThrowable(t);
      }
    }
}

PoolState是用于管理PooledConnection对象状态的组件,它通过两个ArrayList集合分别管理空闲状态的连接和活跃状态的连接。
PooledDataSource中管理的真正的数据库连接对象是由PooledDataSource中封装的UnpooledDataSource对象创建的,并由PoolState管理所有的连接的状态。PooledDataSource中核心字段的含义和功能如下:

// 通过state管理连接池对象
private final PoolState state = new PoolState(this);
// 持有UnpooledDataSource对象,真正创建Connection对象
private final UnpooledDataSource dataSource;

// OPTIONAL CONFIGURATION FIELDS
// 最大活跃连接数
protected int poolMaximumActiveConnections = 10;
// 最大空闲连接数
protected int poolMaximumIdleConnections = 5;
// 最大checkout时长
protected int poolMaximumCheckoutTime = 20000;
// 在无法获取连接时,线程需要等待的时间
protected int poolTimeToWait = 20000;
protected int poolMaximumLocalBadConnectionTolerance = 3;
// 在检测一个数据库连接是否可用时,会给数据库发送一个测试SQL语句
protected String poolPingQuery = "NO PING QUERY SET";
// 是否允许发送测试SQL语句
protected boolean poolPingEnabled;
protected int poolPingConnectionsNotUsedFor;

private int expectedConnectionTypeCode; 

PooledDataSource实现了DataSource,getConnection()方法如下:

@Override
public Connection getConnection() throws SQLException {
    // 获取动态代理对象,这里加入动态代理实际上是为了实现close时,自动放回连接到连接池中。
    return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
}

private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;
    
    while (conn == null) {
      synchronized (state) {
        // 如果活跃连接不为空,则从活跃连接数组中获取连接
        if (!state.idleConnections.isEmpty()) {
          // Pool has available connection
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          // 如果池中还没有活跃连接,则使用UnpooledDataSource来创建
          // Pool does not have available connection
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // Can create new connection
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            // Cannot create new connection
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // Can claim overdue connection
              state.claimedOverdueConnectionCount++;
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              state.accumulatedCheckoutTime += longestCheckoutTime;
              state.activeConnections.remove(oldestActiveConnection);
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  /*
                     Just log a message for debug and continue to execute the following
                     statement like nothing happend.
                     Wrap the bad connection with a new PooledConnection, this will help
                     to not intterupt current executing thread and give current thread a
                     chance to join the next competion for another valid/good database
                     connection. At the end of this loop, bad {@link @conn} will be set as null.
                   */
                  log.debug("Bad connection. Could not roll back");
                }  
              }
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              // Must wait
              try {
                if (!countedWait) {
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                state.wait(poolTimeToWait);
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        if (conn != null) {
          // ping to server and check the connection is valid or not
          if (conn.isValid()) {
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            state.activeConnections.add(conn);
            state.requestCount++;
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            state.badConnectionCount++;
            localBadConnectionCount++;
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }
    
    }
    
    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }
    
    return conn;
}

逻辑图如下:
mybatisdatasourceflowchart.png
pushConnection()方法实现如下:

protected void pushConnection(PooledConnection conn) throws SQLException {

    synchronized (state) {
      // 从activeConnections中移除该PooledConnection对象
      state.activeConnections.remove(conn);
      if (conn.isValid()) { // 检测连接是否有效
        // 检测活跃连接数是否已经达到上限,以及该PooledConnection是否为该连接池的连接。
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          // 累计checkout时长
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) { // 回滚未提交的事务
            conn.getRealConnection().rollback();
          }
          // 返还连接创建新的PooledConnection对象
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          // 添加到idleConnection集合中
          state.idleConnections.add(newConn);
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          // 将原来的连接设置为无效
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          // 唤醒阻塞等待线程
          state.notifyAll();
        } else { // 空闲连接数已达到上限或PooledConnection对象并不属于该连接池
          // 累计checkout时长
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          // 关闭真正的数据库连接
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          //将PooledConnection对象设置为无效
          conn.invalidate();
        }
      } else {
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        state.badConnectionCount++;
      }
    }
}