3/02/2020

Sqoop - Incremental Imports

In my last two blog posts I walked through how to use Sqoop to perform full imports.  Nightly full imports with overwrite have their place for small tables like dimension tables.  However, in real-world scenarios you're also going to want a way to import only the delta values since the last time an import was run.  Sqoop offers two ways to perform incremental imports: append and lastmodified.

Both incremental imports can be run manually or created as a job using the "sqoop job" command.  When running incremental imports manually from the command line, the "--last-value" arg is used to specify the reference value for the check-column.  Alternatively, sqoop jobs track the check-column's last value in the job itself, and that value is used on subsequent runs as the where predicate in the SQL statement, i.e. SELECT columns FROM table WHERE check-column > (last-max-check-column-value).

When using sqoop jobs, the following command args are helpful:

  • sqoop job --create
  • sqoop job --list
  • sqoop job --show [JobName]
  • sqoop job --exec [JobName]
  • sqoop job --delete [JobName]

Incremental Import - append


The "--incremental append" arg can be passed to the sqoop import command to run append only incremental imports.  At it's most simple this type of sqoop incremental import is meant to reference an ever increasing row id (like an Oracle sequence or a Microsoft SQL Server identity column).  Here's an example sqoop import command using incremental append that can be run manually:

sqoop import --connect 'jdbc:oracle:thin:@//[Hostname:Port]/[ServiceName]' --username '[Login]' -P -m 1 --table [TableName] --target-dir [HDFS Location for Hive External Table] --incremental append --check-column [col] --last-value [val]

The output includes some important pieces of information detailing what sqoop used for the query, i.e. SELECT * FROM table WHERE ID > [Lower bound value].  It also includes what the max ID was at the time of execution (the Upper bound value), and even what value to use for a subsequent run (--last-value 77) to continue appending where you left off:
    ...
    Lower bound value: 46
    Upper bound value: 77
    ...
    Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
     --incremental append
      --check-column ID
      --last-value 77
    (Consider saving this with 'sqoop job --create')

The very last line suggests creating a sqoop job instead of resorting to manual entry of the --last-value, so let's make one.  The command is very similar; however, instead of sqoop import, we use sqoop job:

sqoop job --create [JobName] -- import --connect jdbc:oracle:thin:@//[Hostname:Port]/[ServiceName] --username [Login] -P -m 1 --table [TableName] --target-dir [HDFS Location for Hive External Table] --incremental append --check-column [col]

During the creation of this job you'll be prompted for the password to the RDBMS.

This command creates the job but does not run it.  To execute the new job use:
sqoop job --exec [JobName]

During the execution of this job you'll be prompted for the password to the RDBMS.

We can now inspect the sqoop job to learn a bit more about how things are wired up:

sqoop job --show [JobName]

There's a bunch of useful information in here like the following:

  • hdfs.target.dir - Where sqoop is putting the data in HDFS
  • mapreduce.num.mappers - The number of mappers (the -m arg)
  • hive.import - Whether this is a Hive import or an HDFS import
  • hdfs.delete-target.dir - True / False (for HDFS overwrites)
  • incremental.last.value - The upper bound from the previous run
  • incremental.col - The check-column
  • hive.overwrite.table - True / False (for Hive overwrites)
  • incremental.mode - AppendRows or DateLastModified
  • db.table - The source table name
  • db.connect.string - The RDBMS connection string

A quick way to get the previous upper bound is to pipe the output to grep:
sqoop job --show [JobName] | grep incremental.last.value

It is important to note that I've been importing data via sqoop to HDFS, not Hive.  We can, however, modify the command slightly to achieve append-only incremental imports to Hive managed tables:

sqoop import --connect 'jdbc:oracle:thin:@//[Hostname:Port]/[ServiceName]' --username '[Login]' -P -m 1 --table [TableName] --incremental append --check-column [col] --hive-import --hive-database [DBName] --hive-table [TableName] --last-value [val]

...or create a job to manage the last-value for us:

sqoop job --create [JobName] -- import --connect 'jdbc:oracle:thin:@//[Hostname:Port]/[ServiceName]' --username '[Login]' -P -m 1 --table [TableName] --incremental append --check-column [col] --hive-import --hive-database [DBName] --hive-table [TableName]

Incremental Import - lastmodified


The "--incremental lastmodified" arg can be passed to the sqoop import command to run lastmodified incremental imports.  What this means is that we're going to use some sort of datetime column for our comparison with the last sqoop execution and pull in any rows that have a more recent timestamp than the last execution.  This allows us to incorporate updates vs. append which only accommodated inserts from the source.

The sqoop import command for lastmodified is almost identical to the command used for append:

sqoop import --connect 'jdbc:oracle:thin:@//[Hostname:Port]/[ServiceName]' --username '[Login]' -P -m 1 --table [TableName] --target-dir [HDFS Location for Hive External Table] --incremental lastmodified --check-column [col] --last-value '[TIMESTAMP]'

The two modifications I made were changing append to lastmodified and pointing my --check-column at a "TIMESTAMP(6)" column in my Oracle table.  Sqoop expects the last-value to be in the format 'YYYY-MM-DD HH24:MI:SS.FF'.
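
For example, a last-value passed on the command line might look something like this (the timestamp itself is just a made-up illustration):

--last-value '2020-03-01 00:00:00.000000'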

Again, the output includes some important pieces of information detailing what sqoop used for the query, including the lower bound and upper bound values (only the upper bound is listed on the first execution).  The query run by sqoop ends up looking like SELECT * FROM table WHERE [col] > [Lower bound value].  The Upper bound value gets set to the current time at the time of execution.
    ...
    Incremental import based on column [col]
    Lower bound value: TO_TIMESTAMP('[TIMESTAMP]')
    Upper bound value: TO_TIMESTAMP('[TIMESTAMP]')
    ...
    Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
    --incremental lastmodified
    --check-column [col]
    --last-value [YYYY-MM-DD HH24:MI:SS.FF]
    (Consider saving this with 'sqoop job --create')

You may have also noticed the following:

    Time zone has been set to GMT

In addition, you may have noticed that the last-value doesn't exactly match your local time.  What is actually listed as the last-value is the time of execution in GMT.  In order to address this we need to provide the sqoop import command with a Hadoop property for the Oracle session time zone (-D oracle.sessionTimeZone):

sqoop import -D oracle.sessionTimeZone=America/Denver --connect 'jdbc:oracle:thin:@//[Hostname:Port]/[ServiceName]' --username '[Login]' -P -m 1 --table [TableName] --target-dir [HDFS Location for Hive External Table] --incremental lastmodified --check-column [col] --last-value '[TIMESTAMP]'

In order to turn this into a sqoop job that auto manages the last-value, the command is very similar to what we did for the append job:

sqoop job -D oracle.sessionTimeZone=America/Denver --create [JobName] -- import --connect 'jdbc:oracle:thin:@//[Hostname:Port]/[ServiceName]' --username '[Login]' -P -m 1 --table [TableName] --target-dir [HDFS Location for Hive External Table] --incremental lastmodified --check-column [col]

We can then execute the job with the following command:
sqoop job --exec [JobName]

From here you can try updating a row in your RDBMS so that its check-column value is greater than the last Upper bound value, then re-running the job to watch the update flow through.
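
As a quick illustration, an Oracle update along these lines would do the trick (the table, key column, and ID value are purely hypothetical placeholders in the same bracketed style used above):

-- hypothetical placeholders: [col] is the TIMESTAMP check-column, [KeyCol] is a unique key
UPDATE [TableName] SET [col] = SYSTIMESTAMP WHERE [KeyCol] = [SomeID];
COMMIT;

If you re-run the job after an update like this and make no changes to anything on the HDFS side, you'll receive the following on the second run: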


  • ERROR tool.ImportTool: Import failed: --merge-key or --append is required when using --incremental lastmodified and the output directory exists.


This error tells us that an incremental import into an HDFS directory that already exists requires an arg to either append the data (--append) or merge the data (--merge-key).  You may also remember that we used --delete-target-dir in a previous blog to run daily imports with overwrite.  Unfortunately, --delete-target-dir cannot be used with incremental imports.  So, from here you can do a few things: provide a --merge-key (a unique column from the source database used to merge the data), specify --append (which will result in duplicate rows), or put the sqoop job in some sort of workflow that removes the target directory prior to executing the sqoop job.  If you're lucky enough to have a unique primary key and a valid lastmodified timestamp column, the easiest solution is to simply add the --merge-key:

sqoop import -D oracle.sessionTimeZone=America/Denver --connect 'jdbc:oracle:thin:@//[Hostname:Port]/[ServiceName]' --username '[Login]' -P -m 1 --table [TableName] --target-dir [HDFS Location for Hive External Table] --incremental lastmodified --check-column [col] --last-value '[TIMESTAMP]' --merge-key [KeyCol]

...or create a job:

sqoop job -D oracle.sessionTimeZone=America/Denver --create [JobName] -- import --connect 'jdbc:oracle:thin:@//[Hostname:Port]/[ServiceName]' --username '[Login]' -P -m 1 --table [TableName] --target-dir [HDFS Location for Hive External Table] --incremental lastmodified --check-column [col] --merge-key [KeyCol]

You may now be wondering if you can just take this all directly to a Hive managed table.  If you try, you'll receive the following:
--incremental lastmodified option for hive imports is not supported. Please remove the parameter --incremental lastmodified.

Getting everything to HDFS (or a Hive external table) is pretty good though.  From here a simple Hive command can load the data (with overwrite or with merge) into a Hive managed table, as in the sketch below.
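
A minimal sketch of the overwrite flavor might look something like this (the database and table names are hypothetical placeholders, and it assumes the external table sits on top of the sqoop target directory):

-- hypothetical names; [ExternalTable] points at the sqoop --target-dir
INSERT OVERWRITE TABLE [DBName].[ManagedTable]
SELECT * FROM [DBName].[ExternalTable];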

With sqoop incremental imports you may, in some scenarios, end up with multiple/duplicate rows in HDFS or Hive.  You will have one row for the original import and a second for the newer updated data.  There are several methods to address the merge, including writing Hive SQL queries with a MAX function on the datetime column, using a sqoop merge command to merge HDFS data, or using a combination of views and staging tables, to name a few.  A sketch of the first approach follows below.
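
For example, a query along these lines (all names are hypothetical placeholders) keeps only the most recent version of each row by joining back to the per-key MAX of the datetime column:

-- hypothetical names; [KeyCol] is the unique key and [col] is the lastmodified timestamp
SELECT t.*
FROM [DBName].[ExternalTable] t
JOIN (
  SELECT [KeyCol], MAX([col]) AS max_ts
  FROM [DBName].[ExternalTable]
  GROUP BY [KeyCol]
) latest
  ON t.[KeyCol] = latest.[KeyCol]
 AND t.[col] = latest.max_ts;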

I think my favorite way to accomplish the merge is to have a Hive managed table and a Hive external table.  The managed table contains the final merged data.  The external table is where the lastmodified data lands in Hadoop.  Once the updates are loaded into the Hive external table, a Hive MERGE command is executed to update the data in the Hive managed table, and then the external table is essentially truncated by removing the files in the related HDFS directory.  A rough sketch of that MERGE is below, but suffice it to say this is all a topic for another discussion.
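
As a rough, hypothetical sketch only (Hive's MERGE requires the managed table to be transactional/ACID, and all table and column names here are placeholders):

-- hypothetical names; assumes [ManagedTable] is a transactional (ACID) Hive table
MERGE INTO [DBName].[ManagedTable] AS target
USING [DBName].[ExternalTable] AS updates
ON target.[KeyCol] = updates.[KeyCol]
WHEN MATCHED THEN UPDATE SET [col] = updates.[col]   -- plus any other changed columns
WHEN NOT MATCHED THEN INSERT VALUES (updates.[KeyCol], updates.[col]);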
