Java経験ゼロからのKinesis Data Streams(2)
Java経験ゼロからのKinesis Data Streams(2):
チュートリアル: AWS CLI を使用した Amazon Kinesis Data Streams の使用開始について。
AWS CLIはよく触る人向けに、パラメータについて気になった点を調べつつ。
(入出力で長くなるところは
※以降公式チュートリアルから少し逸れます(Kinesisの状態も厳密には一緒にはならなくなります、問題ないと思いますが)
この後Kinesisにデータを入れる時Partition Keyを指定しますが、そのキーについてMD5ハッシュをかけたものがどちらに収まるかで、書き込み先のシャードが分散されます。
こうすると
となります。
Kinesisに納められたレコードの順序制御番号(SequenceNumber)の範囲を示す項目。
公式用語集によると
PartitionKeyを
shellでこのパーティションキーをハッシュ化してみると
となるため、
PartitionKeyを例えば
ハッシュ値が
コンシューマがあるシャードのどの位置を読んでいるか、を示す
保持されているレコードのどの部分を
レコードの中身(今回の場合は
などで結果確認可能。
ちなみに
軽く流すつもりが結構色々Kinesisにとって重要なキーワードばかりだったので理解が深まって良かったです。次は最後のチュートリアルの模様。
- AWSは触るがスクリプト言語(Python、Javascript)がメイン
チュートリアル: AWS CLI を使用した Amazon Kinesis Data Streams の使用開始について。
AWS CLIはよく触る人向けに、パラメータについて気になった点を調べつつ。
(入出力で長くなるところは
...
で略しています)
Streamの作成
aws kinesis create-stream --stream-name Foo --shard-count 1
Streamの詳細取得
aws kinesis describe-stream --stream-name Foo
output
{ "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920938463463374607431768211455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49589148559069995537682398109276352603103412499188285442" } } ], "StreamARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxxx:stream/Foo", "StreamName": "Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1539441977.0 } }
出力パラメータについて
※以降公式チュートリアルから少し逸れます(Kinesisの状態も厳密には一緒にはならなくなります、問題ないと思いますが)
HashKeyRange
この後Kinesisにデータを入れる時Partition Keyを指定しますが、そのキーについてMD5ハッシュをかけたものがどちらに収まるかで、書き込み先のシャードが分散されます。# シャード数を1→2へアップデート aws kinesis update-shard-count --stream-name Foo --target-shard-count 2 --scaling-type UNIFORM_SCALING # しばらくしてから再度Discribe aws kinesis describe-stream --stream-name Foo
Shards:
の中身がoutput
{ "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920938463463374607431768211455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49589148559069995537682398109276352603103412499188285442", "EndingSequenceNumber": "49589148559081145910281663420845911536420133664358137858" } }, { "ShardId": "shardId-000000000001", "ParentShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "170141183460469231731687303715884105727" }, "SequenceNumberRange": { "StartingSequenceNumber": "49589148619750323222884223677395042022979615839272042514" } }, { "ShardId": "shardId-000000000002", "ParentShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "170141183460469231731687303715884105728", "EndingHashKey": "340282366920938463463374607431768211455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49589148619772623968082754300536577741252264200778022946" } }
shardId-000000000000
はClose(書き込めない)状態になり、新たにshardId-000000000001
、shardId-000000000002
がOpenしました。shardId-000000000000
では0 〜 340282366920938463463374607431768211455の範囲だったStartingHashKey
, EndingHashKey
が、shardId-000000000001
、shardId-000000000002
の間で等分されていることがわかります。
SequenceNumberRange
Kinesisに納められたレコードの順序制御番号(SequenceNumber)の範囲を示す項目。公式用語集によると
各データレコードにはそのシャード内で一意のシーケンス番号があります。client.putRecords または client.putRecord を使用してストリームに書き込むと、Kinesis Data Streams によってシーケンス番号が割り当てられます。同じパーティションキーのシーケンス番号は一般的に、時間の経過とともに大きくなります。書き込みリクエスト間の期間が長くなるほど、シーケンス番号は大きくなります。とあるので、この値の範囲の開始点をしめすのが
StartingSequenceNumber
のようです。
Streamの一覧取得
aws kinesis list-streams
Recordの入力
aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
output
{ "ShardId": "shardId-000000000001", "SequenceNumber": "49589148619750323222884223677402295577897324024004870162" }
123
としたところ、shardId-000000000001
へと格納されました。shellでこのパーティションキーをハッシュ化してみると
echo -n 123| md5| tr '[:lower:]' '[:upper:]'| xargs -I@ echo "obase=10;ibase=16;@"| bc
output
42767516990368493138776584305024125808 # < 170141183460469231731687303715884105728
"shardId-000000000001"
へと書き込まれるわけです。PartitionKeyを例えば
cat
とすると、aws kinesis put-record --stream-name Foo --partition-key cat --data testdata
output
{ "ShardId": "shardId-000000000002", "SequenceNumber": "49589148619772623968082754300545040221989606737175380002" }
277102220249073555409885156483852860632
になるのでshardId-000000000002
の方へと書き込まれます。
ShardIteratorの取得
コンシューマがあるシャードのどの位置を読んでいるか、を示すShardIterator
をレコードを取得する際に渡す必要があります。まずはそのShardIterator
を取得します。aws kinesis get-shard-iterator --shard-id shardId-000000000001 --shard-iterator-type TRIM_HORIZON --stream-name Foo
output
{ "ShardIterator": "AAAAA...evkSB" }
オプション
shard-iterator-type
保持されているレコードのどの部分をShardIterator
として取得するかを指定できる(ドキュメント)AT_SEQUENCE_NUMBER
AFTER_SEQUENCE_NUMBER
この2つは特定のsequence numberの位置(もしくは次の位置)からShardIteratorを取得。別オプションのStartingSequenceNumber
にてそのsequence numberを指定。AT_TIMESTAMP
特定の時点からShardIteratorを取得。別オプションのStartingSequenceNumber
にてそのsequence numberを指定。別オプションTimestamp
にてタイムスタンプを指定。TRIM_HORIZON
そのシャード中で刈り取られていない、もっとも古いデータのShardIteratorを取得。LATEST
そのシャード中でもっとも新しいデータのShardIteratorを取得。
Recordの取得
aws kinesis get-records --shard-iterator AAAAA...evkSB
output
{ "Records": [ { "SequenceNumber": "49589148619750323222884223677402295577897324024004870162", "ApproximateArrivalTimestamp": 1539442442.446, "Data": "dGVzdGRhdGE=", "PartitionKey": "123" } ], "NextShardIterator": "AAAAA...kA5HY", "MillisBehindLatest": 0 }
出力パラメータについて
Data
レコードの中身(今回の場合は"testdata"
)をBase64エンコードしたもの。echo dGVzdGRhdGE=| base64 -D
put-record
ではBase64エンコードしたものが格納されるためこうなります。ちなみに
実際には、AWS CLI を使用してデータを利用することはまれであり、通常は前に示したように(describe-stream および list-streams)、ストリームの状態をモニタリングしたり、情報を取得したりするために使用されるからです。だそうです。SDKやKCLを利用することを基本は想定していることもあり、CLIはb64デコード機能はもたない模様。
Streamの削除
aws kinesis delete-stream --stream-name Foo
軽く流すつもりが結構色々Kinesisにとって重要なキーワードばかりだったので理解が深まって良かったです。次は最後のチュートリアルの模様。
コメント
コメントを投稿