How to use Kafka to stream token data

What is Kafka?

Kafka is an open-source distributed event streaming platform capable of handling trillions of events a day. It can be used to publish and subscribe to streams of records, similar to a message queue or messaging system.

SimpleHash provides Kafka as one of its Bulk Service options to serve high-performance workloads that require the ingestion of large amounts of token records.

To provision the Kafka Bulk Service to customers, SimpleHash uses Confluent Cloud, which manages the actual streaming infrastructure.

When using the Kafka Bulk Service, customers subscribe to one or more data topics, each containing the records of the desired data type (e.g., NFT token metadata for the Ethereum chain is available via the ethereum.nft.v4 topic). The full list of available Kafka topics and their schemas can be found here.


When might Kafka be a better solution than the REST API?

REST APIs are great for their simplicity, stateless nature, and direct, on-demand data access. However, they might not be the best choice for more demanding use cases. The following are some requirements that could make Kafka a good option:


  • Need for real-time data access: Kafka, being a streaming platform, allows you to get real-time updates on token data as soon as they occur. This can be crucial for applications that rely on up-to-the-second data, such as trading platforms or analytics apps.
  • High throughput: Kafka can process extremely high volumes of data quickly, so it's ideal if you need to ingest large amounts of token data simultaneously or handle high-velocity data streams.
  • Scalability: Kafka is designed to scale out in distributed systems, allowing it to handle massive amounts of data. This makes it a great choice for applications expecting to deal with a large scale of data in the future.
  • Need for stream processing: Kafka also offers powerful stream processing capabilities. If you want to process your token data (e.g., aggregations, joins) in real time as it flows through the system, Kafka could be an excellent choice (see the sketch after this list).
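
As a simple illustration of the last point, the sketch below (assuming records that have already been deserialized into Python dictionaries, as in the consumer examples later in this guide) keeps a running count of records per collection as they arrive:

from collections import Counter

# Hypothetical real-time aggregation over a stream of deserialized token records.
records_per_collection = Counter()


def process(record):
    # Update a simple running aggregate for each deserialized record.
    collection = record.get("collection") or {}
    collection_id = collection.get("collection_id")
    if collection_id:
        records_per_collection[collection_id] += 1

# Call process(record) for each message inside a consumer poll loop, then read
# records_per_collection at any time for up-to-the-second counts.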

How to get started


  1. Understand Bootstrap Servers: bootstrap.servers is a Kafka client configuration setting containing the initial list of Kafka brokers, in the format host1:port1,host2:port2. This list is only used to establish the initial connection to the cluster and to discover the full set of brokers; after that, the client will make use of all brokers, irrespective of which ones are listed here.
  2. Set up Kafka Clients: Depending on the programming language you are using, you'll have to set up a Kafka client. Some popular Kafka clients include the Confluent libraries for Java or Python, and KafkaJS for Node.js.
  3. Configuration: To connect to the Kafka broker, you'll need to pass the bootstrap.servers configuration along with other necessary configurations, such as group.id for consumer applications. Please contact SimpleHash support for the required configuration (a minimal configuration sketch is shown after this list).
  4. Authentication: For Confluent Cloud, you'll need to authenticate with Confluent API keys. Typically, this is done by passing the API Key and Secret via the sasl.jaas.config (Java clients) or equivalent for other language clients. Please contact SimpleHash support for the required credentials.
  5. Subscribe to Topics: A Kafka Topic contains a stream of records, similar to a table in a database. You'll need to subscribe your client to the relevant topics to start receiving messages. SimpleHash support will provision you with the required topics.
  6. Start Reading from Topics: After subscribing to the relevant topics, you can start consuming the messages. This is typically done in a loop that polls for new messages and processes each new message accordingly. Example code for a Python client is shown below.
  7. Interpret Data: Each record consists of a key, headers, a value, and a timestamp. The key is optional and the value contains the message payload (data). The format and interpretation of this data depend on the schema that was used when producing the messages. By default, SimpleHash Kafka streams are serialized in Avro format.
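
As a quick reference, the sketch below shows a minimal consumer configuration using the Python confluent-kafka client; the broker address, credentials, consumer group ID, and topic name are placeholders that SimpleHash support will provide (the full examples later in this guide expand on this).

from confluent_kafka import Consumer

# Placeholder values: SimpleHash support provides the actual brokers, credentials, and topics.
config = {
    "bootstrap.servers": "<host1:port1,host2:port2>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<API Key>",
    "sasl.password": "<API Secret>",
    "group.id": "<CONSUMER_GROUP_ID>",
}

consumer = Consumer(config)
consumer.subscribe(["<TOPIC_NAME>"])  # e.g., ethereum.nft.v4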

Key Kafka concepts

  • Kafka messages are consumed by connecting to the SimpleHash Kafka cluster - for access, please contact [email protected]
  • Schemas are provided on a per-chain basis (e.g., ethereum.nft.v2 for the Ethereum chain)
  • Kafka messages are sent in Avro format.
  • The producer compression format used is Snappy.
  • The schema is sent alongside the messages to aid with deserialization. We also have a schema registry service for certain topics that eliminates the need to transmit the schema alongside each message.

Message Headers

In each Kafka message, we include the following headers:

Key: modType
Value: INSERT, UPDATE, or DELETE
Description:
  • INSERT denotes when we insert new data into our database.
  • UPDATE signifies when we update an existing record.
  • DELETE indicates when we delete a specific record. Please note that the message value is empty when the modType is set to 'DELETE'.
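
As an illustration, the sketch below (assuming the Python confluent-kafka client used in the examples that follow; handle_message is a hypothetical helper) shows how the modType header can be used to decide whether a message carries a payload:

from confluent_kafka.serialization import MessageField, SerializationContext


def handle_message(msg, avro_deserializer):
    # Branch on the modType header; intended to be called from a consumer poll loop.
    headers = dict(msg.headers() or [])
    mod_type = headers.get("modType", b"").decode()

    if mod_type == "DELETE":
        # The message value is empty; only the key identifies the deleted record.
        return ("DELETE", msg.key(), None)

    # INSERT and UPDATE messages carry an Avro-encoded payload.
    record = avro_deserializer(
        msg.value(), SerializationContext(msg.topic(), MessageField.VALUE)
    )
    return (mod_type, msg.key(), record)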

Example code for consuming a Kafka topic

The following Python example uses the confluent-kafka client together with the Confluent schema registry to deserialize messages:

from pprint import PrettyPrinter
import functools
import fastavro

# Ensure schemas are fully expanded when parsed by the Avro deserializer.
fastavro.parse_schema = functools.partial(
    fastavro.schema.parse_schema, expand=True
)

from confluent_kafka import Consumer, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

schema_registry_client = SchemaRegistryClient(
    {
        "url": "https://psrc-xqq6z.us-central1.gcp.confluent.cloud",
        "basic.auth.user.info": "<Schema Registry API Key>:<Schema Registry API Secret>",
    }
)

config = {
    "bootstrap.servers": "pkc-3w22w.us-central1.gcp.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<API Key>",
    "sasl.password": "<API Secret>",
    "group.id": "<CONSUMER_GROUP_ID>",  # https://www.confluent.io/blog/configuring-apache-kafka-consumer-group-ids/
    # auto.offset.reset only applies when there is no initial offset in Kafka or if the current offset does not exist any more on the server
    "auto.offset.reset": "earliest",  # https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#auto-offset-reset
}


def _on_assign(consumer, partitions):
    for partition in partitions:
        # Set the desired offset for each partition
        partition.offset = 0  # Assign the offset of 0, adjust as needed

    # Commit the assigned offsets
    consumer.assign(partitions)
    print("Offsets assigned:", partitions)  # noqa: T001


avro_deserializer = AvroDeserializer(schema_registry_client)

TOPICS = ["ethereum.nft.v4"]
consumer = Consumer(config)
consumer.subscribe(TOPICS, on_assign=_on_assign)

pp = PrettyPrinter()

while True:
    msg = consumer.poll(1.0)

    if msg is None:
        print("msg is none")  # noqa: T001
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print(  # noqa: T001
                "End of partition reached {0}/{1}".format(
                    msg.topic(), msg.partition()
                )
            )
        else:
            print(  # noqa: T001
                "Error while consuming message: {0}".format(msg.error())
            )
    else:
        nft = avro_deserializer(
            msg.value(), SerializationContext(msg.topic(), MessageField.VALUE)
        )
        headers = dict(msg.headers())

        print(  # noqa: T001
            "offset: ", msg.offset(), " key: ", msg.key(), " header: ", headers
        )
        if headers["modType"].decode() in {"INSERT", "UPDATE"}:
            pp.pprint(nft)

            # NFT will be something like this:
            """
            {
                "audio_properties": None,
                "audio_url": None,
                "background_color": None,
                "chain": "ethereum",
                "collection": {
                    "collection_id": "5886e73589c617b47faf44ecda12b970"
                },
                "contract": {
                    "deployed_by": "0x55b5A5527F5Fb0676B800D1AedD3BDF435C37d2f",
                    "deployed_via_contract": None,
                    "name": "The Loud Ones",
                    "symbol": "TLONES",
                    "type": "ERC721",
                },
                "contract_address": "0x000803758151829D103fF188581f87038ae3B893",
                "created_date": "2022-06-07T09:25:30",
                "description": None,
                "external_url": None,
                "first_created": {
                    "block_number": 14920079,
                    "minted_to": "0xa986e3D3083Fe7B65F82aEe03582B195a4F43eb4",
                    "quantity": "1",
                    "timestamp": "2022-06-07T09:25:30",
                    "transaction": "0x9a23aedf3c49caaecc3ce620d1a5b4e90523ee429ae8f45f77de44d6396831db",
                    "transaction_initiator": "0xa986e3D3083Fe7B65F82aEe03582B195a4F43eb4",
                },
                "image_properties": None,
                "image_url": None,
                "last_sale": None,
                "model_properties": None,
                "model_url": None,
                "name": None,
                "nft_id": "ethereum.0x000803758151829d103ff188581f87038ae3b893.1314",
                "owner_count": "1",
                "previews": {
                    "blurhash": None,
                    "image_large_url": None,
                    "image_medium_url": None,
                    "image_opengraph_url": None,
                    "image_small_url": None,
                    "predominant_color": None,
                },
                "rarity": {
                    "rank": None,
                    "score": None,
                    "unique_attributes": None,
                },
                "status": "minted",
                "token_count": "1",
                "token_id": "1314",
                "video_properties": None,
                "video_url": None,
            }
            """

The following Python example instead deserializes messages using the schema sent alongside each message (via fastavro), rather than the schema registry:

import io
import logging
from pprint import PrettyPrinter

import fastavro
from confluent_kafka import Consumer, KafkaError

config = {
    "bootstrap.servers": "pkc-3w22w.us-central1.gcp.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<API Key>",
    "sasl.password": "<API Secret>",
    "group.id": "<CONSUMER_GROUP_ID>", # https://www.confluent.io/blog/configuring-apache-kafka-consumer-group-ids/
    # auto.offset.reset only applies when there is no initial offset in Kafka or if the current offset does not exist any more on the server
    "auto.offset.reset": "earliest",  # https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#auto-offset-reset
}

def _on_assign(consumer, partitions):
    for partition in partitions:
        # Set the desired offset for each partition
        partition.offset = 0  # Assign the offset of 0, adjust as needed

    # Commit the assigned offsets
    consumer.assign(partitions)
    print("Offsets assigned:", partitions)
    
TOPICS = ["ethereum.nft.v0"]
consumer = Consumer(config)
consumer.subscribe(TOPICS, on_assign=_on_assign)

pp = PrettyPrinter()


while True:
    msg = consumer.poll(1.0)

    if msg is None:
        print("msg is none")  # noqa: T001
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print(  # noqa: T001
                "End of partition reached {0}/{1}".format(
                    msg.topic(), msg.partition()
                )
            )
        else:
            print(  # noqa: T001
                "Error while consuming message: {0}".format(msg.error())
            )
    else:
        # Key is NFT ID e.g: 'ethereum.0x000803758151829d103ff188581f87038ae3b893.1314'
        print(msg.key())  # noqa: T001
        try:
            [nft] = list(fastavro.reader(io.BytesIO(msg.value())))
            pp.pprint(nft)

            # NFT will be something like this:
            """
            {
                "audio_properties": None,
                "audio_url": None,
                "background_color": None,
                "chain": "ethereum",
                "collection": {
                    "collection_id": "5886e73589c617b47faf44ecda12b970"
                },
                "contract": {
                    "deployed_by": "0x55b5A5527F5Fb0676B800D1AedD3BDF435C37d2f",
                    "deployed_via_contract": None,
                    "name": "The Loud Ones",
                    "symbol": "TLONES",
                    "type": "ERC721",
                },
                "contract_address": "0x000803758151829D103fF188581f87038ae3B893",
                "created_date": "2022-06-07T09:25:30",
                "description": None,
                "external_url": None,
                "first_created": {
                    "block_number": 14920079,
                    "minted_to": "0xa986e3D3083Fe7B65F82aEe03582B195a4F43eb4",
                    "quantity": "1",
                    "timestamp": "2022-06-07T09:25:30",
                    "transaction": "0x9a23aedf3c49caaecc3ce620d1a5b4e90523ee429ae8f45f77de44d6396831db",
                    "transaction_initiator": "0xa986e3D3083Fe7B65F82aEe03582B195a4F43eb4",
                },
                "image_properties": None,
                "image_url": None,
                "last_sale": None,
                "model_properties": None,
                "model_url": None,
                "name": None,
                "nft_id": "ethereum.0x000803758151829d103ff188581f87038ae3b893.1314",
                "owner_count": "1",
                "previews": {
                    "blurhash": None,
                    "image_large_url": None,
                    "image_medium_url": None,
                    "image_opengraph_url": None,
                    "image_small_url": None,
                    "predominant_color": None,
                },
                "rarity": {
                    "rank": None,
                    "score": None,
                    "unique_attributes": None,
                },
                "status": "minted",
                "token_count": "1",
                "token_id": "1314",
                "video_properties": None,
                "video_url": None,
            }
            """
        except Exception as e:
            logging.error(e)
The following Node.js example uses KafkaJS with the Confluent schema registry and Snappy codec support:

const { Kafka, CompressionTypes, CompressionCodecs } = require("kafkajs");
const { SchemaRegistry } = require("@kafkajs/confluent-schema-registry");
const SnappyCodec = require("kafkajs-snappy");

// Register the Snappy codec so that Snappy-compressed messages can be decompressed
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;

const registry = new SchemaRegistry({
  host: "https://psrc-xqq6z.us-central1.gcp.confluent.cloud",
  auth: {
    username: "<Schema Registry API Key>",
    password:
      "<Schema Registry API Secret>",
  },
});

// Create a Kafka client instance
const kafka = new Kafka({
  clientId: "<COMPANY_NAME>", // Can be anything
  brokers: ["pkc-3w22w.us-central1.gcp.confluent.cloud:9092"], // Update with your Kafka broker(s) information
  ssl: true,
  sasl: {
    mechanism: "plain",
    username: "<API Key>",
    password: "<API Secret>",
  },
});

// Create a consumer instance
const consumer = kafka.consumer({ groupId: "<CONSUMER_GROUP_ID>" }); //  https://www.confluent.io/blog/configuring-apache-kafka-consumer-group-ids/

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: "ethereum.nft.v2", fromBeginning: true }); // Update with your desired topic

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const key = message.key.toString();
      
      // We send the following msg headers: https://docs.simplehash.com/reference/simplehash-bulk-service-overview#message-headers
      
      // Decode msg for INSERT/UPDATE modTypes.
      const decodedValue = await registry.decode(message.value);
      console.log({ key, decodedValue });
    },
  });
};

// Start the consumer
run().catch(console.error);
The following Rust example uses the rdkafka and schema_registry_converter crates:

use env_logger;
use log::{info, warn};

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};

use rdkafka::message::{Headers, Message};

use schema_registry_converter::async_impl::avro::AvroDecoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;

async fn consume_and_print() {
    let kafka_api_key = std::env::var("KAFKA_API_KEY").expect("KAFKA_API_KEY not set");
    let kafka_api_secret = std::env::var("KAFKA_API_SECRET").expect("KAFKA_API_SECRET not set");

    let schema_registry_api_key =
        std::env::var("SCHEMA_REGISTRY_API_KEY").expect("SCHEMA_REGISTRY_API_KEY not set");
    let schema_registry_api_secret =
        std::env::var("SCHEMA_REGISTRY_API_SECRET").expect("SCHEMA_REGISTRY_API_SECRET not set");
    let topic_name = std::env::var("TOPIC_NAME").expect("TOPIC_NAME not set");
    let group_id = std::env::var("GROUP_ID").expect("GROUP_ID not set");

    let consumer: StreamConsumer = ClientConfig::new()
        .set(
            "bootstrap.servers",
            "pkc-3w22w.us-central1.gcp.confluent.cloud:9092",
        )
        .set("group.id", group_id)
        .set("enable.auto.commit", "true")
        .set("auto.offset.reset", "earliest")
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanisms", "PLAIN")
        .set("sasl.username", kafka_api_key)
        .set("sasl.password", kafka_api_secret)
        .create()
        .expect("Consumer creation failed");

    consumer
        .subscribe(&[topic_name.as_str()])
        .expect("Can't subscribe to specified topic");

    let sr_settings = SrSettings::new_builder(String::from(
        "https://psrc-xqq6z.us-central1.gcp.confluent.cloud",
    ))
    .set_basic_authorization(
        schema_registry_api_key.as_str(),
        Some(schema_registry_api_secret.as_str()),
    )
    .build()
    .unwrap();

    let decoder = AvroDecoder::new(sr_settings);

    loop {
        match consumer.recv().await {
            Err(e) => warn!("Kafka error: {}", e),
            Ok(m) => {
                let payload = match decoder.decode(m.payload()).await {
                    Ok(r) => {
                        info!("r: {:?}", r);
                        ""
                    }
                    Err(e) => {
                        warn!("Error while deserializing message payload: {}", e);
                        ""
                    }
                };
                info!("key: '{}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
                      std::str::from_utf8(m.key().unwrap()).unwrap(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
                if let Some(headers) = m.headers() {
                    for header in headers.iter() {
                        info!(
                            "  Header {:}: {:}",
                            header.key,
                            std::str::from_utf8(header.value.unwrap()).unwrap()
                        );
                    }
                }
            }
        };
    }
}
#[tokio::main]
async fn main() {
    env_logger::init();
    consume_and_print().await;
}


Example code for backfilling to your own data store using Kafka

In addition to receiving a feed of recently updated data, for some topics you can backfill the history of all objects. This is helpful for creating your own data store which can be queried in a more flexible way. In this example, you are streaming all historical data for the NFT topic on Ethereum to an SQLite database which will be stored locally on the machine running the script.

Once you have backfilled the data to a database, you can run custom SQL queries. These can be transactional read queries or analytical queries, based on your needs. You also have the advantage of choosing how to store the data, and of storing only the data you actually need. The process would be very similar if you wanted to use a more production-oriented PostgreSQL or MySQL database.
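
For example, once some rows have been written to the nfts table created by the script below, queries like these (a sketch using Python's built-in sqlite3 module, with the simplehash.db file name and column names from that script) can be run against the local database:

import sqlite3

conn = sqlite3.connect("simplehash.db")
cur = conn.cursor()

# Count how many records have been backfilled so far.
print(cur.execute("SELECT COUNT(*) FROM nfts").fetchone()[0])

# Fetch the token ID, name, and image URL for every token of a given contract.
rows = cur.execute(
    "SELECT token_id, name, image_url FROM nfts WHERE contract_address = ?",
    ("0x000803758151829D103fF188581f87038ae3B893",),
).fetchall()
for token_id, name, image_url in rows:
    print(token_id, name, image_url)

conn.close()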

How it works

In this Python script, you'll create a Kafka consumer which receives token records from SimpleHash and writes them to a table in SQLite called nfts. It will store the SimpleHash ID and five fields (contract address, token ID, name, description, and image URL) for each token. You begin streaming data from Kafka starting with the earliest records that SimpleHash has. When you connect to Kafka, you choose an offset which determines where your consumer will start reading. Most commonly, you'll either start from the beginning (with an offset of 0) or start with the latest data (with an offset of -1). In this case, it's set to 0.
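
As a variation (a sketch assuming the confluent-kafka client used in the script below), the special constants OFFSET_BEGINNING and OFFSET_END can be used in the on_assign callback instead of hard-coded offsets:

from confluent_kafka import OFFSET_BEGINNING, OFFSET_END


def on_assign_from_beginning(consumer, partitions):
    # Start each assigned partition from the earliest retained record.
    for partition in partitions:
        partition.offset = OFFSET_BEGINNING
    consumer.assign(partitions)


def on_assign_from_latest(consumer, partitions):
    # Skip history and only consume records produced from now on.
    for partition in partitions:
        partition.offset = OFFSET_END
    consumer.assign(partitions)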

Your consumer is run on a never-ending loop (which is why you have a while True statement), so that as long as it's running, it is always ready to consume messages from the SimpleHash Kafka cluster. It actually makes the network call to consume messages when it runs consumer.poll(1.0). In this call, you are querying the Kafka cluster and waiting up to 1 second for the data to become available before continuing on with the while loop.

Messages are streamed in a compact data format called Avro. You'll be able to deserialize and parse them using the confluent_kafka and fastavro libraries. Events are deserialized into Python dictionaries which can be processed any way you'd like. In this case, you're using the data to prepare a SQL statement, which is executed by the sqlite3 library so that the records are written to your local database.

SimpleHash has over 150 million NFTs stored on Ethereum alone, so fully backfilling to an SQLite database will take a long time (likely hours). For demo purposes, it's recommended to backfill just 100,000 records or so (which only takes a couple of minutes). After you have some entries, stop the script and query the nfts table in the SQLite database. Congrats! You now have a flexible data store for all your NFT data.


from pprint import PrettyPrinter
import functools
import fastavro
import sqlite3

# Ensure schemas are fully expanded when parsed by the Avro deserializer.
fastavro.parse_schema = functools.partial(
    fastavro.schema.parse_schema, expand=True
)

from confluent_kafka import Consumer, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

#-- Insert your credentials below --#
API_KEY = ""
API_SECRET = ""
CONSUMER_GROUP_ID = ""
SCHEMA_REGISTRY_API_KEY = ""
SCHEMA_REGISTRY_SECRET_KEY = ""
#-----------------------------------#

schema_registry_client = SchemaRegistryClient(
    {
        "url": "https://psrc-xqq6z.us-central1.gcp.confluent.cloud",
        "basic.auth.user.info": f"{SCHEMA_REGISTRY_API_KEY}:{SCHEMA_REGISTRY_SECRET_KEY}",
    }
)

config = {
    "bootstrap.servers": "pkc-3w22w.us-central1.gcp.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": API_KEY,
    "sasl.password": API_SECRET,
    "group.id": CONSUMER_GROUP_ID,  # https://www.confluent.io/blog/configuring-apache-kafka-consumer-group-ids/
    # auto.offset.reset only applies when there is no initial offset in Kafka or if the current offset does not exist any more on the server
    "auto.offset.reset": "earliest",
    # https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#auto-offset-reset
}


def _on_assign(consumer, partitions):
    for partition in partitions:
        # Set the desired offset for each partition
        partition.offset = 0  # Assign the offset of 0, adjust as needed

    # Commit the assigned offsets
    consumer.assign(partitions)
    print("Offsets assigned:", partitions)


avro_deserializer = AvroDeserializer(schema_registry_client)

TOPICS = [
    "ethereum.nft.v4"
]
consumer = Consumer(config)
consumer.subscribe(
    TOPICS,
    on_assign=_on_assign
)

pp = PrettyPrinter()


# Events have the same structure as the example NFT record shown earlier in this guide.

def run():
    i = 0
    num_errors = 0

    conn = sqlite3.connect("simplehash.db")
    cur = conn.cursor()

    cur.execute("""
    CREATE TABLE IF NOT EXISTS nfts (
    ID VARCHAR(266) PRIMARY KEY,
    contract_address VARCHAR(42),
    token_id TINYTEXT,
    name TEXT,
    description TEXT,
    image_url TEXT
    );""")

    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            print(".", end="")
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(
                    "End of partition reached {0}/{1}".format(
                        msg.topic(), msg.partition()
                    )
                )
            else:
                print(
                    "Error while consuming message: {0}".format(msg.error())
                )
        else:
            i += 1
            try:
                event = avro_deserializer(
                    msg.value(), SerializationContext(msg.topic(), MessageField.VALUE)
                )
            except Exception:
                continue

            headers = dict(msg.headers())

            print(
                "i", i,
                "offset: ", msg.offset(),
                " timestamp: ", msg.timestamp(),
                " key: ", msg.key(),
                "header: ", headers,
            )
            try:
                sql = """
                    INSERT INTO nfts (id, contract_address, token_id, name, description, image_url)
                    VALUES (:nft_id, :contract_address, :token_id, :name, :description, :image_url)
                """
                cur.execute(sql, event)
                conn.commit()
            except sqlite3.IntegrityError as e:
                print(e)


run()

Support

If you require support or technical assistance with any aspect of the Kafka Bulk Service, please reach out to [email protected].