Create a replay query

This guide shows you how to create a Replay Query, a Persistent Query (PQ) that replays historical table data as ticking intraday data. Among other things, this is useful for:

  • Testing changes to a script against known data.
  • Troubleshooting issues that a query had on a previous day’s data.
  • Stress testing a query by replaying a large data set at a faster speed.
  • Simulating data ticking into a table.

The value of the Deephaven system clock, currentClock, is set to a simulated clock created from the query's replay settings. Calls to today, pastBusinessDate, etc. return values based on the configured replay time, date, and speed. The timeTable and WindowCheck functions also use the simulated clock.

Unless configured otherwise, Replay Queries redirect the db.live_table command to load the historical version of the table (db.historical_table) instead. They also apply a special where clause on the table's timestamp column that releases rows based on the configured clock. This allows you to take an existing query that uses intraday data and execute it on historical data instead.
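
For example, a script like the following sketch (using the Market.Trade table that appears throughout this guide) runs unmodified under a Replay Query: dh_today() resolves to the configured replay date, db.live_table is redirected to the historical table, and rows are released according to the simulated clock.

from deephaven import time

# Under a Replay Query, dh_today() returns the configured replay date and
# db.live_table transparently loads the historical partition, with rows
# released over time based on the Timestamp column and the simulated clock.
trades = db.live_table("Market", "Trade").where(f"Date=`{time.dh_today()}`")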

To create a Replay Query, click the +New button in the Query Monitor and select the type Live Query Replay (ReplayScript).

Live Query Replay (ReplayScript) selected in the Query Monitor.

Replay settings

Configure the query in the Replay Settings tab.

The Replay Settings tab.

The parameter options are detailed below.

Replay time

The Replay Time configuration provides two options that control the time of day at which the replay begins.

  • Use query start time - The replay clock starts at the current time of day when the query starts.
  • Fixed start time - The replay clock starts at this value. The format is HH:MM:SS.

This is useful if you need to replay data that was initially streamed in a different time zone than the one you are testing in.

Replay date

The Replay date option determines what date is returned by date-related methods in your query, such as:

  • today()
  • minusBusinessDays(today(), 1)

If you do not specify a replay date, the system uses the last business day from the default calendar.

You can set the replay date to simulate running your query as if it were a different day. This is useful for testing scripts against historical data or reproducing past scenarios.
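
As a sketch, assuming the replay date is set to 2023-05-01 (a hypothetical value):

from deephaven import time

# With the replay date set to 2023-05-01, date functions resolve against it
print(time.dh_today())  # prints "2023-05-01" rather than the wall-clock date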

Replay speed

The Replay Speed option controls the rate at which data is replayed. This is useful if you want to replay data at a higher speed to simulate a greater data load or process a day more quickly. A replay speed of 0.5 replays data at half speed, while a replay speed of 2 replays data at double speed; at a speed of 2, for example, a 6.5-hour trading day completes in 3.25 hours.

Sorted replay

The Sorted Replay checkbox guarantees that the data is replayed in timestamp order - that is, in the order of the table's configured timestamp column.

Caution

Enabling Sorted Replay on input data that is not sorted on a timestamp column leads to undefined results.

Additional JVM arguments

JVM arguments provide advanced configuration options for Replay Queries. These arguments can be added in the Extra JVM Arguments field when creating or editing a Replay Query.

Timestamp column

A Replay Query assumes that the replay timestamp column in all tables is Timestamp unless configured otherwise. If a table has only one column of type Instant, the Replay Query will automatically use it as the replay timestamp.

If your table has a timestamp column that is not named Timestamp, add the following argument to the Extra JVM Arguments field:

-DReplayDatabase.TimestampColumn.<Namespace>.<Table Name>=<Column Name>

For example, to use a QuoteTime column for a hypothetical Market.Quote table:

-DReplayDatabase.TimestampColumn.Market.Quote=QuoteTime

You can also set a default timestamp column for all tables in a namespace:

-DReplayDatabase.TimestampColumn.<Namespace>=<Column Name>

Or set a global default for all tables:

-DReplayDatabase.TimestampColumn=<Column Name>
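
These properties can be combined. As a hypothetical example, assuming the most specific property takes precedence, the following makes EventTime the default while keeping Timestamp for the Market namespace:

-DReplayDatabase.TimestampColumn=EventTime
-DReplayDatabase.TimestampColumn.Market=Timestamp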

Replay intraday data

If you want to replay an intraday table instead of a historical table, add the following argument to the Extra JVM Arguments field:

-DReplayDatabase.UseIntraday=true

This is configurable on a per-namespace and per-table basis as well:

-DReplayDatabase.UseIntraday.<Namespace>=true
-DReplayDatabase.UseIntraday.<Namespace>.<TableName>=true

Replay buffer size

You can control how many rows are buffered during replay with:

-DReplayDatabase.BufferSize=<number of rows>

The default buffer size is 10,000 rows. Increasing this value can improve performance for high-throughput replays but will consume more memory.

Replay timeout

Set a timeout for replay operations:

-DReplayDatabase.TimeoutSeconds=<seconds>

The default timeout is 60 seconds. This is useful for preventing hung replays when working with very large datasets.

Debug logging

Enable detailed logging for troubleshooting replay issues:

-DReplayDatabase.Debug=true

This will output additional information about replay operations to the console logs, which can be helpful when diagnosing problems.

Script

Replay historical data

The simplest use case for a Replay Query is to replay a historical data set through an existing query script. If your script already uses the Deephaven built-in date methods (such as today() or now()), you do not need to do any additional work to select the appropriate date partition for replay. If not, you must update the script by hand so that it selects the correct replay date.

For example, if you have any hardcoded dates:

my_table = db.live_table("Market", "Trade").where("Date=`2020-09-22`")

You will need to change the .where() clause manually. It is generally good practice to store the date in a local variable and reuse it throughout your query, so it is easy to change:

current_date = "2020-09-22"
...
my_table = db.live_table("Market", "Trade").where(f"Date=`{current_date}`")

Save your query and your data will be replayed to the query script.

Simulate intraday data with replay

While it is often useful to simply replay historical data to a script, it can also be helpful to use a historical replay to simulate current intraday data. This example shows how to use the SystemTableLogger to re-log table data, transforming timestamps to match today's date and time. This lets you run unmodified queries against simulated data, at the cost of a little more complexity and configuration.

Say you want to simulate data from the following table:

<Table namespace="Market" name="Trade" storageType="NestedPartitionedOnDisk">
    <Partitions keyFormula="${autobalance_single}" />
    <Column name="Date" dataType="String" columnType="Partitioning" />
    <Column name="Timestamp" dataType="DateTime" />
    <Column name="symbol" dataType="String" />
    <Column name="price" dataType="Double" />
    <Column name="size" dataType="Integer" />

    <Listener logFormat="1" listenerClass="TradeListener" listenerPackage="io.deephaven.example.gen">
        <Column name="Timestamp" timePrecision="Nanos"/>
        <Column name="symbol" />
        <Column name="price" />
        <Column name="size" />
    </Listener>
</Table>

You could replay data directly back to the table above; however, it is best practice to separate simulated or replayed data from actual production data.

  1. Create a Schema for Simulated Data

    • Define a CopyTable schema with a new namespace. This ensures replayed data does not mix with your original data.
    • Example schema definition:
      <CopyTable namespace="MarketSim" sourceNamespace="Market" name="Trade" sourceName="Trade" />
      
    • Deploy the schema using dhconfig schemas. This will add a new system table to Deephaven under the namespace MarketSim.
  2. Create and Configure the Replay Query

    • Write a script that:
      • Pulls the table from the original namespace.
      • Logs the table and its updates back to a binary log file.
import io.deephaven.enterprise.database.SystemTableLogger
import io.deephaven.time.DateTimeUtils

// Store the replay date in a variable for clarity and reuse
replayDate = DateTimeUtils.today()
srcNamespace = "Market"
destNamespace = "MarketSim"
tableName = "Trade"

// Retrieve the table to replay
sourceTable = db.liveTable(srcNamespace, tableName)
    .where("Date=`${replayDate}`")
    .dropColumns("Date")

options = SystemTableLogger.newOptionsBuilder()
       .currentDateColumnPartition(true)
       .build()

// When logging incrementally, a Closeable is returned. You must retain this object to ensure liveness. Call `close()` to stop logging and release resources.
logHandle = SystemTableLogger.logTableIncremental(db, destNamespace, tableName, sourceTable, options)

// At this point as rows tick into "Market.Trade" they will be automatically logged back to "MarketSim.Trade"

The equivalent script in Python:

from deephaven_enterprise import system_table_logger as stl
from deephaven import time

# It is good practice to use a variable to store the value of the Date clauses for your tables

replay_date = time.dh_today()
src_namespace = "Market"
dest_namespace = "MarketSim"
table_name = "Trade"

# Here, we retrieve the table we are going to replay
source_table = (
    db.live_table(src_namespace, table_name)
    .where(f"Date=`{replay_date}`")
    .drop_columns("Date")
)

# When logging incrementally, a Closeable is returned. You must retain this object to ensure liveness
# Call `close()` to stop logging and release resources
log_handle = stl.log_table_incremental(
    namespace=dest_namespace,
    table_name=table_name,
    table=source_table,
    column_partition=None,  # None logs to the current date partition (as in the Groovy options above)
)

# At this point as rows tick into "Market.Trade" they are automatically logged back to "MarketSim.Trade"
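
When you are finished replaying, close the returned handle to stop logging and release its resources:

log_handle.close()  # logHandle.close() in the Groovy version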

Simulate intraday data with timestamp shifting

If your table contains any Instant columns, you will probably want to convert their values to the current date while preserving the original time of day. This example replays the table to the MarketSim namespace and also converts every Instant column it finds to the current date at the same time of day:

import io.deephaven.enterprise.database.SystemTableLogger
import io.deephaven.time.DateTimeUtils

import java.time.Instant
import java.time.LocalDate
import java.time.ZoneId

// It is good practice to use a variable to store the value of the Date clauses for your tables.
replayDate = DateTimeUtils.today()
replayDateParts = replayDate.split("-")

srcNamespace = "Market"
destNamespace = "MarketSim"
tableName = "Trade"

// Compute the millis of Epoch at midnight for the current day
dtMidnightToday = LocalDate.now().atStartOfDay(ZoneId.of("America/New_York")).toInstant().toEpochMilli()

// Compute the millis of Epoch at midnight of the date you are replaying
dtMidnightSource = LocalDate.of(Integer.parseInt(replayDateParts[0]),
        Integer.parseInt(replayDateParts[1]),
        Integer.parseInt(replayDateParts[2]))
        .atStartOfDay(ZoneId.of("America/New_York")).toInstant().toEpochMilli()

// Here we retrieve the table we are going to replay
sourceTable = db.liveTable(srcNamespace, tableName)
        .where("Date=`$REPLAY_DATE`")
        .dropColumns("Date")

// Create a closure that converts an Instant value from the source date to the current date, preserving the time of day
convertDateToday = { dt -> dt == null ? null : Instant.ofEpochMilli(dt.toEpochMilli() - dtMidnightSource + dtMidnightToday) }

// convert the Instant columns to today's date
convertDateColumns = { Table t ->
    final String[] cols = t.getDefinition().getColumns().stream().filter({c -> "Timestamp" != c.getName() && c.getDataType() == Instant.class })
            .map({ c -> "${c.getName()} = (Instant) convertDateToday.call(${c.getName()})".toString() }).toArray(String[]::new)
    return t.updateView(cols)
}

sourceTable = convertDateColumns(sourceTable)

opts = SystemTableLogger.newOptionsBuilder()
        .currentDateColumnPartition(true)
        .build()

// When logging incrementally, a Closeable is returned. You must retain this object to ensure liveness. Call `close()` to stop logging and release resources.
lh = SystemTableLogger.logTableIncremental(db, destNamespace, tableName, sourceTable, opts)

// At this point, as rows tick into "Market.Trade" they are automatically logged back to "MarketSim.Trade"

The equivalent script in Python:
import jpy

from deephaven_enterprise import system_table_logger as stl
from deephaven import dtypes
from deephaven.table import Table
from datetime import datetime, time

# Get a Python handle to java.time.Instant so we can construct Java Instants
Instant = jpy.get_type("java.time.Instant")

# It is good practice to use a variable to store the value of the Date clauses for your tables
replay_date = "2024-08-09"
replay_date_time = datetime.strptime(replay_date, "%Y-%m-%d")
# Get the current date
current_date = datetime.now().date()

src_namespace = "Market"
dest_namespace = "MarketSim"
table_name = "Trade"

# Compute the millis of Epoch at midnight for the current day
midnight_today = int(datetime.combine(current_date, time(0, 0)).timestamp() * 1000)

# Compute the millis of Epoch at midnight of the date you are replaying
midnight_source = int(datetime.combine(replay_date_time, time(0, 0)).timestamp() * 1000)

# Here we retrieve the table we are going to replay
source_table = (
    db.live_table(src_namespace, table_name)
    .where(f"Date=`{replay_date}`")
    .drop_columns("Date")
)

# Shift a java.time.Instant from the source date to today, preserving the time of day
def convert_date_today(dt):
    if dt is None:
        return None
    # Offset of this timestamp from midnight on the source date
    offset = dt.toEpochMilli() - midnight_source
    # Apply the same offset to midnight today
    return Instant.ofEpochMilli(midnight_today + offset)

# Convert all Instant columns (except the replay timestamp column) to today's date
def convert_date_columns(table: Table) -> Table:
    # Find every Instant column other than 'Timestamp'
    instant_cols = [
        col.name
        for col in table.columns
        if col.name != "Timestamp" and col.data_type == dtypes.Instant
    ]
    # Apply the conversion to each Instant column
    return table.update_view(
        [f"{col} = (Instant) convert_date_today({col})" for col in instant_cols]
    )

# Apply the conversion to our source table
source_table = convert_date_columns(source_table)

# When logging incrementally, a Closeable is returned. You must retain this object to ensure liveness
# Call `close()` to stop logging and release resources
log_handle = stl.log_table_incremental(
    namespace=dest_namespace,
    table_name=table_name,
    table=source_table,
    column_partition=None
)

# At this point as rows tick into Market.Trade, they are automatically logged back to MarketSim.Trade