JDBC Importer

JDBC Importer Tutorial 6 : One table, CSV Delimited, New Import Engine (primary key resolved)

Please make sure you have the appropriate libraries in your classpath (including the JDBC driver used to connect to your database) before starting the tutorials.

In this tutorial, you'll learn the basics of creating a new Import Engine and running the import with it. The table that will contain the rows imported is called employee and it has the following columns :

employee

Name            Type
id              number(6)
firstname       varchar(10)
lastname        varchar(10)
jobdescription  varchar(10)
managerid       number(6)
startdate       date
salary          number(9,2)
department      number(6)

Make sure that this table is created in the database into which you'll be importing data. You can find the Oracle creation script in the samples directory under the filename 'tutorial6/createtable_ora.sql'.

Now that the database is set up, you can examine the architecture to see how the Import Engine is used during the import.

Architecture Background

The Import Engine is used during the import to insert/update the database with the rows parsed by the DelimiterParser. It implements eight methods: setConnection, getConnection, setEntityDef, setBatchMode, init, importRow, executeBatch and cleanup. The setConnection and getConnection methods set/get the JDBC connection used by the Import Engine. The setBatchMode method indicates whether batch mode should be used for importing rows. The setEntityDef method indicates that the following calls to importRow should update the database table defined in the given EntityDef. The init method is called after setEntityDef and before any calls to importRow are made; it should initialize any resources that the Import Engine needs. The importRow method imports an array of column values into the database. If the import is using batch mode, the executeBatch method is called after every n rows. After all rows are processed, the cleanup method is called so the Import Engine can free the resources that it initialized.
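
To make the call order concrete, here is a simplified, illustrative sketch of how an importer could drive an engine. It is not the actual Importer source: 'engine', 'connection', 'entityDef', 'rows' and 'batchCount' stand in for values that the real importer obtains from the import definition and the DelimiterParser, and the real importer also handles commits, logging and writing to the bad file.

  // Illustrative sketch only -- not the actual Importer source.
  engine.setConnection( connection );
  engine.setBatchMode( batchCount > 1 );
  engine.setEntityDef( entityDef );
  engine.init();
  try
  {
    for ( int i = 0; i < rows.length; i++ )
    {
      engine.importRow( rows[i] );  // one ColumnValue[] per parsed row
      if ( batchCount > 1 && ( i + 1 ) % batchCount == 0 )
      {
        engine.executeBatch();
      }
    }
  }
  finally
  {
    engine.cleanup();
  }
Engine Lifecycle (illustrative sketch)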

Custom Import Engine

The Import Engine that you will be creating will check if the row being imported exists based on the primary key of the table. If it does exist, then the row will be updated. Otherwise, the row will be inserted.

The first thing to do is create a single class, UpdateEngine, that implements the ImportEngine interface.

import java.sql.*;

import net.sourceforge.jdbcimporter.ColumnDef;
import net.sourceforge.jdbcimporter.ColumnValue;
import net.sourceforge.jdbcimporter.EntityDef;
import net.sourceforge.jdbcimporter.ImportEngine;
import net.sourceforge.jdbcimporter.MalformedDataException;

public class UpdateEngine implements ImportEngine 
{

  public void setConnection(Connection con) 
  {
    /* @todo implement this method */
  }

  public Connection getConnection() 
  {
    /* @todo implement this method */
    return null;
  }

  public void setEntityDef(EntityDef entityDef) 
  {
    /* @todo implement this method */
  }

  public void init() throws SQLException
  {
    /* @todo implement this method */
  }
	
  public void cleanup()
  {
    /* @todo implement this method */
  }
	 
  public void importRow(ColumnValue[] values)
    throws SQLException, MalformedDataException 
  {
    /* @todo implement this method */
  }

  public void setBatchMode( boolean flag )
  {
    /* @todo implement this method */
  }
	
  public void executeBatch() throws BatchUpdateException
  {
    /* @todo implement this method */
  }
}
				
Initial Code

The implementations of setConnection, getConnection, setBatchMode and setEntityDef are straightforward. The setEntityDef method initializes an array of integers that maps the position of a column in the incoming array to its position in the update SQL statement.

...
public class UpdateEngine implements ImportEngine 
{
  Connection connection;
  EntityDef entityDef;
  int[] updateColumnMap;
  boolean batchFlag = false;

  public void setConnection(Connection con) 
  {
    connection = con;
  }

  public Connection getConnection() 
  {
    return connection;
  }

  public void setBatchMode( boolean flag )
  {
    batchFlag = flag;
  }

  public void setEntityDef(EntityDef entityDef) 
  {
    this.entityDef = entityDef;
    this.updateColumnMap = new int[ this.entityDef.getColumns().length ];
    for ( int i = 0; i < updateColumnMap.length; i++ )
    {
      updateColumnMap[i] = i;
    }
  }
}
				
set/getConnection, setBatchMode and setEntityDef Implementations

The init and cleanup methods are more complicated since they need to initialize and clean up three prepared statements: an insert prepared statement, a select prepared statement and an update prepared statement. The insert prepared statement is simple enough (the column names are put in the order defined in the EntityDef). For the select prepared statement, the import engine examines the metadata for the table and searches for the primary key columns. It then initializes an array to store the mapping between the position of each column in the select where clause and its position in the array defined in the EntityDef. For the update prepared statement, the import engine needs to modify the position of the columns being imported. All primary key columns are shifted to the end because the 'where' clause contains them and they will not be updated.

The implementation of the cleanup method is shorter since it just has to close the three prepared statements (a sketch is shown after the init implementation below).

Also, a list will be used to keep track of which rows were inserted and which rows were updated. This list will be used by the executeBatch method.

import java.util.List;
import java.util.ArrayList;
...

public class UpdateEngine implements ImportEngine 
{
  ...
  PreparedStatement insertStmt;
  PreparedStatement updateStmt;
  PreparedStatement selectStmt;
	
  int[] uniqueKeys;
  List    stmtTypes = new ArrayList();

  ...	
  public void init() throws SQLException
  {
    // first the insert statement
    StringBuffer sql = new StringBuffer("INSERT INTO ");
    sql.append( entityDef.getTable() );
    sql.append( " ( " );
    ColumnDef[] columns = entityDef.getColumns();
    StringBuffer sqlvalues = new StringBuffer(" ( " );
    for ( int i = 0; i < columns.length; i++ )
    {
      if ( i > 0 )
      {
        sql.append( ", " );  
        sqlvalues.append( ", " );
      }
      ColumnDef nextColumn = columns[i];
      sql.append( nextColumn.getName() );
      sqlvalues.append( "?" );
    }
    sql.append( " ) VALUES " );
    sqlvalues.append( " ) " );
    sql.append( sqlvalues.toString() );
    System.out.println("Insert Stmt : "+sql.toString() );
    insertStmt = connection.prepareStatement(sql.toString());

    // second, find the primary key columns of the table
    String catalog = entityDef.getCatalog();
    String schema  = entityDef.getSchema();
    String table   = entityDef.getTable();
    
    
    List primaryKeyNames = getPrimaryKeys( catalog, schema, table );
    if ( primaryKeyNames.size() == 0 )
    {
      if ( catalog != null ) catalog = catalog.toUpperCase();
      if ( schema  != null ) schema  = schema.toUpperCase();
      if ( table   != null ) table   = table.toUpperCase();
      primaryKeyNames = getPrimaryKeys( catalog, schema, table );
    }
    if ( primaryKeyNames.size() == 0 )
    {
      if ( catalog != null ) catalog = catalog.toLowerCase();
      if ( schema  != null ) schema  = schema.toLowerCase();
      if ( table   != null ) table   = table.toLowerCase();
      primaryKeyNames = getPrimaryKeys( catalog, schema, table );
    }
    
    if ( primaryKeyNames.size() == 0 )
    {
      System.out.println("No Primary Keys for Entity : '"+entityDef.getTable()+"'" );
      // no primary keys
      return;
    }
    else
    {
      System.out.println("Primary Keys for Entity : "+primaryKeyNames );
    }

    uniqueKeys = new int[ primaryKeyNames.size() ];
    for ( int i = 0; i < uniqueKeys.length; i++ )
    {
      uniqueKeys[i] = -1;
    }
    
    int lastWhereIndex = columns.length - primaryKeyNames.size();
    int lastSetIndex   = 0;
    for ( int i = 0; i < columns.length; i++ )
    {
      ColumnDef columnDef = columns[i];
      boolean found = false;
      for ( int j = 0; j < primaryKeyNames.size(); j++ )
      {
        String nextKey = (String) primaryKeyNames.get(j);
        if ( columnDef.getName().equalsIgnoreCase( nextKey ) )
        {
          uniqueKeys[j] = i;
          found = true;
          break;
        }
      }
      if ( !found )
      {
        updateColumnMap[lastSetIndex] = i;
        lastSetIndex++;
      }
      else
      {
        updateColumnMap[lastWhereIndex] = i;
        lastWhereIndex++;
      }
    }

    for ( int i = 0; i < uniqueKeys.length; i++ )
    {
      // could not find primary key in one of the import columns
      if ( uniqueKeys[i] == -1) return;
    }
        
    // third, the select statement
    sql = new StringBuffer("SELECT ");
    StringBuffer sqlwhere = new StringBuffer("");    
    for ( int i = 0; i < primaryKeyNames.size(); i++ )        
    {
      sql.append( (String) primaryKeyNames.get(i) );
      if ( i < primaryKeyNames.size() - 1 )
      {
        sql.append( "," );
      }
      sqlwhere.append( (String) primaryKeyNames.get(i) );
      sqlwhere.append( " = ? " );
      if ( i < primaryKeyNames.size() - 1 )
      {
        sqlwhere.append( " AND " );    
      }  
    }
    sql.append( " FROM " );
    sql.append( entityDef.getTable() );
    sql.append( " WHERE ");
    sql.append( sqlwhere.toString() );
    System.out.println("Select Stmt : "+sql.toString() );
    selectStmt = connection.prepareStatement( sql.toString() );
        
    // fourth, the update statement
    sql = new StringBuffer("UPDATE ");
    sql.append( entityDef.getTable() );
    sql.append( " SET " );
    sqlwhere = new StringBuffer("" );
    for ( int i = 0; i < columns.length; i++ )
    {
      
      ColumnDef columnDef = columns[updateColumnMap[i]];
      if ( i < columns.length - uniqueKeys.length ) 
      {
        sql.append( columnDef.getName() );
        sql.append( " = ? ") ;
        if ( i < columns.length - uniqueKeys.length - 1 )
        {
          sql.append( ", " );
        }    
      }
      else
      {
        sqlwhere.append( columnDef.getName() );
        sqlwhere.append( " = ? " );
        if ( i < columns.length - 1 )
        {
          sqlwhere.append( " AND " );
        }
      }
    }
    sql.append( " WHERE " );
    sql.append( sqlwhere.toString() );
    System.out.println("Update Stmt : "+sql.toString() );
    updateStmt = connection.prepareStatement( sql.toString() );
  }
  ...
  
  protected List getPrimaryKeys( String catalog, String schema, String table ) throws SQLException
  {
    DatabaseMetaData dbMeta = connection.getMetaData();
    ResultSet rset = dbMeta.getPrimaryKeys(
      catalog,
      schema,
      table
    );
    
    List primaryKeyNames = new ArrayList();
    while ( rset.next() )
    {
      primaryKeyNames.add( rset.getString("COLUMN_NAME") );
    }
    rset.close();
    return primaryKeyNames;
  }
  
}
				
init Implementation
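
The cleanup implementation is not reproduced in this tutorial (the full source is under the package samples.importengine). A minimal sketch, assuming that the select and update statements may still be null when no primary key was found, could look like this:

  public void cleanup()
  {
    // close whichever prepared statements init managed to create;
    // failures on close are ignored because the import is already finished
    try { if ( insertStmt != null ) insertStmt.close(); } catch ( SQLException e ) { /* ignore */ }
    try { if ( selectStmt != null ) selectStmt.close(); } catch ( SQLException e ) { /* ignore */ }
    try { if ( updateStmt != null ) updateStmt.close(); } catch ( SQLException e ) { /* ignore */ }
  }
cleanup Sketch (illustrative)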

The importRow method is more complicated because it must perform two tasks: check whether the row is already in the database, then insert or update the row accordingly. To check whether the row exists, the import engine uses the select prepared statement with the appropriate column values. If the row exists in the database, the import engine uses the update prepared statement; otherwise the insert prepared statement is used.

The importRow method also uses the JDBCParameterHelper class, a utility class that exposes a method to set a column value in a prepared statement.

...

public class UpdateEngine implements ImportEngine 
{
  ...
  JDBCParameterHelper helper = new JDBCParameterHelper();
  ...
  public void importRow(ColumnValue[] values)
    throws SQLException, MalformedDataException 
  {
    boolean insertFlag    = true;
    ColumnDef[] columns = entityDef.getColumns();
    if ( selectStmt != null && updateStmt != null )
    {
      for ( int i = 0; i < uniqueKeys.length; i++ )
      {
        ColumnDef columnDef = columns[ uniqueKeys[i] ];
        ColumnValue columnValue = values[ uniqueKeys[i] ];
        helper.setColumn( selectStmt, i+1,  columnDef, columnValue );
      }    
      
      ResultSet rset = selectStmt.executeQuery();
      insertFlag = !rset.next();
      rset.close();
    }
    
    PreparedStatement stmt = null;
    if ( insertFlag )
    {
      stmt = insertStmt;
      if ( batchFlag ) stmtTypes.add("insert");
    }
    else
    {
      stmt = updateStmt;
      if ( batchFlag ) stmtTypes.add("update");
    }
          
    for ( int i = 0; i < values.length; i++ )
    {
      int column = i+1;
      int realIndex = i;
      if ( !insertFlag )
      {
        realIndex = updateColumnMap[i];
      }
      
      ColumnDef columnDef = columns[realIndex];
      ColumnValue columnVal = values[realIndex];
      helper.setColumn( stmt, column, columnDef, columnVal );    
    }
    
    if ( !batchFlag )
    {
      int rows = stmt.executeUpdate();
      // exactly one row should have been inserted or updated
      if ( rows != 1 ) 
      {
        throw new SQLException(rows+" rows affected");
      }
    }
    else
    {
      stmt.addBatch();
    }
  }
}
				
importRow Implementation

The executeBatch method calls executeBatch on both the insert and update prepared statements and consolidates the arrays of integers returned by the two calls. It uses the list of statement types to position each element of the two arrays into one big array of integers. If any of the integers represents an error condition, a BatchUpdateException will be thrown.

...

public class UpdateEngine implements ImportEngine 
{
  ...
  public void executeBatch() throws BatchUpdateException
  {
    int[] rowsUpdated = new int[ stmtTypes.size() ];
    for ( int i = 0; i < rowsUpdated.length; i++ )
    {
      rowsUpdated[i] = 0;
    }
    try
    {
      BatchUpdateException ex = null;
      int[] insertRows = null;
      try
      {
        insertRows = insertStmt.executeBatch();
      }
      catch ( BatchUpdateException bue )
      {
        insertRows = bue.getUpdateCounts();
      }
      catch ( SQLException sqle )
      {
        insertRows = new int[0];
      }
      int insertCounter = 0;
      for ( int i = 0; i < rowsUpdated.length; i++ )
      {
        if ( "insert".equals( stmtTypes.get(i) ) && insertCounter < insertRows.length )
        {
          rowsUpdated[i] = insertRows[insertCounter];
          insertCounter++;
        }
      }
      
      int[] updateRows = null;
      try
      {
        if ( updateStmt != null )
          updateRows = updateStmt.executeBatch();
        else
          updateRows = new int[0];
      }
      catch ( BatchUpdateException bue )
      {
        updateRows = bue.getUpdateCounts();      
      }
      catch ( SQLException sqle )
      {
        updateRows = new int[0];
      }

      int updateCounter = 0;
      for ( int i = 0; i < rowsUpdated.length; i++ )
      {
        if ( "update".equals( stmtTypes.get(i) )  && updateCounter < updateRows.length )
        {
          rowsUpdated[i] = updateRows[updateCounter];
          updateCounter++;
        }
      }
      
      // Validate that all stmts updated only one row
      for ( int i = 0; i < rowsUpdated.length; i++ )
      {
        if ( rowsUpdated[i] != 1 && rowsUpdated[i] != Statement.SUCCESS_NO_INFO )
        {
          throw new BatchUpdateException(rowsUpdated); 
        }
      }
    }
    finally
    {
      stmtTypes.clear();
    }
  }
  ...
}
				
executeBatch Implementation

This ends the tutorial for creating the custom ImportEngine. The full source code of the UpdateEngine can be found under the package samples.importengine. What follows are instructions on how to use the custom ImportEngine during the import.

If you have read through the first tutorial, you may wish to skip to the Running the Import section. The other sections are the same as in the first tutorial.

Import Config XML

Now that the database is set up, you can examine the import XML config file that will be used (in the samples directory under the filename 'tutorial6/import.xml'). The file begins with the standard XML document declaration followed by the '<import>' tag. This tag indicates that there is an import to be processed. There are seven attributes that can be specified on the '<import>' tag: the 'log' attribute, the 'bad' attribute, the 'commitCount' attribute, the 'batchCount' attribute, the 'preSQLFile' attribute, the 'postSQLFile' attribute and the 'trimValues' attribute. The 'log' attribute specifies a filename into which JDBCImporter writes all audit, error and warning messages that occur during the import. The 'bad' attribute specifies a filename into which JDBCImporter writes data that was not properly imported into the database. The 'commitCount' attribute specifies how many rows to import before calling commit on the JDBC Connection. The 'batchCount' attribute specifies how many rows to import before calling executeBatch on the import engine (when the JDBC driver supports batch mode). By default, the 'commitCount' and 'batchCount' attributes are set to 1, auto commit is turned on and batch mode is not used. The 'preSQLFile' and 'postSQLFile' attributes specify filenames that contain SQL statements to be executed before and after the import, respectively. The 'trimValues' attribute specifies whether string values read from the Delimiter Parser are trimmed (i.e. leading and trailing whitespace is removed). By default, it is set to false.
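
As an illustration only (the attribute values and the SQL filenames below are arbitrary, not the ones used later in this tutorial), an '<import>' tag that sets all seven attributes could look like this:

 <import log="import.log" bad="import.bad" commitCount="10" batchCount="10" 
         preSQLFile="pre.sql" postSQLFile="post.sql" trimValues="true"> 
   ... 
 </import> 
Sample '<import>' Attributes (illustrative)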

There are two parts inside the '<import>' tag that define how and where the data is imported: the connection definition and the entity definitions.

Connection Definition

The connection definition begins with the '<connection>' tag and contains the information needed to connect to the database. In this tutorial, you will be using the JDBC DriverManager to initialize a connection to the database. To indicate this, the 'type' attribute's value, inside the '<connection>' tag, is 'jdbc'. The specific connection information is found inside the '<connection>' tag as '<property>' tags. A '<property>' tag has two attributes: 'name' specifies the name of the property and 'value' specifies the string value of the property. For the JDBC DriverManager, you will need to specify the following information: the driver class name (with the property name 'driver'), the connection URL (with the property name 'url'), the username (with the property name 'username') and the password (with the property name 'password'). The following is an example of the connection definition:

 <connection type="jdbc"> 
    <property name="driver" value="oracle.jdbc.driver.OracleDriver"/> 
    <property name="url" value="jdbc:oracle:thin:@localhost:1521:orcl"/> 
    <property name="username" value="scott"/> 
    <property name="password" value="tiger"/> 
 </connection> 
Sample XML for Connection Definition

Entity Definition

Since you will be importing data into one table, there will be only one entity definition. In general, you will need an entity definition for each table into which you will be importing data. Remember to specify the entity definitions in the order that the import should occur. For example, if table 'ingredient' depends on table 'recipe' (i.e. has a foreign key to it), the entity definition of table 'recipe' should be placed before the entity definition of table 'ingredient'. Every entity definition begins with the '<entity>' tag.
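
For illustration only (the source filenames here are hypothetical), the two entity definitions would be ordered like this:

  <entity table="recipe" source="recipe.csv"> ... </entity> 
  <entity table="ingredient" source="ingredient.csv"> ... </entity> 
Sample Ordering of Entity Definitions (illustrative)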

The 'table' attribute must contain the name of the table. Optionally, you can further specify the table by providing values for the 'schema' and the 'catalog' attributes.

To specify a custom import engine to process the entity, you may add the 'engine' attribute, whose value is the classname of the import engine. In this tutorial, you will be using a custom import engine defined at the import level.
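
For example, if you wanted to attach the engine to this entity directly instead (not what this tutorial does), the attribute would be set on the '<entity>' tag like this:

  <entity table="employee" source="employee.csv" 
          engine="samples.importengine.UpdateEngine"> 
    ... 
  </entity> 
Sample '<entity>' Tag with the 'engine' Attribute (illustrative)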

The 'source' attribute must contain the data file location. From looking at the sample data (found under 'samples/tutorial5/employee.csv'), you will see that there are 8 columns separated by the ',' character.

There are three parts inside the '<entity>' tag : the delimiter parser definition, row translator definition, and the list of columns found in the data file.

Delimiter Parser

The delimiter parser definition begins with the '<delimiter>' tag and contains the information needed to parse the input file into a set of rows that will be imported into the table. In this tutorial, you will be using the CSV Delimiter Parser. To indicate this, the 'type' attribute's value, inside the '<delimiter>' tag, is 'csv'. The specific Delimiter Parser information is found inside the '<delimiter>' tag. For the CSV Delimiter Parser, you will need to specify the following information (as '<property>' tags): the string that delimits a column (in the property named 'columnDelimiter'), the string that encloses a column (optional, in the property named 'enclosedDelimiter') and whether the enclosing string is optional (in the property named 'enclosedOptional', which must have a value of 'true' or 'false'). Since the data file has only a column delimiter (',' is the string separating the columns), the Delimiter Parser definition will look like this:

  <delimiter type="csv"> 
    <property name="columnDelimiter" value=","/> 
  </delimiter> 
Sample XML for CSV Delimiter Parser
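
For comparison, if the column values had also been enclosed in double quotes, the definition might instead look like this (hypothetical; the tutorial's data file does not use an enclosing string, and 'enclosedOptional' is set to 'true' so unquoted values would still be accepted):

  <delimiter type="csv"> 
    <property name="columnDelimiter" value=","/> 
    <property name="enclosedDelimiter" value="&quot;"/> 
    <property name="enclosedOptional" value="true"/> 
  </delimiter> 
Sample XML for CSV Delimiter Parser with an Enclosing String (illustrative)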

Row Translator

The row translator definition is optional and begins with the '<translator>' tag. It contains the information needed to translate each row's values and may add or remove column values, or skip the whole row. In this tutorial, you will not be using a row translator, so the '<translator>' tag does not appear as a child of the '<entity>' tag.

List of Columns

The final portion of the entity definition is the list of columns that are to be imported from the input file into the database. The columns should be listed in the same order as they appear in the input file. Each column is defined with a '<column>' tag. The name of the column must appear in the 'name' attribute of the '<column>' tag. Optionally, the java.sql.Types name may be specified in the 'SQLType' attribute of the '<column>' tag. You will be letting the JDBC Importer figure out most of the column types (except for dates) in the database, so the 'SQLType' attribute is omitted except for the 'startdate' column. Here is an example of how the list of columns is defined in the import definition:

  <column name="id"></column>
  <column name="firstname"></column>
  <column name="lastname"></column>
  <column name="jobdescription"></column>
  <column name="manager"></column>
  <column name="startdate" SQLType="DATE"></column>;
Sample XML for List of Columns

Running the Import

By now, the import definition should look like this (with your appropriate connection information):

<import log="import.log bad="import.bad"> 
  <connection type="jdbc"> 
     <property name="driver" value="oracle.jdbc.driver.OracleDriver"/> 
     <property name="url" value="jdbc:oracle:thin:@localhost:1521:orcl"/> 
     <property name="username" value="scott"/> 
     <property name="password" value="tiger"/> 
  </connection> 
  <entity table="employee" source="employee.csv">
    <delimiter type="csv"> 
      <property name="columnDelimiter" value=","/> 
    </delimiter> 
    <column name="id"></column>
    <column name="firstname"></column>
    <column name="lastname"></column>
    <column name="jobdescription"></column>
    <column name="managerid"></column>
    <column name="startdate" SQLType="DATE"></column>
    <column name="salary"></column>
    <column name="department"></column>
  </entity> 
</import> 
Sample XML for Tutorial 6

Since there is a custom import engine, you will have to include it in the classpath before you run the import (the jar file 'jdbcimporter-samples.jar' under the 'lib' directory contains the custom import engine). You must also set the system property 'jdbcimporter.engine' to the class name of the custom import engine. You can run the import by issuing the following command (assuming that the import definition is in the current directory and is called 'import.xml'):

java -Djdbcimporter.engine=samples.importengine.UpdateEngine net.sourceforge.jdbcimporter.Importer import.xml

If all goes well then the two log files should be created. In the normal log file there should be an informational message indicating that all rows were imported. In the bad log file there should be a heading for the import table.

Generating an Error

Because all rows were imported successfully, there were no error messages. To generate an error message, change the employee.csv file to have an invalid record. For example, if you change line 48 so that the second column is 'Butterfield' instead of 'Preston', the import will fail for this line ('Butterfield' has 11 characters, which exceeds the varchar(10) limit on the firstname column). After running the import, the normal log file should contain a message indicating that 74 out of 75 rows were imported and a line number/stack trace for the invalid row. The invalid line 48 should be written into the bad log file.