Technology · February 29, 2024

Using GenAI to Find a Needle with Haystack and Astra DB

Carter Rabasa
Carter Rabasa, Head of Developer Relations
Using GenAI to Find a Needle with Haystack and Astra DB
mkdir astra-haystack
ASTRA_DB_API_ENDPOINT=
ASTRA_DB_APPLICATION_TOKEN=
OPENAI_API_KEY=
TOKENIZERS_PARALLELISM=false
python -m venv venv  
source venv/bin/activate 
pip install astra-haystack sentence-transformers python-dotenv
brew install cmake  
import logging
import os
from dotenv import load_dotenv
from haystack import Pipeline
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.converters import HTMLToDocument
from haystack.components.writers import DocumentWriter
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.document_stores.astra import AstraDocumentStore

# Pull the settings defined in .env (Astra credentials etc.) into the environment.
load_dotenv()

# Log at INFO level so pipeline progress is visible on the console.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Sentence-transformers model used to embed each chunk (384-dim vectors).
embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"

# Astra DB collection that will hold the embedded chunks; skip duplicates.
document_store = AstraDocumentStore(
    duplicates_policy=DuplicatePolicy.SKIP,
    embedding_dimension=384,
    collection_name="starwars"
)

# Build the indexing pipeline:
#   fetch HTML -> convert to text -> split into 50-word chunks
#   -> embed each chunk -> write to Astra DB.
index_pipeline = Pipeline()
index_pipeline.add_component(instance=LinkContentFetcher(), name="fetcher")
index_pipeline.add_component(instance=HTMLToDocument(), name="converter")
index_pipeline.add_component(instance=DocumentSplitter(split_by="word", split_length=50), name="splitter")
index_pipeline.add_component(instance=SentenceTransformersDocumentEmbedder(model=embedding_model_name), name="embedder")
index_pipeline.add_component(instance=DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP), name="writer")

# Wire the components together in execution order.
index_pipeline.connect("fetcher.streams", "converter.sources")
index_pipeline.connect("converter.documents", "splitter.documents")
index_pipeline.connect("splitter.documents", "embedder.documents")
index_pipeline.connect("embedder.documents", "writer.documents")

# Download and index the Star Wars article from Wikipedia.
index_pipeline.run(data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Star_Wars"]}})

# Report how many document chunks ended up in the store.
print(document_store.count_documents())
python load_data.py
import os
from dotenv import load_dotenv
from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack_integrations.document_stores.astra import AstraDocumentStore
from haystack_integrations.components.retrievers.astra import AstraEmbeddingRetriever

# Pull the settings defined in .env (Astra + OpenAI credentials) into the environment.
load_dotenv()

# Must match the model used at indexing time so query and document
# embeddings live in the same vector space (384 dimensions).
embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"

# Jinja template that stuffs the retrieved chunks and the user's
# question into a single prompt for the LLM.
prompt_template = """
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                """

# Connect to the collection populated by load_data.py.
document_store = AstraDocumentStore(
    embedding_dimension=384,
    collection_name="starwars"
)

# Components of the RAG pipeline:
#   embed the question -> retrieve similar chunks -> build the prompt
#   -> query OpenAI -> package the reply as an answer.
text_embedder = SentenceTransformersTextEmbedder(model=embedding_model_name)
retriever = AstraEmbeddingRetriever(document_store=document_store)
prompt_builder = PromptBuilder(template=prompt_template)
generator = OpenAIGenerator()
answer_builder = AnswerBuilder()

rag_pipeline = Pipeline()
rag_pipeline.add_component(instance=text_embedder, name="embedder")
rag_pipeline.add_component(instance=retriever, name="retriever")
rag_pipeline.add_component(instance=prompt_builder, name="prompt_builder")
rag_pipeline.add_component(instance=generator, name="llm")
rag_pipeline.add_component(instance=answer_builder, name="answer_builder")

# Wire the components together in execution order.
rag_pipeline.connect("embedder", "retriever")
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
rag_pipeline.connect("llm.replies", "answer_builder.replies")
rag_pipeline.connect("llm.meta", "answer_builder.meta")
rag_pipeline.connect("retriever", "answer_builder.documents")

# Ask a question against the indexed article (top 2 chunks retrieved).
question = "Who is Luke's sister?"
result = rag_pipeline.run(
    {
        "embedder": {"text": question},
        "retriever": {"top_k": 2},
        "prompt_builder": {"question": question},
        "answer_builder": {"query": question},
    }
)

# Print just the text of the first generated answer.
print(result["answer_builder"]["answers"][0].data)
python retrieve_data.py
Leia
Discover more
DataStax Astra DB
Share

One-stop Data API for Production GenAI

Astra DB gives developers a complete data API and out-of-the-box integrations that make it easier to build production RAG apps with high relevancy and low latency.