Java経験ゼロからのKinesis Data Streams(2)

Java経験ゼロからのKinesis Data Streams(2):

  • AWSは触るがスクリプト言語(Python、Javascript)がメイン
なエンジニアの勉強記録。

チュートリアル: AWS CLI を使用した Amazon Kinesis Data Streams の使用開始について。

AWS CLIはよく触る人向けに、パラメータについて気になった点を調べつつ。

(入出力で長くなるところは...で略しています)



Streamの作成

aws kinesis create-stream --stream-name Foo --shard-count 1 


Streamの詳細取得

aws kinesis describe-stream --stream-name Foo 
output
{ 
  "StreamDescription": { 
    "Shards": [ 
      { 
        "ShardId": "shardId-000000000000", 
        "HashKeyRange": { 
          "StartingHashKey": "0", 
          "EndingHashKey": "340282366920938463463374607431768211455" 
        }, 
        "SequenceNumberRange": { 
          "StartingSequenceNumber": "49589148559069995537682398109276352603103412499188285442" 
        } 
      } 
    ], 
    "StreamARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxxx:stream/Foo", 
    "StreamName": "Foo", 
    "StreamStatus": "ACTIVE", 
    "RetentionPeriodHours": 24, 
    "EnhancedMonitoring": [ 
      { 
        "ShardLevelMetrics": [] 
      } 
    ], 
    "EncryptionType": "NONE", 
    "KeyId": null, 
    "StreamCreationTimestamp": 1539441977.0 
  } 
} 


出力パラメータについて

※以降公式チュートリアルから少し逸れます(Kinesisの状態も厳密には一緒にはならなくなります、問題ないと思いますが)


HashKeyRange
この後Kinesisにデータを入れる時Partition Keyを指定しますが、そのキーについてMD5ハッシュをかけたものがどちらに収まるかで、書き込み先のシャードが分散されます。

# シャード数を1→2へアップデート 
aws kinesis update-shard-count --stream-name Foo --target-shard-count 2 --scaling-type UNIFORM_SCALING 
# しばらくしてから再度Discribe 
aws kinesis describe-stream --stream-name Foo 
こうするとShards:の中身が

output
{ 
  "ShardId": "shardId-000000000000", 
  "HashKeyRange": { 
    "StartingHashKey": "0", 
    "EndingHashKey": "340282366920938463463374607431768211455" 
  }, 
  "SequenceNumberRange": { 
    "StartingSequenceNumber": "49589148559069995537682398109276352603103412499188285442", 
    "EndingSequenceNumber": "49589148559081145910281663420845911536420133664358137858" 
  } 
}, 
{ 
  "ShardId": "shardId-000000000001", 
  "ParentShardId": "shardId-000000000000", 
  "HashKeyRange": { 
    "StartingHashKey": "0", 
    "EndingHashKey": "170141183460469231731687303715884105727" 
  }, 
  "SequenceNumberRange": { 
    "StartingSequenceNumber": "49589148619750323222884223677395042022979615839272042514" 
  } 
}, 
{ 
  "ShardId": "shardId-000000000002", 
  "ParentShardId": "shardId-000000000000", 
  "HashKeyRange": { 
    "StartingHashKey": "170141183460469231731687303715884105728", 
    "EndingHashKey": "340282366920938463463374607431768211455" 
  }, 
  "SequenceNumberRange": { 
    "StartingSequenceNumber": "49589148619772623968082754300536577741252264200778022946" 
  } 
} 
となります。
shardId-000000000000はClose(書き込めない)状態になり、新たにshardId-000000000001shardId-000000000002がOpenしました。
shardId-000000000000では0 〜 340282366920938463463374607431768211455の範囲だったStartingHashKey, EndingHashKeyが、shardId-000000000001shardId-000000000002の間で等分されていることがわかります。


SequenceNumberRange
Kinesisに納められたレコードの順序制御番号(SequenceNumber)の範囲を示す項目。
公式用語集によると

各データレコードにはそのシャード内で一意のシーケンス番号があります。client.putRecords または client.putRecord を使用してストリームに書き込むと、Kinesis Data Streams によってシーケンス番号が割り当てられます。同じパーティションキーのシーケンス番号は一般的に、時間の経過とともに大きくなります。書き込みリクエスト間の期間が長くなるほど、シーケンス番号は大きくなります。
とあるので、この値の範囲の開始点をしめすのがStartingSequenceNumberのようです。


