Streams

Streams are a feature of the Tawon Operator and a simple, powerful publish target. A Stream is a persistent message queue that any number of consumers can subscribe to. Streams are created by the Operator and can be consumed by any application that can connect to the message queue.

Streams are currently implemented using NATS JetStream, a high-performance, scalable message queue that is part of the NATS ecosystem. NATS is a lightweight, high-performance messaging system designed for cloud native applications. It is a CNCF project released under the Apache-2.0 open source license.

The Stream system in Tawon has two main components. The first is the StreamStore, a CRD that defines the queue server deployment and configuration. The second is the Stream, a CRD that defines one persistent message queue hosted on a StreamStore.

StreamStore

The StreamStore is a CRD that defines the configuration for the queue server. A single StreamStore is normally used to host many Streams. The StreamStore is also where the persistent storage allocation is defined. It is backed by a PVC and currently supports only a single-instance deployment; a cluster deployment is planned for the future to provide HA and scalability.

A StreamStore also defines the max and default limits for the Streams it hosts.

There are 3 types of limits/defaults:

  • MaxMsgs: The maximum number of messages that can be stored in the Stream.

  • MaxBytes: The maximum number of bytes that can be stored in the Stream.

  • MaxAge: The maximum age of a message in the Stream.

MaxMsgs and MaxBytes are hard limits: once either limit is reached, older messages are removed from the Stream to make room for new messages. MaxAge causes messages to be removed from the Stream once they reach the specified age.

These limits allow the administrator to control the size of the Streams and their expiration. Very large limits can be used to create a persistent message queue suitable for long-term storage of messages.
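The eviction behaviour described above can be illustrated with a small in-memory model. This is only a sketch, not Tawon or NATS code: the class `StreamModel` and its method names are hypothetical, timestamps are passed in explicitly, and sizes count payload bytes only.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, List, Optional, Tuple


@dataclass
class StreamModel:
    """Toy model of a Stream with MaxMsgs, MaxBytes and MaxAge (hypothetical)."""
    max_msgs: Optional[int] = None    # None means "no limit" in this sketch
    max_bytes: Optional[int] = None
    max_age: Optional[float] = None   # seconds
    _msgs: Deque[Tuple[float, bytes]] = field(default_factory=deque)
    _size: int = 0

    def _drop_oldest(self) -> None:
        _, payload = self._msgs.popleft()
        self._size -= len(payload)

    def expire(self, now: float) -> None:
        """MaxAge: remove messages that have reached the configured age."""
        while (self.max_age is not None and self._msgs
               and now - self._msgs[0][0] >= self.max_age):
            self._drop_oldest()

    def publish(self, payload: bytes, now: float) -> None:
        """Append a message, then evict the oldest ones until the limits hold."""
        self.expire(now)
        self._msgs.append((now, payload))
        self._size += len(payload)
        while self._msgs and (
            (self.max_msgs is not None and len(self._msgs) > self.max_msgs)
            or (self.max_bytes is not None and self._size > self.max_bytes)
        ):
            self._drop_oldest()

    def payloads(self) -> List[bytes]:
        return [p for _, p in self._msgs]


demo = StreamModel(max_msgs=3, max_bytes=10, max_age=60.0)
for i, p in enumerate([b"aa", b"bb", b"cc", b"dd"]):
    demo.publish(p, now=float(i))
print(demo.payloads())  # [b'bb', b'cc', b'dd'] (the oldest message was evicted)
```

In this model the fourth message pushes the count past `max_msgs`, so the oldest message is dropped rather than the new one being rejected, which matches the behaviour described for the hard limits.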

Example StreamStore (the storageClassName will depend on your cluster):

apiVersion: tawon.mantisnet.com/v1alpha1
kind: StreamStore
metadata:
  name: mystreamstore
spec:
  volumeClaimTemplate:
    metadata:
      name: mypvclaim
      labels:
        app: mystreamstore
    spec:
      accessModes:
        - ReadWriteOnce
      storageClassName: ssd-csi
      resources:
        requests:
          storage: 10Gi

  # Describes the lifecycle of persistent volume claims created
  # from volumeClaimTemplates. Defaults to Retain.
  persistentVolumeClaimRetentionPolicy:
    whenDeleted: Delete

  # Defaults if the stream creator does not set these values.
  defaults:
    # Default maximum number of messages per stream.
    maxsize: 1000000
    # Default max age of messages.
    maxage: 240h
    # Default max number of bytes per stream.
    maxbytes: 1G

  # The stream creator cannot exceed these values.
  limits:
    # Hard maximum number of messages per stream.
    max: 10000000
    # Hard max age of messages.
    maxage: 2400h
    # Hard max number of bytes per stream.
    maxbytes: 2G

Stream

A Stream is a CRD that defines a single message queue. When a Stream resource is created, the Operator creates the corresponding Stream on the StreamStore. The Stream CRD defines the name of the Stream and its limits. These limits have the same meaning as those on the StreamStore, but the StreamStore limits take precedence: a Stream cannot exceed them. A Stream must reference a StreamStore.

Example Stream:

apiVersion: tawon.mantisnet.com/v1alpha1
kind: Stream
metadata:
  name: mystream
spec:
  store:
    name: mystreamstore

  maxmsgs: 100000
  maxage: 6h
  maxbytes: 1G
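The way a Stream's requested limits combine with the StreamStore defaults and limits can be sketched as follows. This is a minimal illustration under stated assumptions, not the Operator's implementation: `Limits` and `resolve` are hypothetical names, durations and sizes are plain integers (seconds and bytes), "1G" is taken to mean 10**9 bytes, and a request above a hard limit is rejected here, whereas the real Operator might clamp the value or report the problem differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Limits:
    max_msgs: Optional[int] = None   # message count
    max_age_s: Optional[int] = None  # seconds
    max_bytes: Optional[int] = None  # bytes


def resolve(requested: Limits, defaults: Limits, hard: Limits) -> Limits:
    """Fill unset values from the store defaults, then check the hard limits."""
    resolved = {}
    for name in ("max_msgs", "max_age_s", "max_bytes"):
        value = getattr(requested, name)
        if value is None:
            value = getattr(defaults, name)
        ceiling = getattr(hard, name)
        if value is not None and ceiling is not None and value > ceiling:
            # Assumption: reject the request. The real behaviour may differ.
            raise ValueError(f"{name}={value} exceeds StreamStore limit {ceiling}")
        resolved[name] = value
    return Limits(**resolved)


HOUR = 3600
G = 10**9  # assumption about what "1G" means

# Values taken from the example StreamStore above.
store_defaults = Limits(1_000_000, 240 * HOUR, 1 * G)
store_limits = Limits(10_000_000, 2400 * HOUR, 2 * G)

# A Stream that only sets maxage inherits the other two defaults.
print(resolve(Limits(max_age_s=6 * HOUR), store_defaults, store_limits))
```

With the example values, a Stream that sets only a 6h max age ends up with the StoreStore's default message and byte limits, while a request for 3G would fail the check against the 2G hard limit.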

Directives using Streams

Directives can be configured to publish to a Stream. To do so, add a streams section to the Directive, and in the publish task set the type to stream and the name to the name of the Stream. If the Stream does not exist, it will be created and owned by the Directive. If the Stream already exists, the Directive will publish to it, and any limits given in the Directive will be ignored.

Example Directive:

apiVersion: tawon.mantisnet.com/v1alpha1
kind: Directive
metadata:
  name: capture-coredns
spec:
  condition:
    equal:
      field: process.name
      value: coredns
  tasks:
    - task: capture
      config:
        filter: port 53
    - task: dns
    - task: publish
      config:
        type: stream
        name: coredns
  streams:
    - name: coredns
      store: mystreamstore
      maxage: 6h

In this example, the Directive will publish to the coredns Stream on the mystreamstore StreamStore.

A retentionPolicy can be added to each entry in the streams section. Setting it to Delete causes the Stream to be deleted when the Directive is deleted, which is useful for temporary Streams that are only needed for the lifetime of the Directive. By default, the Stream is not deleted when the Directive is deleted.

Example with a temporary Stream:

  streams:
    - name: coredns
      store: mystreamstore
      retentionPolicy: Delete
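The Stream lifecycle rules for Directives (create if missing, ignore limits if the Stream already exists, optionally delete with the Directive) can be modelled in a few lines. This is a toy sketch, not Operator code: `Cluster`, `StreamRecord` and their methods are hypothetical, "Retain" is a made-up name for the default policy, and the sketch assumes that only a Stream owned by the Directive is removed on deletion, which the documentation does not state explicitly.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class StreamRecord:
    store: str
    max_age: Optional[str]
    owner: Optional[str]  # Directive that created the Stream, if any


class Cluster:
    """Toy model of Streams and the Directives that reference them."""

    def __init__(self) -> None:
        self.streams: Dict[str, StreamRecord] = {}
        self.retention: Dict[Tuple[str, str], str] = {}

    def apply_directive(self, directive: str, stream: str, store: str,
                        max_age: Optional[str] = None,
                        retention_policy: str = "Retain") -> None:
        if stream not in self.streams:
            # Missing Stream: create it, owned by this Directive.
            self.streams[stream] = StreamRecord(store, max_age, owner=directive)
        # Existing Stream: keep it untouched, the Directive's limits are ignored.
        self.retention[(directive, stream)] = retention_policy

    def delete_directive(self, directive: str) -> None:
        for (name, stream), policy in list(self.retention.items()):
            if name != directive:
                continue
            del self.retention[(name, stream)]
            record = self.streams.get(stream)
            # Assumption: only Streams owned by the Directive are deleted.
            if policy == "Delete" and record is not None and record.owner == directive:
                del self.streams[stream]


cluster = Cluster()
cluster.apply_directive("capture-coredns", "coredns", "mystreamstore",
                        max_age="6h", retention_policy="Delete")
cluster.delete_directive("capture-coredns")
print("coredns" in cluster.streams)  # False: the temporary Stream is gone
```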

tawonctl integration

The tawonctl tool can be used to create and manage Streams. The commands for accessing and manipulating Streams are:

  • tawonctl k8s streams: List all Streams.

  • tawonctl k8s streams sub: Subscribe to a Stream by name.

  • tawonctl k8s streams dump: Dump a pcap from a Stream by name (the data must be pcap data).

  • tawonctl k8s streams wireshark: Open Wireshark using a Stream by name (the data must be pcap data).

  • tawonctl k8s streams rm: Delete a Stream by name.

The sub, dump, and wireshark subcommands all support a --all flag to read the Stream from the first message. By default, only new messages are read.

Streams can also be created and used from the k8s directive subcommand. By default, the tawonctl k8s directive xxx subcommands use nats as the publish target unless another target is explicitly specified. To publish to a Stream instead, pass the --stream-store mystreamstore flag, which makes the Directive publish to a Stream on the mystreamstore StreamStore. The --name flag names the Stream (and the Directive itself). If the Stream does not exist, it will be created. When the sub, dump, or wireshark commands create a temporary Directive that lives only for the duration of the command, its Streams are also deleted automatically when the command exits.