Data Streaming

Data streaming is currently supported via Apache Kafka, a distributed event streaming platform.

Use Case

Clients should use the Kafka interface when:

  • Size: individual messages under 1 MB in JSON format (see the size check after this list)

  • Frequency: more than 1,000 requests per second

  • Latency: low-latency requirements (under 5 seconds)
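
The size guideline can be checked client-side before producing. A minimal sketch, assuming the message format shown later on this page:

size_check.py
import json

# Sketch: confirm a JSON-encoded message stays under the 1 MB guideline.
message = {
    "RecordDateTimeUTC": "2022-10-21T04:08:52.102Z",
    "Tag": "Tank_A/Type_1_Tanks/Pressure/PVPct",
    "Value": 23.0192,
}
payload = json.dumps(message).encode("ascii")
assert len(payload) < 1_000_000  # 1 MB guideline from the list above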

Authentication

Client authentication uses SASL_SSL with the SCRAM-SHA-512 mechanism. Validere provides a username and password combination that is authorized on the topics used to ingest a dataset. The username is a client_id.
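
In kafka-python these credentials map onto the client's SASL settings. A minimal sketch; the SASL_USERNAME and SASL_PASSWORD environment variable names are illustrative, not fixed by the platform:

sasl_config.py
import os

# Sketch of the SASL_SSL / SCRAM-SHA-512 settings shared by the clients on
# this page. The environment variable names are illustrative assumptions.
SASL_CONFIG = {
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "SCRAM-SHA-512",
    "sasl_plain_username": os.getenv("SASL_USERNAME"),  # the provided client_id
    "sasl_plain_password": os.getenv("SASL_PASSWORD"),
}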

Topic

Each registered dataset_id has a corresponding topic that producer clients can both read from and write to.
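
Because the topic is readable as well, a client can consume messages back from it, for example to verify that records were ingested. A minimal sketch with kafka-python, reusing the placeholder names from this page:

consumer_read.py
import json
import os

from kafka import KafkaConsumer

# Sketch: read messages back from a dataset topic. The topic name and the
# BOOTSTRAP_SERVERS environment variable are illustrative placeholders.
consumer = KafkaConsumer(
    "some_topic",
    bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS"),
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username=os.getenv("SASL_USERNAME"),
    sasl_plain_password=os.getenv("SASL_PASSWORD"),
    auto_offset_reset="earliest",  # start from the beginning of the topic
    consumer_timeout_ms=10_000,    # stop iterating after 10s of inactivity
    value_deserializer=lambda b: json.loads(b.decode("ascii")),
)

for message in consumer:
    print(message.value)  # the deserialized JSON payload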

Messages

The Validere Data Platform uses the JSON format for messages. A record_datetime_utc key must be present in each message.

The Kafka key for each message may be null.

Messages will be validated against the dataset schema that has been registered with the Validere Data Platform.
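
Since messages are validated against the registered schema, a client-side sanity check for required fields can catch problems before producing. The sketch below is illustrative only (it is not the platform's validation logic) and uses the key casing from the example that follows:

message_check.py
# Illustrative pre-send check; the platform performs the authoritative
# schema validation server-side.
REQUIRED_KEYS = {"RecordDateTimeUTC"}

def has_required_keys(message: dict) -> bool:
    """Return True if the message contains every required key."""
    return REQUIRED_KEYS.issubset(message)

assert has_required_keys({
    "RecordDateTimeUTC": "2022-10-21T04:08:52.102Z",
    "Tag": "Tank_A/Type_1_Tanks/Pressure/PVPct",
    "Value": 23.0192,
})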

Example

producer_send.py
import json
import logging
import os

from dotenv import load_dotenv
from kafka import KafkaProducer
from kafka.errors import KafkaError

# Load credentials and broker address from a local .env file.
load_dotenv()

# Broker address; the BOOTSTRAP_SERVERS variable name is illustrative.
BOOTSTRAP = os.getenv("BOOTSTRAP_SERVERS")

producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP,
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username=os.getenv("SASL_USERNAME"),
    sasl_plain_password=os.getenv("SASL_PASSWORD"),
    # Serialize each message dict to a JSON-encoded byte string.
    value_serializer=lambda m: json.dumps(m).encode("ascii"),
)

try:
    # Produce a single JSON message to the dataset's topic.
    producer.send(
        "some_topic",
        value={
            "RecordDateTimeUTC": "2022-10-21T04:08:52.102Z",
            "Tag": "Tank_A/Type_1_Tanks/Pressure/PVPct",
            "Value": 23.0192,
        },
    )
    # Block until all buffered messages are delivered.
    producer.flush()
except KafkaError as err:
    logging.error(err)
