Bulk Meter Processing

Processes an array of smart meter readings concurrently using a Map state. Each reading is independently validated, usage-calculated, and anomaly-checked before bill generation or flagging for manual review.
{
  "Comment": "Bulk smart meter processing: processes an array of meter readings concurrently (Map, max 5). Each reading is independently validated, usage-calculated, and anomaly-checked before bill generation or flagging for manual review.",
  "StartAt": "ProcessMeterBatch",
  "States": {
    "ProcessMeterBatch": {
      "Type": "Map",
      "Comment": "Process each meter reading in the batch, up to 5 at a time.",
      "ItemsPath": "$.readings",
      "MaxConcurrency": 5,
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "ValidateReading",
        "States": {
          "ValidateReading": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "${ValidateMeterReadingFunctionArn}",
              "Payload.$": "$"
            },
            "ResultPath": "$.validation",
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 2,
                "BackoffRate": 2
              }
            ],
            "Catch": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "ResultPath": "$.error",
                "Next": "ReadingValidationError"
              }
            ],
            "Next": "IsReadingValid"
          },
          "IsReadingValid": {
            "Type": "Choice",
            "Choices": [
              {
                "Variable": "$.validation.Payload.isValid",
                "BooleanEquals": true,
                "Next": "CalculateAndDetect"
              }
            ],
            "Default": "SkipInvalidReading"
          },
          "SkipInvalidReading": {
            "Type": "Pass",
            "Parameters": {
              "skipped": true,
              "reason": "INVALID_READING",
              "meterId.$": "$.meterId"
            },
            "End": true
          },
          "ReadingValidationError": {
            "Type": "Fail",
            "Error": "ValidationError",
            "Cause": "Meter reading validation lambda failed unexpectedly"
          },
          "CalculateAndDetect": {
            "Type": "Parallel",
            "Comment": "Run usage calculation and anomaly detection concurrently per reading.",
            "Branches": [
              {
                "StartAt": "CalculateUsage",
                "States": {
                  "CalculateUsage": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::lambda:invoke",
                    "Parameters": {
                      "FunctionName": "${CalculateUsageFunctionArn}",
                      "Payload.$": "$"
                    },
                    "End": true
                  }
                }
              },
              {
                "StartAt": "DetectAnomaly",
                "States": {
                  "DetectAnomaly": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::lambda:invoke",
                    "Parameters": {
                      "FunctionName": "${DetectAnomalyFunctionArn}",
                      "Payload.$": "$"
                    },
                    "End": true
                  }
                }
              }
            ],
            "ResultPath": "$.analysis",
            "Catch": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "ResultPath": "$.error",
                "Next": "AnalysisError"
              }
            ],
            "Next": "IsCleanReading"
          },
          "AnalysisError": {
            "Type": "Fail",
            "Error": "AnalysisError",
            "Cause": "Usage calculation or anomaly detection lambda failed"
          },
          "IsCleanReading": {
            "Type": "Choice",
            "Choices": [
              {
                "Variable": "$.analysis[1].Payload.isAnomaly",
                "BooleanEquals": false,
                "Next": "GenerateBill"
              }
            ],
            "Default": "FlagForManualReview"
          },
          "FlagForManualReview": {
            "Type": "Pass",
            "Parameters": {
              "flagged": true,
              "reason": "ANOMALY_DETECTED",
              "anomalyType.$": "$.analysis[1].Payload.anomalyType",
              "meterId.$": "$.meterId",
              "accountId.$": "$.accountId"
            },
            "End": true
          },
          "GenerateBill": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "${GenerateBillFunctionArn}",
              "Payload": {
                "reading.$": "$",
                "usage.$": "$.analysis[0].Payload.usageRecord"
              }
            },
            "ResultPath": "$.bill",
            "TimeoutSeconds": 30,
            "End": true
          }
        }
      },
      "End": true
    }
  }
}
JSON
Expand
100%

Energy teams can use patterns like this to build reliable, compliant, and scalable automation for payment systems and can test and refine these flows locally with Thrubit to reduce cloud cost and speed up iteration.