RAG Ingestion Pipeline

End-to-end RAG knowledge base ingestion pipeline covering document intake, text extraction and chunking, embedding generation, vector store indexing, retrieval quality validation, catalog update, and stakeholder notification.

{
  "Comment": "End-to-end RAG knowledge base ingestion pipeline covering document intake, text extraction and chunking, embedding generation, vector store indexing, retrieval quality validation via Bedrock Knowledge Base Retrieve and RetrieveAndGenerate, catalog update, and stakeholder notification.",
  "StartAt": "IngestDocument",
  "States": {
    "IngestDocument": {
      "Type": "Task",
      "Resource": "${IngestDocumentFunctionArn}",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 3,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "IngestionFailed"
        }
      ],
      "Next": "ExtractTextContent"
    },
    "ExtractTextContent": {
      "Type": "Task",
      "Resource": "${ExtractTextContentFunctionArn}",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 3,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "IngestionFailed"
        }
      ],
      "Next": "GenerateEmbeddings"
    },
    "GenerateEmbeddings": {
      "Type": "Task",
      "Resource": "${GenerateEmbeddingsFunctionArn}",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 3,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "IngestionFailed"
        }
      ],
      "Next": "IndexToVectorStore"
    },
    "IndexToVectorStore": {
      "Type": "Task",
      "Resource": "${IndexToVectorStoreFunctionArn}",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 3,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "IngestionFailed"
        }
      ],
      "Next": "ValidateRetrievalQuality"
    },
    "ValidateRetrievalQuality": {
      "Type": "Task",
      "Resource": "${ValidateRetrievalQualityFunctionArn}",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 3,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "IngestionFailed"
        }
      ],
      "Next": "RetrieveValidationChunks"
    },
    "RetrieveValidationChunks": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:bedrockagentruntime:retrieve",
      "Parameters": {
        "KnowledgeBaseId.$": "$.knowledgeBaseId",
        "RetrievalQuery": {
          "Text.$": "States.Format('Retrieve relevant content about: {}', $.documentTitle)"
        }
      },
      "ResultSelector": {
        "retrievedChunks.$": "$.RetrievalResults",
        "chunkCount.$": "States.ArrayLength($.RetrievalResults)"
      },
      "ResultPath": "$.retrievalCheck",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 3,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "IngestionFailed"
        }
      ],
      "Next": "GenerateRetrievalAnswer"
    },
    "GenerateRetrievalAnswer": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:bedrockagentruntime:retrieveAndGenerate",
      "Parameters": {
        "Input": {
          "Text.$": "States.Format('Summarise the key points of the document about: {}', $.documentTitle)"
        },
        "RetrieveAndGenerateConfiguration": {
          "Type": "KNOWLEDGE_BASE",
          "KnowledgeBaseConfiguration": {
            "KnowledgeBaseId.$": "$.knowledgeBaseId",
            "ModelArn": "arn:aws:bedrock:us-east-1::foundation-model/amazon.nova-lite-v1:0"
          }
        }
      },
      "ResultSelector": {
        "generatedAnswer.$": "$.Output.Text",
        "sessionId.$": "$.SessionId"
      },
      "ResultPath": "$.retrievalValidation",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 3,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "IngestionFailed"
        }
      ],
      "Next": "IsRetrievalQualitySufficient"
    },
    "IsRetrievalQualitySufficient": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.retrievalCheck.chunkCount",
          "NumericGreaterThan": 0,
          "Next": "ParallelCatalogAndNotify"
        }
      ],
      "Default": "IngestionFailed"
    },
    "ParallelCatalogAndNotify": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "UpdateKnowledgeCatalog",
          "States": {
            "UpdateKnowledgeCatalog": {
              "Type": "Task",
              "Resource": "${UpdateKnowledgeCatalogFunctionArn}",
              "Retry": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "IntervalSeconds": 3,
                  "MaxAttempts": 2,
                  "BackoffRate": 2
                }
              ],
              "End": true
            }
          }
        },
        {
          "StartAt": "NotifyIngestionStatus",
          "States": {
            "NotifyIngestionStatus": {
              "Type": "Task",
              "Resource": "${NotifyIngestionStatusFunctionArn}",
              "Retry": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 2,
                  "BackoffRate": 2
                }
              ],
              "End": true
            }
          }
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "IngestionFailed"
        }
      ],
      "Next": "IngestionComplete"
    },
    "IngestionComplete": {
      "Type": "Succeed"
    },
    "IngestionFailed": {
      "Type": "Fail",
      "Error": "IngestionFailed",
      "Cause": "RAG ingestion pipeline failed. Document may not be indexed or retrieval quality is below threshold."
    }
  }
}

AI teams can use patterns like this to build reliable, compliant, and scalable RAG knowledge-base ingestion workflows, and can test and refine these flows locally with Thrubit to reduce cloud cost and speed up iteration.