Streamの一覧取得

aws kinesis list-streams 


Recordの入力

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata 
output
{ 
  "ShardId": "shardId-000000000001", 
  "SequenceNumber": "49589148619750323222884223677402295577897324024004870162" 
} 
PartitionKeyを123としたところ、shardId-000000000001へと格納されました。

shellでこのパーティションキーをハッシュ化してみると

echo -n 123| md5| tr '[:lower:]' '[:upper:]'| xargs -I@ echo "obase=10;ibase=16;@"| bc 
output
42767516990368493138776584305024125808 # < 170141183460469231731687303715884105728 
となるため、"shardId-000000000001"へと書き込まれるわけです。

PartitionKeyを例えばcatとすると、

aws kinesis put-record --stream-name Foo --partition-key cat --data testdata 
output
{ 
  "ShardId": "shardId-000000000002", 
  "SequenceNumber": "49589148619772623968082754300545040221989606737175380002" 
} 
ハッシュ値が277102220249073555409885156483852860632になるのでshardId-000000000002の方へと書き込まれます。


ShardIteratorの取得

コンシューマがあるシャードのどの位置を読んでいるか、を示すShardIteratorをレコードを取得する際に渡す必要があります。まずはそのShardIteratorを取得します。

aws kinesis get-shard-iterator --shard-id shardId-000000000001 --shard-iterator-type TRIM_HORIZON --stream-name Foo 
output
{ 
  "ShardIterator": "AAAAA...evkSB"  
} 


オプション


shard-iterator-type
保持されているレコードのどの部分をShardIteratorとして取得するかを指定できる(ドキュメント

  • AT_SEQUENCE_NUMBER
  • AFTER_SEQUENCE_NUMBER

    この2つは特定のsequence numberの位置(もしくは次の位置)からShardIteratorを取得。別オプションのStartingSequenceNumberにてそのsequence numberを指定。
  • AT_TIMESTAMP

    特定の時点からShardIteratorを取得。別オプションのStartingSequenceNumberにてそのsequence numberを指定。別オプションTimestampにてタイムスタンプを指定。
  • TRIM_HORIZON

    そのシャード中で刈り取られていない、もっとも古いデータのShardIteratorを取得。
  • LATEST

    そのシャード中でもっとも新しいデータのShardIteratorを取得。


Recordの取得

aws kinesis get-records --shard-iterator AAAAA...evkSB 
output
{ 
  "Records": [ 
    { 
      "SequenceNumber": "49589148619750323222884223677402295577897324024004870162", 
      "ApproximateArrivalTimestamp": 1539442442.446, 
      "Data": "dGVzdGRhdGE=", 
      "PartitionKey": "123" 
    } 
  ], 
  "NextShardIterator": "AAAAA...kA5HY", 
  "MillisBehindLatest": 0 
} 


出力パラメータについて


Data
レコードの中身(今回の場合は"testdata")をBase64エンコードしたもの。

echo dGVzdGRhdGE=| base64 -D 
などで結果確認可能。put-recordではBase64エンコードしたものが格納されるためこうなります。

ちなみに

実際には、AWS CLI を使用してデータを利用することはまれであり、通常は前に示したように(describe-stream および list-streams)、ストリームの状態をモニタリングしたり、情報を取得したりするために使用されるからです。
だそうです。SDKやKCLを利用することを基本は想定していることもあり、CLIはb64デコード機能はもたない模様。


Streamの削除

aws kinesis delete-stream --stream-name Foo 

軽く流すつもりが結構色々Kinesisにとって重要なキーワードばかりだったので理解が深まって良かったです。次は最後のチュートリアルの模様。

コメント

このブログの人気の投稿

投稿時間:2021-06-17 22:08:45 RSSフィード2021-06-17 22:00 分まとめ(2089件)

投稿時間:2021-06-20 02:06:12 RSSフィード2021-06-20 02:00 分まとめ(3871件)

投稿時間:2021-06-17 05:05:34 RSSフィード2021-06-17 05:00 分まとめ(1274件)