Write data to an in-memory, real-time table
This guide covers publishing data to in-memory ticking tables with two classes: TablePublisher and DynamicTableWriter.
A Table Publisher publishes data to a blink table, while a Dynamic Table Writer writes data to an append-only table. Both are well suited to ingesting and publishing data from external sources such as WebSockets and other live feeds. However, we recommend TablePublisher in most cases, as it provides a newer and more refined API, as well as native support for blink tables.
Table publisher
To use a table publisher, call the TablePublisher.of method to create a TablePublisher instance. Then, call the instance's table() method to return the table publisher's linked blink table. You can also:
- Add data to the blink table with add.
- (Optionally) Store data history in a downstream table.
- (Optionally) Shut the publisher down when finished.
More sophisticated use cases will add steps but follow the same basic formula.
Construct the Table Publisher and its associated blink table
The TablePublisher.of function returns a TablePublisher. The following code block creates a table publisher named My Publisher that publishes to a blink table with two columns, X and Y, which are int and double data types, respectively. The shutDown closure passed as the last argument runs when the publisher shuts down.
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.engine.table.TableDefinition
import io.deephaven.stream.TablePublisher

// Define the schema of the blink table: X (int) and Y (double)
definition = TableDefinition.of(
    ColumnDefinition.ofInt("X"),
    ColumnDefinition.ofDouble("Y")
)

// Callback that runs when the publisher shuts down
shutDown = { println "Finished using My Publisher." }

publisher = TablePublisher.of("My Publisher", definition, null, shutDown)
source = publisher.table()
Note that since we have not called add yet, the source table is empty.
Example: Getting started
The following example creates a table with three columns (X, Y, and Z). The columns initially contain no data because add has not yet been called.
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.engine.table.TableDefinition
import io.deephaven.stream.TablePublisher

definition = TableDefinition.of(
    ColumnDefinition.ofInt("X"),
    ColumnDefinition.ofDouble("Y"),
    ColumnDefinition.ofDouble("Z")
)

shutDown = { println "Table publisher is shut down" }

publisher = TablePublisher.of("Table publisher", definition, null, shutDown)
publishedTable = publisher.table()
Add data to the blink table by calling the add method.
publisher.add(emptyTable(5).update("X = randomInt(0, 10)", "Y = randomDouble(0.0, 100.0)", "Z = randomDouble(0.0, 100.0)"))
The TablePublisher can be shut down by calling publishFailure with an exception that describes the reason.
publisher.publishFailure(new RuntimeException("Publisher shut down by user."))
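Once a publisher has been shut down, there is little point in building more data for it. The following is a minimal sketch that continues the example above (it reuses publisher and the same three columns). It assumes the publisher's isAlive method reports whether the publisher can still publish; the helper name addIfAlive is hypothetical and not part of the API.

```groovy
// Hypothetical helper: only build and add rows while the publisher is still alive.
addIfAlive = { int nRows ->
    if (!publisher.isAlive()) {
        println "Publisher is shut down; skipping add."
        return false
    }
    publisher.add(emptyTable(nRows).update("X = randomInt(0, 10)", "Y = randomDouble(0.0, 100.0)", "Z = randomDouble(0.0, 100.0)"))
    return true
}

// After publishFailure has been called, this is expected to skip the add.
addIfAlive(5)
```

A guard like this is most useful in long-running writer threads, where it gives the loop a natural exit condition.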
Example: threading
The following example adds new data to the publisher with emptyTable every second for 5 seconds in a separate thread. Note how the current execution context is captured and used to add data to the publisher. Attempting to perform table operations in a separate thread without specifying an execution context will raise an exception.
Important
A ticking table in a thread must be updated from within an execution context.
import io.deephaven.engine.context.ExecutionContext
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.engine.table.TableDefinition
import io.deephaven.stream.TablePublisher
import io.deephaven.util.SafeCloseable

definition = TableDefinition.of(
    ColumnDefinition.ofInt("X"),
    ColumnDefinition.ofDouble("Y")
)

shutDown = { -> println "Finished." }

myPublisher = TablePublisher.of("My Publisher", definition, null, shutDown)
myTable = myPublisher.table()

// Capture the current execution context so the worker thread can use it
defaultCtx = ExecutionContext.getContext()

myFunc = { ->
    // Table operations in this thread must run inside an execution context
    try (SafeCloseable ignored = defaultCtx.open()) {
        Random rand = new Random()
        for (int i = 0; i < 5; ++i) {
            // Add between 5 and 9 rows, once per second
            int nRows = rand.nextInt(5) + 5
            myPublisher.add(emptyTable(nRows).update("X = randomInt(0, 10)", "Y = randomDouble(0.0, 100.0)"))
            sleep(1000)
        }
    }
}

// Thread.start() returns void, so keep a reference to the thread before starting it
thread = new Thread(myFunc)
thread.start()
Data history
Table publishers create blink tables. Blink tables do not store any data history - data is gone forever at the start of a new update cycle. In most use cases, you will want to store some or all of the rows written during previous update cycles. There are two ways to do this:
- Store some data history by creating a downstream ring table with RingTableTools.of.
- Store all data history by creating a downstream append-only table with blinkToAppendOnly.
See the table types user guide for more information on these table types, including which one is best suited for your application.
To show the storage of data history, we will extend the threading example by creating a downstream ring table and append-only table.
import io.deephaven.engine.table.impl.sources.ring.RingTableTools
import io.deephaven.engine.table.impl.BlinkTableTools
import io.deephaven.engine.context.ExecutionContext
import io.deephaven.engine.table.ColumnDefinition
import io.deephaven.engine.table.TableDefinition
import io.deephaven.stream.TablePublisher
import io.deephaven.util.SafeCloseable

definition = TableDefinition.of(
    ColumnDefinition.ofInt('X'),
    ColumnDefinition.ofDouble('Y')
)

shutDown = { -> println 'Finished.' }

myPublisher = TablePublisher.of('My Publisher', definition, null, shutDown)
myTable = myPublisher.table()

// Capture the current execution context so the worker thread can use it
defaultCtx = ExecutionContext.getContext()

myFunc = { ->
    try (SafeCloseable ignored = defaultCtx.open()) {
        Random rand = new Random()
        for (int i = 0; i < 5; ++i) {
            int nRows = rand.nextInt(5) + 5
            myPublisher.add(emptyTable(nRows).update('X = randomInt(0, 10)', 'Y = randomDouble(0.0, 100.0)'))
            sleep(1000)
        }
    }
}

// Thread.start() returns void, so keep a reference to the thread before starting it
thread = new Thread(myFunc)
thread.start()

// Ring table: keeps only the 15 most recent rows
myRingTable = RingTableTools.of(myTable, 15, true)
// Append-only table: keeps every row ever published
myAppendOnlyTable = BlinkTableTools.blinkToAppendOnly(myTable)
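To see the difference between the two downstream tables, you could run something like the following as a separate command once the writer thread has finished (roughly five seconds after it starts). This is a sketch that reuses myRingTable and myAppendOnlyTable from the example above. Each batch adds between 5 and 9 rows, so the append-only table should end up holding 25 to 45 rows, while the ring table is capped at its capacity of 15.

```groovy
// Run as a separate command after the writer thread has finished.
// The ring table retains at most 15 rows; the append-only table retains every row.
println "Ring table rows: ${myRingTable.size()}"
println "Append-only table rows: ${myAppendOnlyTable.size()}"
```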
DynamicTableWriter
DynamicTableWriter writes data into live, in-memory tables by specifying the name and data types of each column. The use of DynamicTableWriter to write data to an in-memory ticking table generally follows a formula:
- Create the DynamicTableWriter.
- Get the table that DynamicTableWriter will write data to.
- Write data to the table (done in a separate thread).
- Close the table writer.
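The four steps above can be sketched as follows. This is a minimal illustration rather than a canonical pattern: the column names (Id, Value) and the variable names sketchWriter and sketchTable are made up for this example, and it assumes that calling close after the last logRow is the appropriate way to finish writing.

```groovy
import io.deephaven.engine.table.impl.util.DynamicTableWriter

// 1. Create the DynamicTableWriter (column names and types are illustrative)
columnNames = ["Id", "Value"] as String[]
columnTypes = [int.class, double.class] as Class[]
sketchWriter = new DynamicTableWriter(columnNames, columnTypes)

// 2. Get the table that the writer will write data to
sketchTable = sketchWriter.getTable()

// 3. Write data to the table in a separate thread
Thread.start {
    try {
        for (int i = 0; i < 5; i++) {
            sketchWriter.logRow(i, Math.sqrt(i))
            sleep(1000)
        }
    } finally {
        // 4. Close the table writer once no more rows will be written
        sketchWriter.close()
    }
}
```

Placing close in a finally block means the writer is released even if the loop throws partway through.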
Important
In most cases, a table publisher is the preferred way to write data to a live table. However, it may be more convenient to use DynamicTableWriter if you are adding very few rows (i.e., one) at a time and you prefer a simple interface. It is almost always more flexible and performant to use TablePublisher.
Example: Getting started
The following example creates a table with two columns (A and B). The columns contain randomly generated integers and characters, respectively. Every second, for ten seconds, a new row is added to the table.
import io.deephaven.engine.table.impl.util.DynamicTableWriter

chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890".toCharArray()

// Create a DynamicTableWriter with two columns: `A` (int) and `B` (char)
columnNames = ["A", "B"] as String[]
columnTypes = [int.class, char.class] as Class[]
tableWriter = new DynamicTableWriter(columnNames, columnTypes)
result = tableWriter.getTable()

def rng = new Random()

// Thread to log data to the dynamic table
def thread = Thread.start {
    // Loop that defines how much data to populate to the table
    for (int i = 0; i < 10; i++) {
        // The data to put into the table
        def a = rng.nextInt(100)
        def b = chars[rng.nextInt(chars.length)]
        // The logRow method adds a row to the table
        tableWriter.logRow(a, b)
        // Milliseconds between new rows inserted into the table
        sleep(1000)
    }
}
Example: Trig Functions
The following example writes rows containing X, sin(X), cos(X), and tan(X) and plots the functions as the table updates.
import io.deephaven.engine.table.impl.util.DynamicTableWriter
import io.deephaven.plot.FigureFactory
import static java.lang.Math.*

// Define column names and types using primitive classes
columnNames = ["X", "SinX", "CosX", "TanX"] as String[]
columnTypes = [double.class, double.class, double.class, double.class] as Class[]

// Create DynamicTableWriter
tableWriter = new DynamicTableWriter(columnNames, columnTypes)
trigFunctions = tableWriter.getTable()

// Start data writing thread
Thread.start {
    for (int i = 0; i < 628; i++) {
        long start = System.currentTimeMillis()
        double x = 0.01 * i
        double sinX = sin(x)
        double cosX = cos(x)
        double tanX = tan(x)
        tableWriter.logRow(x, sinX, cosX, tanX)
        // Aim for one row roughly every 200 ms, accounting for time spent writing
        long elapsed = System.currentTimeMillis() - start
        sleep(200 - elapsed)
    }
}

// Create the plot
trig_plot = FigureFactory.figure()
    .plot("Sin(X)", trigFunctions, "X", "SinX")
    .plot("Cos(X)", trigFunctions, "X", "CosX")
    .plot("Tan(X)", trigFunctions, "X", "TanX")
    .chartTitle("Trig Functions")
    .show()
DynamicTableWriter and the Update Graph
Both the Groovy interpreter and the DynamicTableWriter require the Update Graph (UG) lock to execute. As a result, new rows will not appear in output tables until the next UG cycle. As an example, what would you expect the println statement below to produce?
import io.deephaven.engine.table.impl.util.DynamicTableWriter
columnNames = ["Numbers", "Words"] as String[]
columnTypes = [int.class, String.class] as Class[]
tableWriter = new DynamicTableWriter(columnNames, columnTypes)
result = tableWriter.getTable()
tableWriter.logRow(1, "Testing")
sleep(3000)
tableWriter.logRow(2, "Dynamic")
sleep(3000)
tableWriter.logRow(3, "Table")
sleep(3000)
tableWriter.logRow(4, "Writer")
println result.isEmpty()
You may be surprised, but the table does not contain rows when the println statement is reached, so it prints true. The Groovy interpreter holds the UG lock while the code block executes, preventing result from being updated with the new rows until the next UG cycle. Because println is in the code block, it sees the table before rows are added.
However, calling the same println statement as a second command produces the expected result.
println result.isEmpty()
All table updates emanate from the Periodic Update Graph. An understanding of how the Update Graph works can greatly improve query writing.