Java経験ゼロからのKinesis Data Streams(2)
Java経験ゼロからのKinesis Data Streams(2):
チュートリアル: AWS CLI を使用した Amazon Kinesis Data Streams の使用開始について。
AWS CLIはよく触る人向けに、パラメータについて気になった点を調べつつ。
(入出力で長くなるところは
※以降公式チュートリアルから少し逸れます(Kinesisの状態も厳密には一緒にはならなくなります、問題ないと思いますが)
この後Kinesisにデータを入れる時Partition Keyを指定しますが、そのキーについてMD5ハッシュをかけたものがどちらに収まるかで、書き込み先のシャードが分散されます。
こうすると
となります。
Kinesisに納められたレコードの順序制御番号(SequenceNumber)の範囲を示す項目。
公式用語集によると
PartitionKeyを
shellでこのパーティションキーをハッシュ化してみると
となるため、
PartitionKeyを例えば
ハッシュ値が
コンシューマがあるシャードのどの位置を読んでいるか、を示す
保持されているレコードのどの部分を
レコードの中身(今回の場合は
などで結果確認可能。
ちなみに
軽く流すつもりが結構色々Kinesisにとって重要なキーワードばかりだったので理解が深まって良かったです。次は最後のチュートリアルの模様。
- AWSは触るがスクリプト言語(Python、Javascript)がメイン
チュートリアル: AWS CLI を使用した Amazon Kinesis Data Streams の使用開始について。
AWS CLIはよく触る人向けに、パラメータについて気になった点を調べつつ。
(入出力で長くなるところは
...で略しています)
Streamの作成
aws kinesis create-stream --stream-name Foo --shard-count 1
Streamの詳細取得
aws kinesis describe-stream --stream-name Foo
output
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49589148559069995537682398109276352603103412499188285442"
}
}
],
"StreamARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxxx:stream/Foo",
"StreamName": "Foo",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": 1539441977.0
}
}
出力パラメータについて
※以降公式チュートリアルから少し逸れます(Kinesisの状態も厳密には一緒にはならなくなります、問題ないと思いますが)
HashKeyRange
この後Kinesisにデータを入れる時Partition Keyを指定しますが、そのキーについてMD5ハッシュをかけたものがどちらに収まるかで、書き込み先のシャードが分散されます。# シャード数を1→2へアップデート aws kinesis update-shard-count --stream-name Foo --target-shard-count 2 --scaling-type UNIFORM_SCALING # しばらくしてから再度Discribe aws kinesis describe-stream --stream-name Foo
Shards:の中身がoutput
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49589148559069995537682398109276352603103412499188285442",
"EndingSequenceNumber": "49589148559081145910281663420845911536420133664358137858"
}
},
{
"ShardId": "shardId-000000000001",
"ParentShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "170141183460469231731687303715884105727"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49589148619750323222884223677395042022979615839272042514"
}
},
{
"ShardId": "shardId-000000000002",
"ParentShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "170141183460469231731687303715884105728",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49589148619772623968082754300536577741252264200778022946"
}
}
shardId-000000000000はClose(書き込めない)状態になり、新たにshardId-000000000001、shardId-000000000002がOpenしました。shardId-000000000000では0 〜 340282366920938463463374607431768211455の範囲だったStartingHashKey, EndingHashKeyが、shardId-000000000001、shardId-000000000002の間で等分されていることがわかります。
SequenceNumberRange
Kinesisに納められたレコードの順序制御番号(SequenceNumber)の範囲を示す項目。公式用語集によると
各データレコードにはそのシャード内で一意のシーケンス番号があります。client.putRecords または client.putRecord を使用してストリームに書き込むと、Kinesis Data Streams によってシーケンス番号が割り当てられます。同じパーティションキーのシーケンス番号は一般的に、時間の経過とともに大きくなります。書き込みリクエスト間の期間が長くなるほど、シーケンス番号は大きくなります。とあるので、この値の範囲の開始点をしめすのが
StartingSequenceNumberのようです。
Streamの一覧取得
aws kinesis list-streams
Recordの入力
aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
output
{
"ShardId": "shardId-000000000001",
"SequenceNumber": "49589148619750323222884223677402295577897324024004870162"
}
123としたところ、shardId-000000000001へと格納されました。shellでこのパーティションキーをハッシュ化してみると
echo -n 123| md5| tr '[:lower:]' '[:upper:]'| xargs -I@ echo "obase=10;ibase=16;@"| bc
output
42767516990368493138776584305024125808 # < 170141183460469231731687303715884105728
"shardId-000000000001"へと書き込まれるわけです。PartitionKeyを例えば
catとすると、aws kinesis put-record --stream-name Foo --partition-key cat --data testdata
output
{
"ShardId": "shardId-000000000002",
"SequenceNumber": "49589148619772623968082754300545040221989606737175380002"
}
277102220249073555409885156483852860632になるのでshardId-000000000002の方へと書き込まれます。
ShardIteratorの取得
コンシューマがあるシャードのどの位置を読んでいるか、を示すShardIteratorをレコードを取得する際に渡す必要があります。まずはそのShardIteratorを取得します。aws kinesis get-shard-iterator --shard-id shardId-000000000001 --shard-iterator-type TRIM_HORIZON --stream-name Foo
output
{
"ShardIterator": "AAAAA...evkSB"
}
オプション
shard-iterator-type
保持されているレコードのどの部分をShardIteratorとして取得するかを指定できる(ドキュメント)AT_SEQUENCE_NUMBERAFTER_SEQUENCE_NUMBER
この2つは特定のsequence numberの位置(もしくは次の位置)からShardIteratorを取得。別オプションのStartingSequenceNumberにてそのsequence numberを指定。AT_TIMESTAMP
特定の時点からShardIteratorを取得。別オプションのStartingSequenceNumberにてそのsequence numberを指定。別オプションTimestampにてタイムスタンプを指定。TRIM_HORIZON
そのシャード中で刈り取られていない、もっとも古いデータのShardIteratorを取得。LATEST
そのシャード中でもっとも新しいデータのShardIteratorを取得。
Recordの取得
aws kinesis get-records --shard-iterator AAAAA...evkSB
output
{
"Records": [
{
"SequenceNumber": "49589148619750323222884223677402295577897324024004870162",
"ApproximateArrivalTimestamp": 1539442442.446,
"Data": "dGVzdGRhdGE=",
"PartitionKey": "123"
}
],
"NextShardIterator": "AAAAA...kA5HY",
"MillisBehindLatest": 0
}
出力パラメータについて
Data
レコードの中身(今回の場合は"testdata")をBase64エンコードしたもの。echo dGVzdGRhdGE=| base64 -D
put-recordではBase64エンコードしたものが格納されるためこうなります。ちなみに
実際には、AWS CLI を使用してデータを利用することはまれであり、通常は前に示したように(describe-stream および list-streams)、ストリームの状態をモニタリングしたり、情報を取得したりするために使用されるからです。だそうです。SDKやKCLを利用することを基本は想定していることもあり、CLIはb64デコード機能はもたない模様。
Streamの削除
aws kinesis delete-stream --stream-name Foo
軽く流すつもりが結構色々Kinesisにとって重要なキーワードばかりだったので理解が深まって良かったです。次は最後のチュートリアルの模様。
コメント
コメントを投稿