Importing Parquet files into the Database

This guide explains how to import externally generated Parquet files into the Deephaven historical database, making them available as standard tables.

You can use the ParquetTools library to read Parquet files. For more details, see Importing Data from Parquet.

Both approaches described below assume that:

  • A Deephaven schema exists for the table.
  • The files are organized into namespace and partition directories under /db/Systems.

Example directory structure

/db/Systems/<namespace>/Partitions/<storage partition ID>/<date>/<table name>/table.parquet

There are two main approaches:

Approach 1: Manual Copy and Index

  1. Copy or link the appropriate Parquet file manually into place.
  2. Run the metadata indexer to update the database's index of available partitions.

Approach 2: Merge Process

  1. Use the merge process, which automatically generates grouping data and updates the metadata index.

Create the Schema

Skip this step if you already have a schema for the table you want to import data into.

While you can create a schema manually, the easiest way to create one from an existing Parquet file in the Legacy engine is to run the following Groovy script in a Deephaven console. The script reads the Parquet file, generates an appropriate Deephaven schema, and adds it to the centralized Deephaven configuration.

NAMESPACE    = "MyNamespace"
TABLE_NAME   = "MyTable"
FILE_PATH    = "/path/to/original/file.parquet"

/**
 * Storage type for the new table:
 *   2  – STORAGETYPE_NESTEDPARTITIONEDONDISK
 *   8  – STORAGETYPE_FRAGMENTEDONDISK
 *   16 – STORAGETYPE_HIERARCHICALONDISK
 */
STORAGE_TYPE = 2

/** If true, a new schema will be added; otherwise, the existing schema will be updated. */
ADD_SCHEMA   = true

import com.illumon.iris.db.schema.xml.SchemaXmlParser
import com.illumon.iris.db.schema.xml.SchemaXmlUtil
import com.illumon.iris.db.v2.locations.parquet.ParquetTools
import com.illumon.dataobjects.ColumnDefinition
import com.illumon.iris.db.tables.TableDefinition
import com.illumon.iris.db.schema.xml.SchemaXmlFactory
import com.illumon.iris.db.schema.NamespaceSet
import com.illumon.iris.db.schema.SchemaServiceFactory
import com.illumon.iris.utils.SchemaCreatorUtils

schemaService = SchemaServiceFactory.getDefault()

// ----------------------------------------------------------------
// 1. Read the source Parquet table
// ----------------------------------------------------------------
sourceTable = ParquetTools.readTable(FILE_PATH)
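// If the source file already contains a Date column, drop it; the
// partitioning Date column is added to the table definition below.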
if (sourceTable.hasColumns("Date")) {
    sourceTable = sourceTable.dropColumns("Date")
}

// ----------------------------------------------------------------
// 2. Build a new TableDefinition with an extra partition column
// ----------------------------------------------------------------
def cdef       = sourceTable.definition.columns
def newColumns = new ColumnDefinition[cdef.size() + 1]

// shift original columns right by one
for (int i = 0; i < cdef.size(); i++) {
    newColumns[i + 1] = cdef[i]
}
// insert Date string partition column at position 0
newColumns[0] = new ColumnDefinition<>("Date", String.class, 4).withPartitioning()

def newTDef = new TableDefinition(newColumns)
newTDef.setNamespace(NAMESPACE)
newTDef.setName(TABLE_NAME)
newTDef.setStorageType(STORAGE_TYPE)

// ----------------------------------------------------------------
// 3. Create schema XML for the new table
// ----------------------------------------------------------------
schema    = SchemaXmlFactory.getMutableSchema(newTDef, NamespaceSet.SYSTEM)
schemaXml = SchemaXmlFactory.getXmlSchema(newTDef, NamespaceSet.SYSTEM)

println "New schema:"
println SchemaXmlUtil.makeSchemaXmlOutputter().outputString(schemaXml.element)

// ----------------------------------------------------------------
// 4. Ensure namespace exists
// ----------------------------------------------------------------
if (!schemaService.containsNamespace(NAMESPACE)) {
    println "Adding namespace $NAMESPACE."
    schemaService.createNamespace(NamespaceSet.SYSTEM, NAMESPACE)
} else {
    println "Namespace $NAMESPACE exists."
}

// ----------------------------------------------------------------
// 5. Add or update the schema
// ----------------------------------------------------------------
if (ADD_SCHEMA) {
    println "Adding schema for $TABLE_NAME."
    schemaService.addSchema(schema)
} else {
    println "Updating schema for $TABLE_NAME."
    schemaService.updateSchema(schema)
}

Warning

The script above can also modify an existing schema: when ADD_SCHEMA is false, it calls updateSchema(schema) instead of adding a new schema. If you use this option, be aware that changing the type of an existing column may make data that was previously written to disk unreadable.
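Before updating a schema in place, it can help to compare the incoming file's columns against the deployed schema first. Below is a minimal sketch that prints the file's column names and types, reusing the same ParquetTools call as the script above and assuming ColumnDefinition exposes name and dataType accessors:

import com.illumon.iris.db.v2.locations.parquet.ParquetTools

incoming = ParquetTools.readTable("/path/to/original/file.parquet")

// Print each column's name and data type so they can be checked against the existing schema
incoming.definition.columns.each { col ->
    println "${col.name} : ${col.dataType}"
}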

Create the Database directories

Historical Deephaven tables live on disk (or an external store such as S3) at /db/Systems/<namespace>/Partitions/<storage partition ID>/<date>/<table name>/table.parquet. More details about the storage layer can be found here.
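For example, with namespace MyNamespace, internal partition 0, date 2023-04-28, and table MyTable (the values used in the commands below), the file would be located at:

/db/Systems/MyNamespace/Partitions/0/2023-04-28/MyTable/table.parquet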

When adding a new table, you should create two sibling directories under your namespace:

  • Partitions/ – holds each internal partition (0, 1, 2, …).
  • WritablePartitions/ – contains symlinks to the internal partitions that the merge process will write to.

The example below shows how to create these directories for the namespace MyNamespace and a single internal partition 0.

NAMESPACE="MyNamespace"
INTERNAL_PARTITION="0"
ROOT="/db/Systems"
PARTITION_PATH="${ROOT}/${NAMESPACE}/Partitions/${INTERNAL_PARTITION}"
WRITABLE_PATH="${ROOT}/${NAMESPACE}/WritablePartitions"

# 1. Create the namespace-level directories
sudo -u dbmerge mkdir -vp "${ROOT}/${NAMESPACE}"/{Partitions,WritablePartitions}

# 2. Create the storage partition (may be a symlink to S3, NFS, etc.)
sudo -u dbmerge mkdir -vp "${PARTITION_PATH}"

# 3. Create symlinks for WritablePartitions
cd "${WRITABLE_PATH}"
sudo -u dbmerge ln -vs ../Partitions/"${INTERNAL_PARTITION}" "./${INTERNAL_PARTITION}"

For each date for which you have Parquet data, create two final directories corresponding to the date and the table name:

sudo -u dbmerge mkdir -vp "$PARTITION_PATH"/2023-04-28/MyTable

Then you can move/copy/link the original data into the new directory with the name table.parquet:

sudo -u dbmerge cp /path/to/original/file.parquet "$PARTITION_PATH"/2023-04-28/MyTable/table.parquet

Next, run the metadata indexer so that queries can discover the new location:

sudo /usr/illumon/latest/bin/iris metadata_indexer System MyNamespace MyTable

Now, you should be able to access your new table with:

myTable = db.historicalTable("MyNamespace", "MyTable")
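To spot-check the import, you can filter the table on its Date partitioning column for a date you just copied into place (2023-04-28 in the example above):

// Restrict the historical table to the newly imported date partition
imported = myTable.where("Date=`2023-04-28`")
println imported.size()   // should match the row count of the source Parquet file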

Approach 2: Use the merge job

Merge jobs are typically used to move data automatically from the intraday database to the historical database at the end of the day, but they can also be run manually to import data into the historical database from external sources such as Parquet files.

To use the merge process, the table's schema must define a keyFormula, because the merge job uses this formula to determine which internal partition each row of data belongs to.

Once the schema is set properly, you can read the Parquet file into memory and merge it into the historical table using the script mentioned here.
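As a starting point, the external file can be read into an in-memory table with the same ParquetTools call used in the schema script; the merge script mentioned above then writes the rows to historical partitions according to the keyFormula. A minimal sketch:

import com.illumon.iris.db.v2.locations.parquet.ParquetTools

// Load the external Parquet file into memory; the merge script takes it from here
sourceData = ParquetTools.readTable("/path/to/original/file.parquet")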