Amazon SQSにデータをためて、lambdaでSQSからデータを取得して処理をするプログラムを作成するにあたり、ネット上にあるサンプルをいろいろ見ていたのですが、メッセージの取得方法はすぐ見つかったのですが、メッセージの属性情報の取得については、あまり記事にしている人がいなかったので、やり方を残しておこうと思います。
■SQSのデータ
SQSのデータは本文は以下のように「テストメッセージ」
属性は「id」が「100」、「data-a」が「あいうえお」、「data-b」が「かきくけこ」とします。
■サンプルプログラム1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
import boto3 from datetime import datetime import uuid QUEUE_URL = "※ここはSQSのURLを入力してください※" REGION = "ap-northeast-1" MAX_GET_NUMBER=10 # 最大取得数(1-10) ※0デフォルト:1 VISIBILITY_TIMEOUT=10 # 可視性タイムアウト WAIT_TIME=5 # メッセージ取得できるまでの待機時間 RECEIVE_PROCESS_COUNT = 3 # 受信処理回数 # -------------------- # ログ # -------------------- def log(text): print('{} {}'.format(datetime.now().strftime('%H:%M:%S.%f'), text)) # -------------------- # lambda処理 # -------------------- def lambda_handler(event, context): log('★lambda_handler START') # 処理回数 process_count = 0 # sqsクライアント sqs_client = boto3.client("sqs", region_name=REGION) # ループ★ for num in range(0, RECEIVE_PROCESS_COUNT): # for start process_count += 1 log(f'■処理回数:{process_count}') # データ受信 response = sqs_client.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=MAX_GET_NUMBER, VisibilityTimeout=VISIBILITY_TIMEOUT, WaitTimeSeconds=WAIT_TIME) log('response') log(response) # 受信処理 receiveProcess(response) # for end log('★lambda_handler END') # -------------------- # SQSデータ受信 # -------------------- def receiveProcess(response): log('★receiveProcess START') # 受信数 received_count = 0 if 'Messages' in response: log('Messages') log(response['Messages']) for res in response['Messages']: log('Messages') log(res) received_count += len(res) receiptHandle = '' if 'Body' in res: log('Body') log(res['Body']) if 'ReceiptHandle' in res: log('ReceiptHandle') log(res['ReceiptHandle']) receiptHandle = res['ReceiptHandle'] if 'Attributes' in res: log('Attributes') log(res['Attributes']) if 'MessageAttributes' in res: log('MessageAttributes') log(res['MessageAttributes']) log('All Received Count') log(received_count) log('★receiveProcess END') し |
■実行結果1
上記サンプルプログラムを実行すると以下のように
本文は「テストメッセージ」は取得できていますが、属性は取得できていません。
■解決方法
サンプルプログラムではSQSから取得する際に以下のようにしていたのですが、
1 2 |
# データ受信 response = sqs_client.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=MAX_GET_NUMBER, VisibilityTimeout=VISIBILITY_TIMEOUT, WaitTimeSeconds=WAIT_TIME)ぞ |
メッセージ属性を取得するためには以下のように「MessageAttributeNames = [“All”]」を追加すると取得できました
1 2 |
# データ受信 response = sqs_client.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=MAX_GET_NUMBER, VisibilityTimeout=VISIBILITY_TIMEOUT, WaitTimeSeconds=WAIT_TIME, MessageAttributeNames = ["All"]) |
■サンプルプログラム(修正後)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
import boto3 from datetime import datetime import uuid QUEUE_URL = "※ここはSQSのURLを入力してください※" REGION = "ap-northeast-1" MAX_GET_NUMBER=10 # 最大取得数(1-10) ※0デフォルト:1 VISIBILITY_TIMEOUT=10 # 可視性タイムアウト WAIT_TIME=5 # メッセージ取得できるまでの待機時間 RECEIVE_PROCESS_COUNT = 3 # 受信処理回数 # -------------------- # ログ # -------------------- def log(text): print('{} {}'.format(datetime.now().strftime('%H:%M:%S.%f'), text)) # -------------------- # lambda処理 # -------------------- def lambda_handler(event, context): log('★lambda_handler START') # 処理回数 process_count = 0 # sqsクライアント sqs_client = boto3.client("sqs", region_name=REGION) # ループ★ for num in range(0, RECEIVE_PROCESS_COUNT): # for start process_count += 1 log(f'■処理回数:{process_count}') # データ受信 response = sqs_client.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=MAX_GET_NUMBER, VisibilityTimeout=VISIBILITY_TIMEOUT, WaitTimeSeconds=WAIT_TIME, MessageAttributeNames = ["All"]) log('response') log(response) # 受信処理 receiveProcess(response) # for end log('★lambda_handler END') # -------------------- # SQSデータ受信 # -------------------- def receiveProcess(response): log('★receiveProcess START') # 受信数 received_count = 0 if 'Messages' in response: log('Messages') log(response['Messages']) for res in response['Messages']: log('Messages') log(res) received_count += len(res) receiptHandle = '' if 'Body' in res: log('Body') log(res['Body']) if 'ReceiptHandle' in res: log('ReceiptHandle') log(res['ReceiptHandle']) receiptHandle = res['ReceiptHandle'] if 'Attributes' in res: log('Attributes') log(res['Attributes']) if 'MessageAttributes' in res: log('MessageAttributes') log(res['MessageAttributes']) log('All Received Count') log(received_count) log('★receiveProcess END') |
■実行結果(修正後)
上記サンプルプログラム(修正後)を実行すると以下のように
本文「テストメッセージ」とメッセージ属性も取得できました。