Enterprise cheat sheet

This cheat sheet only deals with code concepts specific to Deephaven Enterprise. For the Deephaven Community cheat sheet, which includes the majority of the table API, see Ultimate Community Core Cheat Sheet.

Access data

You can query the Deephaven database for both static and live tables. The following code pulls from built-in namespaces, such as the LearnDeephaven namespace.

// Get a table containing the full catalog of namespaces and tables
allAvailable = db.catalogTable()

// Print all available namespaces
println db.namespaces()

// Historical tables of stock trades and EOD trades
staticSource1 = db.historicalTable("LearnDeephaven", "StockTrades")
staticSource2 = db.historicalTable("LearnDeephaven", "EODTrades")

// Access a real-time updating internal performance table
tickingSource = db.liveTable("DbInternal", "ProcessEventLog")

// Access a snapshot of a real-time updating table
// (the third argument `false` requests a static snapshot instead of a live view)
snapshotEx = db.liveTable("DbInternal", "ProcessEventLog", false)
# Get a table containing the full catalog of namespaces and tables
all_available = db.catalog_table()

# Print all available namespaces
print(db.namespaces())

# Historical tables of stock trades and EOD trades
static_source_1 = db.historical_table("LearnDeephaven", "StockTrades")
static_source_2 = db.historical_table("LearnDeephaven", "EODTrades")

# Access the real-time updating internal performance table
ticking_source = db.live_table("DbInternal", "ProcessEventLog")

# Access a snapshot of a real-time updating table
# (passing False as the third argument requests a static snapshot instead of a live view)
snapshot_ex = db.live_table("DbInternal", "ProcessEventLog", False)
// Print all available namespaces visible to this session
println db.namespaces()

// Print the tables in the LearnDeephaven namespace
println db.tableNames("LearnDeephaven")
# Print all available namespaces visible to this session
print(db.namespaces())
# Print the tables in the LearnDeephaven namespace
print(db.table_names("LearnDeephaven"))

# Describe the packages available in the main deephaven module
help("deephaven")

Filter on partitioning values

Filtering first by partitioning values (most often the set of Dates) is both a best practice and necessary for some downstream use cases.

// NOTE: `ZoneId` (java.time) is assumed to already be in scope in the Groovy session
tzNyc = ZoneId.of("America/New_York")
tzSyd = ZoneId.of("Australia/Sydney")

// Date is the most common partitioning value
todaysData1 = tickingSource.where("Date = today()")  // Filter for today's data
todaysData2 = tickingSource.where("Date = today(tzNyc)")  // Filter for today's data in New York's time zone
todaysData3 = tickingSource.where("Date = today(tzSyd)")  // Filter for today's data in Sydney's time zone

// This is a best practice, and necessary for some downstream use cases.
// `Date` is a String column (note the backtick string literals and startsWith below).
singleStringDate = staticSource1.where("Date = `2017-08-23`")  // HEAVILY USED!
lessThanDate = staticSource1.where("Date < `2017-08-23`")  // use >=, etc. as well
oneMonthString = staticSource1.where("Date.startsWith(`2017-08`)")
from deephaven.time import to_j_time_zone

# Convert time-zone names to Java time-zone objects usable inside query strings
tz_nyc = to_j_time_zone("America/New_York")
tz_syd = to_j_time_zone("Australia/Sydney")

# Date is the most common partitioning value
todays_data_1 = ticking_source.where("Date = today()")  # Filter for today's data
todays_data_2 = ticking_source.where(
    "Date = today(tz_nyc)"
)  # Filter for today's data in New York's time zone
todays_data_3 = ticking_source.where(
    "Date = today(tz_syd)"
)  # Filter for today's data in Sydney's time zone

# This is a best practice, and necessary for some downstream use cases.
# `Date` is a String column (note the backtick string literals and startsWith below).
single_string_date = static_source_1.where("Date = `2017-08-23`")  # HEAVILY USED!
less_than_date = static_source_1.where("Date < `2017-08-23`")  # use >=, etc. as well
one_month_string = static_source_1.where("Date.startsWith(`2017-08`)")

Log a table to a system table

You can log to a system table once:

import io.deephaven.enterprise.database.SystemTableLogger
import io.deephaven.enterprise.codec.DoubleArrayCodec
import io.deephaven.enterprise.codec.StringArrayCodec

// Ticking source: one new row every 2 seconds; every 10th row's array element is null
myTimeTable = timeTable("PT2s").update(
    "IntCol = (int) i",
    "Doubles = new double[] {i % 10 == 0 ? null : i*1.1}",
    "Strings = new String[] {i % 10 == 0 ? null : (`` + (i % 101))}"
)

