Streams
Streams are a feature of the Tawon Operator that provide a simple and powerful publish target. A Stream is a persistent message queue that any number of consumers can subscribe to. Streams are created by the Operator, and any application that can connect to the message queue can consume them.
Streams are currently implemented using NATS JetStream, a high-performance, scalable message queue that is part of the NATS ecosystem. NATS is a lightweight, high-performance messaging system designed for cloud-native applications. It is a CNCF project released under the Apache-2.0 open source license.
There are two main components to the Stream system in Tawon. The first is the StreamStore, a CRD that defines the queue server deployment and its configuration. The second is the Stream, a CRD that defines a single persistent message queue hosted on a StreamStore.
StreamStore
The StreamStore is a CRD that defines the configuration for the queue server. A single StreamStore is normally used to host many Streams. The StreamStore is also where the persistent storage allocation is defined, because it is backed by a PVC. It currently supports only a single-instance deployment. A clustered deployment is planned for the future to provide high availability (HA) and scalability.
A StreamStore also defines the default values and the maximum limits for the Streams it hosts.
There are three types of limits and defaults:
- MaxMsgs: the maximum number of messages that can be stored in the Stream.
- MaxBytes: the maximum number of bytes that can be stored in the Stream.
- MaxAge: the maximum age of a message in the Stream.
The MaxMsgs and MaxBytes limits are hard limits. Once either limit is reached, older messages are removed from the Stream to make room for new ones. MaxAge causes messages to be removed from the Stream once they reach the specified age.
These limits let the administrator control the size of the Streams and how long their messages are kept. Very large limits turn a Stream into a persistent message queue suitable for long-term storage of messages.
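To make the eviction rules concrete, here is a minimal in-memory Python model of a Stream with the three limits. It is a sketch only. It does not show how Tawon or NATS JetStream implement retention, since the NATS server enforces the limits itself. The class and method names are hypothetical, limits are plain message counts, bytes, and seconds, and None stands for "no limit". Following the rule above, the oldest messages are removed first.

```python
import time
from collections import deque
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Message:
    seq: int
    data: bytes
    timestamp: float  # seconds since the epoch when the message was stored


@dataclass
class LimitedStream:
    """Toy model of a Stream that discards its oldest messages (None = unlimited)."""
    max_msgs: Optional[int] = None
    max_bytes: Optional[int] = None
    max_age: Optional[float] = None  # seconds
    messages: deque = field(default_factory=deque)
    total_bytes: int = 0
    next_seq: int = 1

    def _drop_oldest(self) -> None:
        old = self.messages.popleft()
        self.total_bytes -= len(old.data)

    def enforce(self, now: float) -> None:
        # MaxAge: expire every message older than max_age seconds.
        if self.max_age is not None:
            while self.messages and now - self.messages[0].timestamp > self.max_age:
                self._drop_oldest()
        # MaxMsgs: keep at most max_msgs messages, evicting the oldest first.
        if self.max_msgs is not None:
            while len(self.messages) > self.max_msgs:
                self._drop_oldest()
        # MaxBytes: keep the total payload size within max_bytes.
        if self.max_bytes is not None:
            while self.total_bytes > self.max_bytes:
                self._drop_oldest()

    def publish(self, data: bytes, now: Optional[float] = None) -> int:
        now = time.time() if now is None else now
        msg = Message(self.next_seq, data, now)
        self.next_seq += 1
        self.messages.append(msg)
        self.total_bytes += len(data)
        self.enforce(now)
        return msg.seq

    def sequences(self) -> List[int]:
        return [m.seq for m in self.messages]


if __name__ == "__main__":
    stream = LimitedStream(max_msgs=3)
    for _ in range(5):
        stream.publish(b"x", now=0.0)
    print(stream.sequences())  # [3, 4, 5]
```

In this example, five messages are published into a Stream limited to three messages, so only the three newest remain.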
Example StreamStore (the storageClassName will depend on your cluster):
```yaml
apiVersion: tawon.mantisnet.com/v1alpha1
kind: StreamStore
metadata:
  name: mystreamstore
spec:
  volumeClaimTemplate:
    metadata:
      name: mypvclaim
      labels:
        app: mystreamstore
    spec:
      accessModes:
        - ReadWriteOnce
      storageClassName: ssd-csi
      resources:
        requests:
          storage: 10Gi
  # Describes the lifecycle of persistent volume claims created
  # from volumeClaimTemplates. Defaults to Retain.
  persistentVolumeClaimRetentionPolicy:
    whenDeleted: Delete
  # Defaults if the stream creator does not set these values.
  defaults:
    # Default maximum number of messages per stream.
    maxsize: 1000000
    # Default max age of messages.
    maxage: 240h
    # Default max number of bytes per stream.
    maxbytes: 1G
  # The stream creator cannot exceed these values.
  limits:
    # Hard maximum number of messages per stream.
    max: 10000000
    # Hard max age of messages.
    maxage: 2400h
    # Hard max number of bytes per stream.
    maxbytes: 2G
```
Stream
A Stream is a CRD that defines a single message queue. When a Stream resource is created, the Operator creates the corresponding queue on the referenced StreamStore. The Stream defines the name of the queue and its limits. These limits work the same way as the StreamStore limits. Any value the Stream does not set falls back to the StreamStore defaults, and the StreamStore hard limits take precedence over any larger value the Stream sets. A Stream must reference a StreamStore.
Example Stream:
```yaml
apiVersion: tawon.mantisnet.com/v1alpha1
kind: Stream
metadata:
  name: mystream
spec:
  store:
    name: mystreamstore
  maxmsgs: 100000
  maxage: 6h
  maxbytes: 1G
```
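The interaction between a Stream's own values and the StreamStore defaults and hard limits can be sketched as a small resolution function. This is a hypothetical Python model built on two assumptions. First, an unset value falls back to the StreamStore default. Second, a value above the hard limit is clamped down to that limit. The Operator may instead reject such a Stream, so read this as an illustration of the rule rather than its implementation. Values such as 1G and 240h are assumed to be converted to bytes and seconds beforehand.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Limits:
    """One set of Stream limits; None means "not set"."""
    max_msgs: Optional[int] = None
    max_bytes: Optional[int] = None
    max_age_s: Optional[int] = None


def effective_limits(requested: Limits, defaults: Limits, hard: Limits) -> Limits:
    """Resolve a Stream's limits against its StreamStore (assumed clamping rule)."""

    def resolve(req: Optional[int], default: Optional[int], cap: Optional[int]) -> Optional[int]:
        value = req if req is not None else default  # fall back to the store default
        if value is not None and cap is not None:
            value = min(value, cap)  # never exceed the store's hard limit
        return value

    return Limits(
        max_msgs=resolve(requested.max_msgs, defaults.max_msgs, hard.max_msgs),
        max_bytes=resolve(requested.max_bytes, defaults.max_bytes, hard.max_bytes),
        max_age_s=resolve(requested.max_age_s, defaults.max_age_s, hard.max_age_s),
    )


if __name__ == "__main__":
    GB = 10**9  # illustration only: the exact unit behind "1G" is an assumption
    HOUR = 3600
    store_defaults = Limits(max_msgs=1_000_000, max_bytes=1 * GB, max_age_s=240 * HOUR)
    store_limits = Limits(max_msgs=10_000_000, max_bytes=2 * GB, max_age_s=2400 * HOUR)
    mystream = Limits(max_msgs=100_000, max_bytes=1 * GB, max_age_s=6 * HOUR)
    print(effective_limits(mystream, store_defaults, store_limits))
    greedy = Limits(max_msgs=50_000_000)  # hypothetical Stream asking for too much
    print(effective_limits(greedy, store_defaults, store_limits).max_msgs)  # 10000000
```

With the example StreamStore and Stream above, mystream keeps its own 100000 messages, 1G, and 6h. A Stream that asks for 50000000 messages would be capped at the store's 10000000 and would inherit the other values from the defaults.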
Directives using Streams
Directives can be configured to publish to a Stream. To do this, add a streams section to the Directive, then set the publish task's type to stream and give the name of the Stream in the publish task. If the Stream does not exist, it is created and owned by the Directive. If the Stream already exists, the Directive publishes to the existing Stream and ignores the limits set in the Directive.
Example Directive:
```yaml
apiVersion: tawon.mantisnet.com/v1alpha1
kind: Directive
metadata:
  name: capture-coredns
spec:
  condition:
    equal:
      field: process.name
      value: coredns
  tasks:
    - task: capture
      config:
        filter: port 53
    - task: dns
    - task: publish
      config:
        type: stream
        name: coredns
  streams:
    - name: coredns
      store: mystreamstore
      maxage: 6h
```
In this example, the Directive publishes to the coredns Stream on the mystreamstore StreamStore. The maxage of 6h is applied only if the Directive creates the Stream.
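The create-or-reuse behavior of a Directive's streams section can be modeled in a few lines of Python. This is a hypothetical sketch, not the Operator's code. Streams are assumed to be looked up by name, and the owner field stands in for Kubernetes ownership of the created Stream.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class StreamRecord:
    name: str
    store: str
    max_age: Optional[str]  # kept as the raw manifest string, e.g. "6h"
    owner: Optional[str]    # Directive that created the Stream, or None


def ensure_stream(streams: Dict[str, StreamRecord], directive: str, name: str,
                  store: str, max_age: Optional[str] = None) -> StreamRecord:
    """Create or reuse the Stream for one entry of a Directive's streams section."""
    existing = streams.get(name)
    if existing is not None:
        # The Stream already exists: publish to it unchanged and ignore the
        # limits given in this Directive.
        return existing
    # The Stream is missing: create it with this Directive's limits and
    # record the Directive as its owner.
    record = StreamRecord(name=name, store=store, max_age=max_age, owner=directive)
    streams[name] = record
    return record


if __name__ == "__main__":
    streams: Dict[str, StreamRecord] = {}
    ensure_stream(streams, "capture-coredns", "coredns", "mystreamstore", max_age="6h")
    reused = ensure_stream(streams, "other-directive", "coredns", "mystreamstore", max_age="1h")
    print(reused.max_age, reused.owner)  # 6h capture-coredns
```

In this sketch, the first call creates coredns with a 6h max age and makes capture-coredns its owner. A second, hypothetical Directive named other-directive that references the same Stream gets the existing Stream back, and its 1h value is ignored.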
A retentionPolicy can be added to each Stream in the streams section. Setting it to Delete causes the Stream to be deleted when the Directive is deleted. This is useful for temporary Streams that are only needed for the lifetime of the Directive. By default, the Stream is not deleted when the Directive is deleted.
Example with a temporary Stream:
```yaml
streams:
  - name: coredns
    store: mystreamstore
    retentionPolicy: Delete
```
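The deletion rule can be sketched in the same way. This hypothetical Python model makes two assumptions. Only Streams created (owned) by the Directive are candidates for deletion. An unset retentionPolicy, represented as None, means the Stream is kept, which matches the default behavior described in this section. The Stream names in the usage example are illustrative.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class StreamEntry:
    name: str
    owner: Optional[str]                    # Directive that created the Stream, if any
    retention_policy: Optional[str] = None  # None = not set, so the Stream is kept


def on_directive_deleted(streams: Dict[str, StreamEntry], directive: str) -> List[str]:
    """Delete the Streams that should go away with `directive`; return their names."""
    deleted = []
    for name in list(streams):  # copy the keys because entries are deleted while looping
        entry = streams[name]
        if entry.owner == directive and entry.retention_policy == "Delete":
            del streams[name]
            deleted.append(name)
    return deleted


if __name__ == "__main__":
    streams = {
        "coredns": StreamEntry("coredns", owner="capture-coredns", retention_policy="Delete"),
        "shared": StreamEntry("shared", owner=None, retention_policy="Delete"),
        "kept": StreamEntry("kept", owner="capture-coredns"),
    }
    print(on_directive_deleted(streams, "capture-coredns"))  # ['coredns']
    print(sorted(streams))  # ['kept', 'shared']
```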
tawonctl integration
The tawonctl tool can be used to create and manage Streams. The commands for accessing and manipulating Streams are:
- tawonctl k8s streams: List all Streams.
- tawonctl k8s streams sub: Subscribe to a Stream by name.
- tawonctl k8s streams dump: Dump pcap data from a Stream by name (the Stream must contain pcap data).
- tawonctl k8s streams wireshark: Open Wireshark on a Stream by name (the Stream must contain pcap data).
- tawonctl k8s streams rm: Delete a Stream by name.
The sub, dump, and wireshark commands all support an --all flag, which reads the Stream starting from the first stored message. By default, only new messages are read.
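In NATS JetStream terms, reading from the first message or reading only new messages corresponds to a consumer's deliver policy (all or new). The toy Python model below illustrates the difference. It assumes, without confirmation from the tawonctl source, that --all maps to the deliver-all behavior. The class names are hypothetical.

```python
from typing import List, Tuple

Stored = Tuple[int, bytes]  # (sequence number, payload)


class ToyStream:
    """Append-only in-memory stand-in for a Stream (no limits or eviction)."""

    def __init__(self) -> None:
        self.messages: List[Stored] = []

    def publish(self, payload: bytes) -> int:
        seq = len(self.messages) + 1  # valid only because nothing is ever evicted
        self.messages.append((seq, payload))
        return seq

    def subscribe(self, deliver_all: bool = False) -> "ToyConsumer":
        # deliver_all=True: start at the first stored message (like --all).
        # Default: start after the last message stored at subscription time.
        start = 0 if deliver_all else len(self.messages)
        return ToyConsumer(self, start)


class ToyConsumer:
    def __init__(self, stream: ToyStream, position: int) -> None:
        self.stream = stream
        self.position = position  # index of the next message to deliver

    def fetch(self) -> List[Stored]:
        """Return every message not yet delivered to this consumer."""
        pending = self.stream.messages[self.position:]
        self.position = len(self.stream.messages)
        return pending


if __name__ == "__main__":
    stream = ToyStream()
    stream.publish(b"a")
    stream.publish(b"b")
    new_only = stream.subscribe()
    from_start = stream.subscribe(deliver_all=True)
    stream.publish(b"c")
    print(new_only.fetch())    # [(3, b'c')]
    print(from_start.fetch())  # [(1, b'a'), (2, b'b'), (3, b'c')]
```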
Streams can also be created and used from the k8s directive subcommand. By default, the tawonctl k8s directive xxx subcommands use nats as the publish target unless another target is explicitly specified. To publish to a Stream instead, pass the --stream-store mystreamstore flag, which makes the Directive publish to a Stream on the mystreamstore StreamStore. The --name flag sets the name of both the Stream and the Directive. If the Stream does not exist, it is created. When the sub, dump, or wireshark commands create a temporary Directive for the duration of the command, its Streams are also deleted automatically when the command exits.
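The automatic cleanup of temporary Directives can be sketched with a Python context manager, which guarantees that the cleanup step runs however the command exits. This illustrates the pattern under stated assumptions and is not tawonctl's implementation. The two in-memory collections stand in for the Kubernetes API, and the function name and the name dns-capture are hypothetical. Mirroring the --name flag, the Stream in the usage example shares the Directive's name.

```python
from contextlib import contextmanager
from typing import Dict, Iterator, List, Set


@contextmanager
def temporary_directive(directives: Dict[str, List[str]], streams: Set[str],
                        name: str, stream_names: List[str]) -> Iterator[str]:
    """Create a Directive and its Streams, and delete both when the block exits.

    directives maps each Directive name to the Streams it created; streams holds
    every existing Stream name. Both are toy stand-ins for the Kubernetes API.
    """
    directives[name] = list(stream_names)
    streams.update(stream_names)
    try:
        yield name  # the command (sub, dump, wireshark) runs inside the block
    finally:
        # Runs on normal exit and when the block raises (including KeyboardInterrupt).
        for stream_name in directives.pop(name, []):
            streams.discard(stream_name)


if __name__ == "__main__":
    directives: Dict[str, List[str]] = {}
    streams: Set[str] = set()
    with temporary_directive(directives, streams, "dns-capture", ["dns-capture"]):
        print(sorted(streams))  # ['dns-capture']
    print(sorted(streams))  # []
```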