Programmatic query management

This guide covers programmatic scheduling, management, and monitoring of Persistent Queries (PQs). PQs are queries scheduled to run regularly to ingest and analyze data, create dashboards, and more. Deephaven provides both Python and Groovy APIs for managing PQs, and this guide covers both.

Programmatic management allows you to create more dynamic process workflows that would be difficult or impossible using only manual management in the UI. You can monitor, create, stop, restart, and delete PQs programmatically.

This guide does not cover the Query Monitor, a graphical interface for managing PQs. See Query management in the UI for more details.

Monitor

To monitor PQs programmatically, start with the SessionManager class in Python, and the ControllerClientFactory in Groovy.

Note

The code blocks below are run from Code Studios within a Deephaven installation. If connecting remotely, be sure to provide connection details and authentication. See Python client and/or Java client for details.

import io.deephaven.enterprise.dnd.ControllerClientFactory;

client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();

from deephaven_enterprise.client.session_manager import SessionManager

sm = SessionManager()

With this, you can get information on each PQ in the system in a map:

pqs = client.getDataCopy()

pqs = sm.controller_client.map()

This map uses the serial number of each PQ as the key. The following code prints the name and serial number of each PQ:

for (pq in pqs.values()) {
    info = pq.getConfig()
    println (info.getName() + " - " + info.serial)
}

for serial, pq in pqs.items():
    print(f"{pq.config.name} - {serial}")

You can access individual PQs by their serial number:

println pqs[1699635416039000002]

print(pqs[1699635416039000002])
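
Serial numbers are hard to remember, so it can be convenient to look one up by query name. The following is a minimal sketch that assumes only the Python map shape shown above (serial numbers as keys, values exposing `config.name`). The helper `find_serials_by_name` and the `SimpleNamespace` stand-in data are hypothetical and exist only so the snippet runs outside a Deephaven installation. In a Code Studio, you would pass `sm.controller_client.map()` instead of the stand-in map.

```python
from types import SimpleNamespace
from typing import Dict, List


def find_serials_by_name(pqs: Dict[int, object], name: str) -> List[int]:
    """Return the serial of every PQ whose config name matches exactly."""
    return [serial for serial, pq in pqs.items() if pq.config.name == name]


# Hypothetical stand-in for sm.controller_client.map(): serial -> object with .config.name
example_pqs = {
    1699635416039000002: SimpleNamespace(config=SimpleNamespace(name="Example Merge")),
    1699635416039000003: SimpleNamespace(config=SimpleNamespace(name="Example Import")),
}

print(find_serials_by_name(example_pqs, "Example Merge"))
print(find_serials_by_name(example_pqs, "No Such Query"))
```

The helper returns a list rather than a single value, so a missing name yields an empty list instead of raising an exception.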

Create

This section walks through creating a merge PQ. The same steps apply, with minor changes, to most other PQ types.

First, each language requires import statements:

import io.deephaven.enterprise.dnd.ControllerClientFactory
import static io.deephaven.shadow.enterprise.com.illumon.util.IngesterPersistentQueryConstants.*
import io.deephaven.proto.controller.PersistentQueryConfigMessage
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.deephaven.proto.controller.RestartUsersEnum

# Imported as a module so RU_ADMIN can be referenced by its full path below
import deephaven_enterprise.proto.persistent_query_pb2
from deephaven_enterprise.proto.persistent_query_pb2 import PersistentQueryConfigMessage
from deephaven_enterprise.client.generate_scheduling import GenerateScheduling
from deephaven_enterprise.client.session_manager import SessionManager
import json
from typing import Dict

Next, define the closure (Groovy) or function (Python) that encodes type-specific fields in the form the controller expects. It is used during creation:

// Encodes type-specific fields into the JSON object that the controller expects
encodeJsonStringNode = { ObjectNode root, String name, String value ->
        root.putObject(name).put("type","String").put("value",value)
}

def __encode_type_specific_fields(tsf: Dict) -> str:
    """
    Encodes type-specific fields from a Python dictionary into the JSON object that the controller expects.
    :param tsf: a Python dictionary with type-specific fields
    :return: a JSON encoded string suitable for the controller
    """
    encoded = {}
    for k, v in tsf.items():
        encoded[k] = {"type": "string", "value": v}

    return json.dumps(encoded)
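
When debugging, it can help to read an encoded string back into a flat dictionary. The following is a minimal sketch of the inverse operation, assuming only the nested `{"type": ..., "value": ...}` shape produced by the Python function above. The name `decode_type_specific_fields` and the sample input are hypothetical and not part of the Deephaven API.

```python
import json
from typing import Dict


def decode_type_specific_fields(encoded: str) -> Dict[str, str]:
    """Flatten {"Field": {"type": ..., "value": v}} back into {"Field": v}."""
    return {name: node["value"] for name, node in json.loads(encoded).items()}


# Hand-written sample in the same nested shape the Python encoder above emits
sample = json.dumps(
    {
        "Namespace": {"type": "string", "value": "LearnDeephaven"},
        "ThreadPoolSize": {"type": "string", "value": "4"},
    }
)

decoded = decode_type_specific_fields(sample)
print(decoded)
```

Note that every value stays a string after the round trip ("4", not 4), matching how the values are supplied in this guide.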

With that done, create a query configuration. It uses the closure or function defined above to encode the type-specific fields:

configBuilder = PersistentQueryConfigMessage.newBuilder()
configBuilder.setHeapSizeGb(4.0)

ObjectMapper mapper = new ObjectMapper();
ObjectNode rootNode = mapper.createObjectNode();

// Set TypeSpecificField values
encodeJsonStringNode(rootNode, "LowHeapUsage", "false")
encodeJsonStringNode(rootNode, "Force", "false")
encodeJsonStringNode(rootNode, "AllowEmptyInput", "true")
encodeJsonStringNode(rootNode, "SortColumnFormula", "")
encodeJsonStringNode(rootNode, "ThreadPoolSize", "4")
encodeJsonStringNode(rootNode, "Namespace", "LearnDeephaven")
encodeJsonStringNode(rootNode, "Table", "StockQuotes")
encodeJsonStringNode(rootNode, "PartitionFormula", "\"2017-08-25\"")
encodeJsonStringNode(rootNode, "TableDataServiceConfig", "local")
encodeJsonStringNode(rootNode, "Format","Default")

configBuilder.setTypeSpecificFieldsJson(new ObjectMapper().writeValueAsString(rootNode))

// set Long.MIN_VALUE for serial to create a new serial for this PQ
configBuilder.setSerial(Long.MIN_VALUE)
configBuilder.setConfigurationType(CONFIGURATION_TYPE_MERGE)
configBuilder.setName("Test Programmatic Merge Creation")
configBuilder.setOwner("iris")
configBuilder.setEnabled(false)
configBuilder.setServerName("Merge_1")
configBuilder.setDetailedGCLoggingEnabled(false)
configBuilder.setBufferPoolToHeapRatio(.3)
configBuilder.setJvmProfile("Default")
configBuilder.setRestartUsers(RestartUsersEnum.RU_ADMIN)

config = PersistentQueryConfigMessage()

type_specific_fields = {
    "LowHeapUsage": "false",
    "Force": "false",
    "AllowEmptyInput": "true",
    "SortColumnFormula": "",
    "ThreadPoolSize": "4",
    "Namespace": "LearnDeephaven",
    "Table": "StockQuotes",
    "PartitionFormula": '"2017-08-25"',
    "TableDataServiceConfig": "local",
    "Format": "Default",
}

config.typeSpecificFieldsJson = __encode_type_specific_fields(type_specific_fields)

# -(2**63) is Java Long.MIN_VALUE - it allows the controller to assign a serial number
config.serial = -(2**63)
config.configurationType = "Merge"
config.name = "Test Programmatic Merge Creation"
config.version = 1
config.owner = "iris"
config.enabled = False
config.heapSizeGb = 4.0
config.serverName = "Merge_1"
config.detailedGCLoggingEnabled = False
config.jvmProfile = "Default"
config.bufferPoolToHeapRatio = 0.3
config.restartUsers = deephaven_enterprise.proto.persistent_query_pb2.RU_ADMIN

See the Javadoc for a full list of type-specific fields for merge PQs.
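
The serial value used above to request a new serial number is the smallest value a signed 64-bit integer can hold. The following sketch, which uses only the Python standard library, checks that `-(2**63)` fits in eight bytes and that anything smaller does not. The constant name `JAVA_LONG_MIN` is illustrative.

```python
JAVA_LONG_MIN = -(2**63)

# The value fits exactly in a signed 64-bit (8-byte) field
as_bytes = JAVA_LONG_MIN.to_bytes(8, byteorder="big", signed=True)
print(JAVA_LONG_MIN, as_bytes.hex())

# One less no longer fits in 8 bytes, so Python raises OverflowError
try:
    (JAVA_LONG_MIN - 1).to_bytes(8, byteorder="big", signed=True)
    fits = True
except OverflowError:
    fits = False
print(fits)
```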

Next, define scheduling, which determines when the PQ runs. The following code defines a daily scheduler that runs on all seven days of the week from 07:55:00 to 23:55:00 in the America/New_York time zone, along with some additional scheduling parameters:

Note

The Python code below relies on many of the default values in generate_daily_scheduler. See its API documentation for details on the defaults.

String[] scheduling = [
                "SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily",
                "Calendar=USNYSE",
                "BusinessDays=false",
                "StartTime=07:55:00",
                "StopTime=23:55:00",
                "TimeZone=America/New_York",
                "SchedulingDisabled=true",
                "Overnight=false",
                "RepeatEnabled=false",
                "SkipIfUnsuccessful=false",
                "RestartErrorCount=0",
                "RestartErrorDelay=0",
                "RestartWhenRunning=Yes"
        ];

for (int i=0; i<scheduling.size(); i++) {
        configBuilder.addScheduling(scheduling[i])
}

schedulingArray = GenerateScheduling.generate_daily_scheduler(
    start_time="07:55:00",
    stop_time="23:55:00",
    repeat_interval=3,
)
config.ClearField("scheduling")
config.scheduling.extend(schedulingArray)

For other ways to define scheduling, see Programmatic scheduling.
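
Because scheduling is expressed as a list of "Key=Value" strings, it is easy to inspect before submitting a configuration. The following is a minimal sketch that turns such a list into a dictionary. The helper name `scheduling_to_dict` is hypothetical, and the sample entries are copied from the Groovy example above.

```python
from typing import Dict, List


def scheduling_to_dict(scheduling: List[str]) -> Dict[str, str]:
    """Split each "Key=Value" entry on the first "=" only."""
    return dict(entry.split("=", 1) for entry in scheduling)


sample_scheduling = [
    "SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily",
    "StartTime=07:55:00",
    "StopTime=23:55:00",
    "TimeZone=America/New_York",
]

parsed = scheduling_to_dict(sample_scheduling)
print(parsed["StartTime"], parsed["StopTime"], parsed["TimeZone"])
```

In a Code Studio, the same helper could presumably be applied to `list(config.scheduling)` to check what a generated scheduler contains.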

Lastly, specify timeout values:

// timeout in nanoseconds (600 seconds)
configBuilder.setTimeoutNanos(600000000000)

# timeout in nanoseconds (60 seconds)
initialize_timeout_seconds: float = 60.0
config.timeoutNanos = int(initialize_timeout_seconds * 1_000_000_000)
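
The timeout is stored in nanoseconds, so a value in seconds must be multiplied by one billion. Note that the Groovy example above sets 600 seconds, while the Python example sets 60 seconds. The following sketch shows the arithmetic for both; the helper name `seconds_to_nanos` is illustrative.

```python
NANOS_PER_SECOND = 1_000_000_000


def seconds_to_nanos(seconds: float) -> int:
    """Convert a timeout in seconds to whole nanoseconds."""
    return int(seconds * NANOS_PER_SECOND)


print(seconds_to_nanos(60.0))  # value used in the Python example
print(seconds_to_nanos(600))   # value used in the Groovy example
```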

With all of the setup done, create the PQ:

configMessage = configBuilder.build()
client.addQuery(configMessage)

sm.controller_client.add_query(config)

The entire code block is as follows:

Full code
import io.deephaven.enterprise.dnd.ControllerClientFactory
import static io.deephaven.shadow.enterprise.com.illumon.util.IngesterPersistentQueryConstants.*
import io.deephaven.proto.controller.PersistentQueryConfigMessage
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.deephaven.proto.controller.RestartUsersEnum

// Encodes type-specific fields into the JSON object that the controller expects
encodeJsonStringNode = { ObjectNode root, String name, String value ->
        root.putObject(name).put("type","String").put("value",value)
}

client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();

configBuilder = PersistentQueryConfigMessage.newBuilder()
configBuilder.setHeapSizeGb(4.0)

ObjectMapper mapper = new ObjectMapper();
ObjectNode rootNode = mapper.createObjectNode();

// Set TypeSpecificField values
encodeJsonStringNode(rootNode, "LowHeapUsage", "false")
encodeJsonStringNode(rootNode, "Force", "false")
encodeJsonStringNode(rootNode, "AllowEmptyInput", "true")
encodeJsonStringNode(rootNode, "SortColumnFormula", "")
encodeJsonStringNode(rootNode, "ThreadPoolSize", "4")
encodeJsonStringNode(rootNode, "Namespace", "LearnDeephaven")
encodeJsonStringNode(rootNode, "Table", "StockQuotes")
encodeJsonStringNode(rootNode, "PartitionFormula", "\"2017-08-25\"")
encodeJsonStringNode(rootNode, "TableDataServiceConfig", "local")
encodeJsonStringNode(rootNode, "Format","Default")

configBuilder.setTypeSpecificFieldsJson(new ObjectMapper().writeValueAsString(rootNode))

// set Long.MIN_VALUE for serial to create a new serial for this PQ
configBuilder.setSerial(Long.MIN_VALUE)
configBuilder.setConfigurationType(CONFIGURATION_TYPE_MERGE)
configBuilder.setName("Test Programmatic Merge Creation")
configBuilder.setOwner("iris")
configBuilder.setEnabled(false)
configBuilder.setServerName("Merge_1")
configBuilder.setDetailedGCLoggingEnabled(false)
configBuilder.setBufferPoolToHeapRatio(.3)
configBuilder.setJvmProfile("Default")
configBuilder.setRestartUsers(RestartUsersEnum.RU_ADMIN)

String[] scheduling = [
                "SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily",
                "Calendar=USNYSE",
                "BusinessDays=false",
                "StartTime=07:55:00",
                "StopTime=23:55:00",
                "TimeZone=America/New_York",
                "SchedulingDisabled=true",
                "Overnight=false",
                "RepeatEnabled=false",
                "SkipIfUnsuccessful=false",
                "RestartErrorCount=0",
                "RestartErrorDelay=0",
                "RestartWhenRunning=Yes"
        ];

for (int i=0; i<scheduling.size(); i++) {
        configBuilder.addScheduling(scheduling[i])
}

// timeout in nanoseconds
configBuilder.setTimeoutNanos(600000000000)

configMessage = configBuilder.build()
client.addQuery(configMessage)

# Imported as a module so RU_ADMIN can be referenced by its full path below
import deephaven_enterprise.proto.persistent_query_pb2
from deephaven_enterprise.proto.persistent_query_pb2 import PersistentQueryConfigMessage
from deephaven_enterprise.client.generate_scheduling import GenerateScheduling
from deephaven_enterprise.client.session_manager import SessionManager
import json
from typing import Dict


def __encode_type_specific_fields(tsf: Dict) -> str:
    """
    Encodes type-specific fields from a Python dictionary into the JSON object that the controller expects.
    :param tsf: a Python dictionary with type-specific fields
    :return: a JSON encoded string suitable for the controller
    """
    encoded = {}
    for k, v in tsf.items():
        encoded[k] = {"type": "string", "value": v}

    return json.dumps(encoded)


# Connect to the controller; this client is used to add the query at the end
sm = SessionManager()

config = PersistentQueryConfigMessage()

type_specific_fields = {
    "LowHeapUsage": "false",
    "Force": "false",
    "AllowEmptyInput": "true",
    "SortColumnFormula": "",
    "ThreadPoolSize": "4",
    "Namespace": "LearnDeephaven",
    "Table": "StockQuotes",
    "PartitionFormula": '"2017-08-25"',
    "TableDataServiceConfig": "local",
    "Format": "Default",
}

config.typeSpecificFieldsJson = __encode_type_specific_fields(type_specific_fields)

# -(2**63) is Java Long.MIN_VALUE - it allows the controller to assign a serial number
config.serial = -(2**63)
config.configurationType = "Merge"
config.name = "Test Programmatic Merge Creation"
config.version = 1
config.owner = "iris"
config.enabled = False
config.heapSizeGb = 4.0
config.serverName = "Merge_1"
config.detailedGCLoggingEnabled = False
config.jvmProfile = "Default"
config.bufferPoolToHeapRatio = 0.3
config.restartUsers = deephaven_enterprise.proto.persistent_query_pb2.RU_ADMIN

schedulingArray = GenerateScheduling.generate_daily_scheduler(
    start_time="07:55:00",
    stop_time="23:55:00",
    repeat_interval=3,
)
config.ClearField("scheduling")
config.scheduling.extend(schedulingArray)

initialize_timeout_seconds: float = 60.0
config.timeoutNanos = int(initialize_timeout_seconds * 1_000_000_000)

sm.controller_client.add_query(config)

Schedule

Earlier, the Python example used GenerateScheduling to create scheduling details for a PQ. Alternatively, you can follow the Groovy approach and pass a list of scheduling parameters to the configuration. The following example creates a daily scheduler that runs on all seven days of the week from 07:55:00 to 23:55:00 in the America/New_York time zone, along with some additional scheduling parameters:

scheduling = []
scheduling.append("SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily")
scheduling.append("Calendar=USNYSE")
scheduling.append("BusinessDays=false")
scheduling.append("StartTime=07:55:00")
scheduling.append("StopTime=23:55:00")
scheduling.append("TimeZone=America/New_York")
scheduling.append("SchedulingDisabled=true")
scheduling.append("Overnight=false")
scheduling.append("RepeatEnabled=false")
scheduling.append("SkipIfUnsuccessful=false")
scheduling.append("RestartErrorCount=0")
scheduling.append("RestartErrorDelay=0")
scheduling.append("RestartWhenRunning=Yes")

for param in scheduling:
    config.scheduling.append(param)

The earlier Groovy code added scheduling parameters to the configuration one at a time in a loop. You can instead add them all at once:

String[] scheduling = [
                "SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily",
                "Calendar=USNYSE",
                "BusinessDays=false",
                "StartTime=07:55:00",
                "StopTime=23:55:00",
                "TimeZone=America/New_York",
                "SchedulingDisabled=true",
                "Overnight=false",
                "RepeatEnabled=false",
                "SkipIfUnsuccessful=false",
                "RestartErrorCount=0",
                "RestartErrorDelay=0",
                "RestartWhenRunning=Yes"
        ];

configBuilder.addAllScheduling(scheduling)
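
When several PQs share most of their scheduling parameters, it may be convenient to start from a common set and override only what differs. The following is a minimal sketch under that assumption. The helper `build_scheduling` and the constant `BASE_DAILY` are hypothetical, and the base values are copied from the daily example above.

```python
from typing import Dict, List

# Values copied from the daily scheduling example in this guide
BASE_DAILY: Dict[str, str] = {
    "SchedulerType": "com.illumon.iris.controller.IrisQuerySchedulerDaily",
    "Calendar": "USNYSE",
    "BusinessDays": "false",
    "StartTime": "07:55:00",
    "StopTime": "23:55:00",
    "TimeZone": "America/New_York",
    "SchedulingDisabled": "true",
    "Overnight": "false",
    "RepeatEnabled": "false",
    "SkipIfUnsuccessful": "false",
    "RestartErrorCount": "0",
    "RestartErrorDelay": "0",
    "RestartWhenRunning": "Yes",
}


def build_scheduling(overrides: Dict[str, str]) -> List[str]:
    """Merge overrides into the base parameters and render "Key=Value" strings."""
    merged = {**BASE_DAILY, **overrides}
    return [f"{key}={value}" for key, value in merged.items()]


custom = build_scheduling({"StartTime": "09:30:00", "StopTime": "16:00:00"})
print(custom)
```

The resulting list has the same form as the `scheduling` list above, so it could be added to a configuration in the same way.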

Available schedulers

Deephaven offers the following types of programmatic scheduling for PQs:

Use GenerateScheduling.generate_continuous_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerContinuous in the properties list to create a continuous scheduler.

Use GenerateScheduling.generate_daily_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDaily in the properties list to create a daily scheduler.

Use GenerateScheduling.generate_dependent_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerDependent in the properties list to create a dependent scheduler.

Use GenerateScheduling.generate_monthly_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerMonthly in the properties list to create a monthly scheduler.

Use GenerateScheduling.generate_range_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerRange in the properties list to create a range scheduler.

Use GenerateScheduling.generate_temporary_scheduler or set SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerTemporary in the properties list to create a temporary scheduler.
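
If you build the properties list by hand, a small lookup table can reduce typos in the class names. The following is a sketch only: the class names follow the pattern used in this guide's examples (the daily entry matches the SchedulerType value in the scheduling code above), and the names `SCHEDULER_TYPES` and `scheduler_type_entry` are hypothetical.

```python
from typing import Dict

SCHEDULER_TYPES: Dict[str, str] = {
    "continuous": "com.illumon.iris.controller.IrisQuerySchedulerContinuous",
    "daily": "com.illumon.iris.controller.IrisQuerySchedulerDaily",
    "dependent": "com.illumon.iris.controller.IrisQuerySchedulerDependent",
    "monthly": "com.illumon.iris.controller.IrisQuerySchedulerMonthly",
    "range": "com.illumon.iris.controller.IrisQuerySchedulerRange",
    "temporary": "com.illumon.iris.controller.IrisQuerySchedulerTemporary",
}


def scheduler_type_entry(kind: str) -> str:
    """Return the "SchedulerType=..." entry for a scheduler kind."""
    if kind not in SCHEDULER_TYPES:
        raise ValueError(f"Unknown scheduler kind: {kind!r}")
    return f"SchedulerType={SCHEDULER_TYPES[kind]}"


print(scheduler_type_entry("monthly"))
```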

Stop

Stopping a PQ programmatically is simple: all you need is its serial number.

import io.deephaven.enterprise.dnd.ControllerClientFactory;

client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();

serials = [1699635416039000002]

client.stopQueriesBySerial(serials)

from deephaven_enterprise.client.session_manager import SessionManager

sm = SessionManager()
sm.controller_client.stop_query(1699635416039000002)
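
The Groovy call above takes a list of serial numbers, while the Python call is shown with a single serial. To stop several PQs from Python, one option is a small loop. The following is a sketch that assumes `stop_query` accepts one serial per call, as in the example above. `RecordingClient` is a hypothetical stand-in for `sm.controller_client` that only records calls, so the snippet runs without a Deephaven installation.

```python
from typing import Iterable, List


class RecordingClient:
    """Hypothetical stand-in for sm.controller_client; records calls only."""

    def __init__(self) -> None:
        self.stopped: List[int] = []

    def stop_query(self, serial: int) -> None:
        self.stopped.append(serial)


def stop_queries(client, serials: Iterable[int]) -> int:
    """Call stop_query once per serial and return how many stops were requested."""
    count = 0
    for serial in serials:
        client.stop_query(serial)
        count += 1
    return count


fake_client = RecordingClient()
requested = stop_queries(fake_client, [1699635416039000002, 1699635416039000003])
print(requested, fake_client.stopped)
```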

Restart

Restarting a PQ programmatically is simple: all you need is its serial number.

import io.deephaven.enterprise.dnd.ControllerClientFactory;

client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();

client.restartQueriesBySerial([1699635416039000002])

from deephaven_enterprise.client.session_manager import SessionManager

sm = SessionManager()
sm.controller_client.restart_query(1699635416039000002)

Delete

Deleting a PQ programmatically is simple: all you need is its serial number.

Caution

Deleting a PQ is permanent and cannot be undone.

import io.deephaven.enterprise.dnd.ControllerClientFactory;

client = ControllerClientFactory.makeControllerClientFactory().getSubscribed();

client.removeQuery(1699635416039000002)

from deephaven_enterprise.client.session_manager import SessionManager

sm = SessionManager()
sm.controller_client.delete_query(1699635416039000002)
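
Because deletion cannot be undone, you may want to confirm that a serial number refers to the PQ you expect before deleting it. The following is a minimal sketch of such a guard. It assumes the map shape from the Monitor section (serial keys, values exposing `config.name`) and the `delete_query` call shown above. `RecordingClient`, `delete_if_name_matches`, and the stand-in data are hypothetical.

```python
from types import SimpleNamespace
from typing import Dict, List


class RecordingClient:
    """Hypothetical stand-in for sm.controller_client; records deletions only."""

    def __init__(self) -> None:
        self.deleted: List[int] = []

    def delete_query(self, serial: int) -> None:
        self.deleted.append(serial)


def delete_if_name_matches(
    client, pqs: Dict[int, object], serial: int, expected_name: str
) -> bool:
    """Delete the PQ only if the serial exists and its name is the expected one."""
    pq = pqs.get(serial)
    if pq is None or pq.config.name != expected_name:
        return False
    client.delete_query(serial)
    return True


# Stand-in for sm.controller_client.map()
known_pqs = {
    1699635416039000002: SimpleNamespace(
        config=SimpleNamespace(name="Test Programmatic Merge Creation")
    )
}
guard_client = RecordingClient()

wrong_name = delete_if_name_matches(
    guard_client, known_pqs, 1699635416039000002, "Some Other Query"
)
right_name = delete_if_name_matches(
    guard_client, known_pqs, 1699635416039000002, "Test Programmatic Merge Creation"
)
print(wrong_name, right_name, guard_client.deleted)
```

The guard returns False without calling the client when the serial is unknown or the name differs, so a mistyped serial does not remove an unrelated query.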