// Options: partition by the current date, and register codecs that serialize
// the array-typed columns
opts = SystemTableLogger.newOptionsBuilder()
            .currentDateColumnPartition(true)
            .putColumnCodecs("Doubles", new DoubleArrayCodec())
            .putColumnCodecs("Strings", new StringArrayCodec())
            .build()

// Log `myTimeTable` to the system table `Example.Data` once
SystemTableLogger.logTable(db, "Example", "Data", myTimeTable, opts)
from deephaven_enterprise import system_table_logger as stl
from deephaven import time_table

# Ticking source: one new row every 2 seconds; every 10th row's array element is null
my_time_table = time_table("PT2s").update(
    [
        "IntCol = (int) i",
        "Doubles = new double[] {i % 10 == 0 ? null : i*1.1}",
        "Strings = new String[] {i % 10 == 0 ? null : (`` + (i % 101))}",
    ]
)

# Log `my_time_table` to the system table `Example.Data` once.
# The codecs tell the logger how to serialize the array-typed columns.
stl.log_table(
    namespace="Example",
    table_name="Data",
    table=my_time_table,
    columnPartition=None,  # Use current date as partition
    codecs={
        "Doubles": stl.double_array_codec(),
        "Strings": stl.string_array_codec(),
    },
)

Or you can log to a system table incrementally:

import io.deephaven.enterprise.database.SystemTableLogger
import io.deephaven.enterprise.codec.DoubleArrayCodec
import io.deephaven.enterprise.codec.StringArrayCodec

// Ticking source: one new row every 2 seconds; every 10th row's array element is null
myTimeTable = timeTable("PT2s").update(
    "IntCol = (int) i",
    "Doubles = new double[] {i % 10 == 0 ? null : i*1.1}",
    "Strings = new String[] {i % 10 == 0 ? null : (`` + (i % 101))}"
)

// Options: partition by the current date, and register codecs that serialize
// the array-typed columns
opts = SystemTableLogger.newOptionsBuilder()
            .currentDateColumnPartition(true)
            .putColumnCodecs("Doubles", new DoubleArrayCodec())
            .putColumnCodecs("Strings", new StringArrayCodec())
            .build()

// Log `myTimeTable` to system table `Example.Data` incrementally. When finished logging, call `lh.close()`
lh=SystemTableLogger.logTableIncremental(db, "Example", "Data", myTimeTable, opts)

// After some time passes...
lh.close()
from deephaven_enterprise import system_table_logger as stl
from deephaven import time_table

# Ticking source: one new row every 2 seconds; every 10th row's array element is null
my_time_table = time_table("PT2s").update(
    [
        "IntCol = (int) i",
        "Doubles = new double[] {i % 10 == 0 ? null : i*1.1}",
        "Strings = new String[] {i % 10 == 0 ? null : (`` + (i % 101))}",
    ]
)

# Log `my_time_table` to the system table `Example.Data` incrementally.
# When finished logging, call `lh.close()`.
lh = stl.log_table_incremental(
    namespace="Example",
    table_name="Data",
    table=my_time_table,  # fixed: was `table_to_log`, an undefined name in this example
    columnPartition=None,  # Use current date as partition
    codecs={
        "Doubles": stl.double_array_codec(),
        "Strings": stl.string_array_codec(),
    },
)

# After some time passes...
lh.close()

Share tables between workers

You can share tables between workers in the same cluster or different clusters via the remote table API:

import io.deephaven.enterprise.remote.RemoteTableBuilder
import io.deephaven.enterprise.remote.SubscriptionOptions

// Create a remote table builder for the same cluster as where this code is running
builder = RemoteTableBuilder.forLocalCluster()

// Create a remote table builder for a different Deephaven cluster, by providing the cluster's URL
// The default port is 8000 for Deephaven installations with Envoy, and 8123 for installations without Envoy
builder = RemoteTableBuilder.forRemoteCluster("https://dh-cluster:8000")

// Subscribe to `SomeTable` in the remote cluster (same syntax for local and remote clusters)
remoteTable = builder.queryName("SomePQ").tableName("SomeTable").subscribe()

// Get a snapshot of a remote table in the local cluster
snapshot = RemoteTableBuilder.forLocalCluster()
        .queryName("SomePQ")
        .tableName("SomeTable")
        .snapshot()

// Subscribe to a remote table in the local cluster, including only the `Timestamp` column
// (SubscriptionOptions restricts which columns are sent over the wire)
remoteTable = RemoteTableBuilder.forLocalCluster()
        .queryName("SomePQ")
        .tableName("SomeTable")
        .subscribe(SubscriptionOptions.builder()
                .addIncludedColumns("Timestamp")
                .build())
