How to manage Kafka streams
The stream functionality enables Memgraph to connect to a Kafka, Pulsar or Redpanda cluster and run graph analytics on the data stream.
Info
You can also use this feature with Neo4j:
db = Neo4j(host="localhost", port="7687", username="neo4j", password="test")
1. Create a Kafka stream in Memgraph
To set up the streams, first, create a MemgraphKafkaStream object with all the
required arguments:
- name: str➡ The name of the stream.
- topics: List[str]➡ List of topic names.
- transform: str➡ The transformation procedure for mapping incoming messages to Cypher queries.
- consumer_group: str➡ Name of the consumer group in Memgraph.
- batch_interval: str = None➡ Maximum wait time in milliseconds for consuming messages before calling the transform procedure.
- batch_size: str = None➡ Maximum number of messages to wait for before calling the transform procedure.
- bootstrap_servers: str = None➡ Comma-separated list of bootstrap servers.
Now you just have to call the create_stream() method with the newly created
MemgraphKafkaStream object:
from gqlalchemy import MemgraphKafkaStream
stream = MemgraphKafkaStream(name="ratings_stream", topics=["ratings"], transform="movielens.rating", bootstrap_servers="localhost:9093")
db.create_stream(stream)
2. Start the stream
To start the stream, just call the start_stream() method:
db.start_stream(stream)
3. Check the status of the stream
To check the status of the stream in Memgraph, just run the following command:
check = db.get_streams()
4. Delete the stream
You can use the drop_stream() method to delete a stream:
check = db.drop_stream(stream)