Skip to content

How to manage Kafka streams

The stream functionality enables Memgraph to connect to a Kafka, Pulsar or Redpanda cluster and run graph analytics on the data stream.

Info

You can also use this feature with Neo4j:

db = Neo4j(host="localhost", port="7687", username="neo4j", password="test")

1. Create a Kafka stream in Memgraph

To set up the streams, first, create a MemgraphKafkaStream object with all the required arguments:

  • name: str ➡ The name of the stream.
  • topics: List[str] ➡ List of topic names.
  • transform: str ➡ The transformation procedure for mapping incoming messages to Cypher queries.
  • consumer_group: str ➡ Name of the consumer group in Memgraph.
  • batch_interval: str = None ➡ Maximum wait time in milliseconds for consuming messages before calling the transform procedure.
  • batch_size: str = None ➡ Maximum number of messages to wait for before calling the transform procedure.
  • bootstrap_servers: str = None ➡ Comma-separated list of bootstrap servers.

Now you just have to call the create_stream() method with the newly created MemgraphKafkaStream object:

from gqlalchemy import MemgraphKafkaStream

stream = MemgraphKafkaStream(name="ratings_stream", topics=["ratings"], transform="movielens.rating", bootstrap_servers="localhost:9093")
db.create_stream(stream)

2. Start the stream

To start the stream, just call the start_stream() method:

db.start_stream(stream)

3. Check the status of the stream

To check the status of the stream in Memgraph, just run the following command:

check = db.get_streams()

4. Delete the stream

You can use the drop_stream() method to delete a stream:

check = db.drop_stream(stream)