from deephaven_enterprise import remote_table

# Create a remote table builder for a different Deephaven cluster, by providing the cluster's URL
# The default port is 8000 for Deephaven installations with Envoy, and 8123 for installations without Envoy
builder = (
    remote_table.for_remote_cluster("https://dh-cluster:8000/iris/connection.json")
    .private_key("path/to/private/key")  # or password(user_name, password).
    .query_name("SomePQ")
    .table_name("SomeTable")
)

# Subscribe to `SomeTable` in the remote cluster
rtable = builder.subscribe()


# Get a snapshot of a remote table in the local cluster
# (in_local_cluster needs no URL or credentials - it targets this cluster)
snapshot = remote_table.in_local_cluster(
    query_name="SomePQ", table_name="SomeTable"
).snapshot()

# Subscribe to a remote table in the local cluster, including only the `Timestamp` column
rtable = remote_table.in_local_cluster(
    query_name="SomePQ", table_name="SomeTable"
).subscribe(included_columns=["Timestamp"])

Install additional Python packages

To install packages at runtime:

from deephaven_enterprise import venv

# Install a Python package into the worker's environment at runtime.
# This is a best-effort example: failures are reported, not re-raised.
try:
    venv.install(["pandas-ta"])
    print("pandas-ta installed successfully.")
except Exception as e:
    print(f"An error occurred: {e}.")  # fixed typo: "occured" -> "occurred"

Performance overview

You can generate a performance overview of tables and plots for a query by its PID, worker name, or PQ name:

// By PIID - the worker's process-info ID, a UUID (not an OS process ID;
// note the UUID-shaped argument below)
performanceOverviewByPiid("52e806dd-af75-412c-a286-ec29aa5571d2")

// By worker name
performanceOverviewByWorkerName("worker_12")

// By PQ (persistent query) name
performanceOverviewByPqName("PqName")
# By process ID - note the argument is a UUID-shaped process-info ID,
# not a numeric OS pid
performance_overview("52e806dd-af75-412c-a286-ec29aa5571d2")

# By worker name
performance_overview(worker_name="worker_12")

# By PQ (persistent query) name
performance_overview(pq_name="PqName")

Write tables to a user namespace

Although the examples below use a namespace called ExampleNamespace, it is best practice for teams to establish a naming protocol for namespaces. The written table becomes available as a historical user table.

// Add an unpartitioned table to `ExampleNamespace` - this assumes `ExampleNamespace.ExampleTable` does not exist
db.addUnpartitionedTable("ExampleNamespace", "ExampleTable", staticSource1)

// Add a partitioned table schema to `ExampleNamespace` - `Date` is the partitioning value
// (the schema is derived from the source table's definition, not its data)
db.addPartitionedTableSchema("ExampleNamespace", "ExampleTable", "Date", staticSource1.getDefinition())
// Add a partition to `ExampleNamespace.ExampleTable`
db.addTablePartition("ExampleNamespace", "ExampleTable", "2017-08-25", staticSource1)

// Add data to a live partition - this adds a snapshot of the table when called
ref = db.appendLiveTable("ExampleNamespace", "ExampleTable", "2024-01-05", tickingSource)

// To add data incrementally as it ticks, create and hold onto a reference until you want to stop
ref = db.appendLiveTableIncremental("ExampleNamespace", "ExampleTable", "2024-01-05", tickingSource)
// Some time passes... closing the reference stops the incremental append
ref.close()
# Add an unpartitioned table to `ExampleNamespace` - this assumes `ExampleNamespace.ExampleTable` does not exist
db.add_unpartitioned_table("ExampleNamespace", "ExampleTable", static_source_1)

# Add a partitioned table schema to `ExampleNamespace` - `Date` is the partitioning value
# NOTE(review): the Groovy example passes the table's *definition*
# (staticSource1.getDefinition()); here the table itself is passed - confirm
# which form the Python API expects.
db.add_partitioned_table_schema(
    "ExampleNamespace", "ExampleTable", "Date", static_source_1
)
# Add a partition to `ExampleNamespace.ExampleTable`
db.add_table_partition(
    "ExampleNamespace", "ExampleTable", "2017-08-25", static_source_1
)

# Add data to a live partition - this adds a snapshot of the table when called
ref = db.append_live_table(
    "ExampleNamespace", "ExampleTable", "2024-01-05", ticking_source
)

# To add data incrementally as it ticks, create and hold onto a reference until you want to stop
ref = db.append_live_table_incremental(
    "ExampleNamespace", "ExampleTable", "2024-01-05", ticking_source
)
# Some time passes... closing the reference stops the incremental append
ref.close()