Kafka を基本から学ぶ

Kafka とは?

Apache Kafkaは、大規模なデータの収集、処理、保存と統合のための Event Streaming Platform であり、そのユースケースには、分散型ロギング、ストリーム処理、データ統合、メッセージのパブリッシュ/サブスクライブなど、多数が存在します。

Kafka の機能を完全に理解するため、「Event Streaming Platform」の内容とその機能について掘り下げて説明します。では、Kafka のアーキテクチャやコアコンポーネントを詳説する前に、そもそもイベントとは何かについて説明します。Kafka がイベントの保存やシステムからの出し入れ、イベントストリームの分析を行う方法をするかを理解するのにこの内容が役立ちます。

イベントとは?

イベントとは、ソフトウェアやアプリケーションにより識別または記録される、あらゆるタイプのアクション、インシデントや変化を指し、例えば、決済、ウェブサイトのクリックや温度測定などが該当し、起こったことの説明と共に表示されます。

言い換えれば、イベントとは、通知 (他のアクティビティのトリガーとなりうるタイミングの要素) と状態を組み合わせたものとなります。この状態は通常、1メガバイト以下のかなり小さなもので、通常は JSON形式、または Apache Avro™ や Protocol Buffers でシリアル化されたオブジェクトなど、構造化された形式で表現されます。

Kafka とイベント – キーと値のペア

Kafka は分散型コミットログを抽象化したものに基づいており、ログを Partition に分割することでシステムのスケールアウトを実現します。そのため、Kafka ではイベントをキーと値のペアとしてモデル化します。キーや値は内部的には単なるバイト列ですが、外部的には、選択したプログラミング言語の型システムで表現される構造化オブジェクトであることが一般的です。よく知られていることですが、Kafka では、言語の型と内部のバイトとの変換をシリアライズとデシリアライズと呼んでいます。シリアル化された形式は、通常、JSON、JSON Schema、Avro、Protobuf のいずれかとなります。

値は通常、アプリケーションドメインオブジェクトのシリアル化された表現や、センサーの出力などの未加工のメッセージ入力の形式をとります。

キーが複雑なドメインオブジェクトとなる場合もありますが、文字列や整数などのプリミティブな型であることが一般的です。リレーショナルデータベースの行の主キーなどとは異なり、Kafka イベントのキー部分は必ずしもイベントを表す一意の識別子ではなく、ユーザー、注文や接続された特定のデバイスなど、システム内のエンティティの識別子である可能性の方が高くなります。

今はそれほど重要でないように感じられるかもしれませんが、Kafka が並列化やデータの局所性などを処理する際にキーが重要な役割を果たすことを後ほど説明します。

Kafka を選ぶ理由

メリットとユースケース

メリットとユースケース

Kafka は世界中の10万以上の組織で使用されており、最新のストリーム処理の実現を追い求めるプロの開発者たちが集まった活発なコミュニティに支えられています。高スループット、フォールトトレランス、レジリエンス、スケーラビリティといった特徴を備えた Kafka には、銀行や不正検知、運輸や IoT に至るまで、ほぼあらゆる業界で無数のユースケースが存在します。一般的には以下のような目的で使用されています。

データの統合

データの統合

Kafkaは、従来のエンタープライズ情報システムや最新のデータベース、クラウド上のデータベースなど、ほぼすべてのデータソースに接続することができます。ロジックやルーティングを脆弱な集中型インフラに隠すことなく、内蔵されたデータ Connector で効率的な統合ポイントを形成します。

メトリクスとモニタリング

メトリクスとモニタリング

Kafka は運用データのモニタリングによく使われます。このモニタリングには、分散したアプリケーションからの統計情報を集約し、リアルタイムのメトリクスを含む一元化されたフィードを作成することも含まれます。

ログ集計

ログ集計

現代のシステムは一般的に分散型のシステムであり、システムのさまざまなコンポーネントからロギングデータを一箇所に集中させる必要があります。Kafka はしばしば、形式や容量にかかわらずすべてのソースからデータを一元化する信頼できる唯一の情報源として機能します。

ストリーム処理

ストリーム処理

イベントストリームに対するリアルタイムでの計算の実行は、Kafka のコアコンピタンスのひとつです。リアルタイムデータ処理からデータフロープログラミングまで、Kafka はあらゆる規模のデータストリームを生成次第取り込み、保存し、処理します。

メッセージのパブリッシュ/サブスクライブ

メッセージのパブリッシュ/サブスクライブ

分散型パブリッシュ/サブスクライブメッセージングシステムである Kafka は、従来のメッセージブローカーの現代版としてうまく機能します。イベント生成のプロセスをイベント受信のプロセスから切り離す必要がある場合にも最適な、スケーラブルかつ柔軟な方法です。

Kafka のアーキテクチャ – 基本概念

Kafka Topic

イベントには増殖する傾向があります。例えば今朝ご自身に起こった出来事 (イベント) を考えてみてもこれが当てはまるのではないでしょうか。そのため、イベントを整理する仕組みが必要となります。整理するうえでKafka の最も基本的な単位は Topic で、リレーショナルデータベースにおけるテーブルのような役割を果たします。Kafka を扱う開発者として最も関連の深い抽象化が、この Topic です。異なる種類のイベントを保持するために異なる種類の Topic を作成したり、また、こうしたイベントのフィルタリングや変換を行うために、さらに別の種類の Topic を作成することになります。

Topic とは、イベントのログです。よく知られたセマンティクスを持つ単純なデータ構造であるログは、理解しやすいのが特長です。第一の理由は、ログが追記専用で、ログに新しいメッセージを書き込むと必ず最後に追加されること、第二の理由は、ログ内の任意のオフセットを探し、連続したログエントリをスキャンしないと読み込めないことにあります。第三の理由は、ログ上のイベントが不変であり、一度起こったことを元に戻すのは非常に困難なためです。ログが持つこうしたセマンティクスの単純性により、Kafka が Topic との間で持続的に高スループットを実現し、Topic のレプリケーションについての推論をすることが容易となるのです (これについては後述します)。

加えて、ログは基本的に堅牢です。従来のエンタープライズメッセージングシステムには、メッセージを一時的に保存して送信元と送信先の間でバッファリングする、トピックやキューと呼ばれる機能がありました。

Kafka における Topic とはログであるため、Topic 内のデータに一時的な性質のものはありません。Topic はすべて、データが一定期間を過ぎた (または Topic 全体が一定の規模に達した) 後にデータを失効させるよう設定することができます。この期間は、最短で数秒から数年、果ては無期限にメッセージを保持するようにも設定できます。Kafka の Topic の基盤となるログは、ディスク上に保管されたファイルです。イベントを Topic に書き込んだ場合、信頼できるデータベースに書き込んだのと同等の耐久性が得られます。

Kafka が現代のデータインフラストラクチャに欠かせないコンポーネントとして成功した要因には、ログのシンプルさとログに含まれるコンテンツの不変性とがありますが、これらがすべてではありません。

Kafka のパーティショニング

もし Topic が1台のマシン上でのみ完結するように制限されていたとしたら、Kafka のスケーリング能力にかなり大きな制約が生じます。Kafka は本質的に分散型システムのため、多数のマシン上で多数の Topic を管理できますが、1つの Topic が大きくなりすぎたり、読み出しと書き込みの数が多すぎて対応できなくなるようなことはありません。そのような場合には、Kafka なら Topic をパーティショニングすることが可能だからです。

パーティショニングでは、単一の Topic ログを複数のログへ分割し、それぞれのログを Kafka Cluster 内の別のノードに配置します。こうすることで、メッセージの保存、新しいメッセージの書き込み、既存のメッセージの処理などの作業をクラスタ内の多くのノードに分散させることができます。

How Partitioning Works

Having broken a topic up into partitions, we need a way of deciding which messages to write to which partitions. Typically, if a message has no key, subsequent messages will be distributed round-robin among all the topic’s partitions. In this case, all partitions get an even share of the data, but we don’t preserve any kind of ordering of the input messages. If the message does have a key, then the destination partition will be computed from a hash of the key. This allows Kafka to guarantee that messages having the same key always land in the same partition, and therefore are always in order.

For example, if you are producing events that are all associated with the same customer, using the customer ID as the key guarantees that all of the events from a given customer will always arrive in order. This creates the possibility that a very active key will create a larger and more active partition, but this risk is small in practice and is manageable when it presents itself. It is often worth it in order to preserve the ordering of keys.

Kafka Broker

ここまで、イベント、Topic、Partition について説明してきましたが、実際のコンピュータについてはあまり明示的に説明してきませんでした。物理的なインフラストラクチャの観点からいえば、Kafka は Broker と呼ばれるマシンのネットワークで構成されています。最近のデプロイにおいては、独立した物理サーバーではなく、どこかの物理的なデータセンターの実在のプロセッサで動作する仮想化サーバー上で実行されるポッド上のコンテナがこれに当たることもあります。デプロイ方法に限らず、いずれも Kafka の Broker プロセスを実行する独立したマシンです。各 Broker は一連の Partition をホストし、それらの Partition への新しいイベントの書き込みや Partition からのイベントの読み込みなどのリクエストを処理します。また、Broker は相互に Partition のレプリケーションも行います。

レプリケーション

各 Partition を1つの Broker だけに保存していたのでは意味がありません。Broker の形態はベアメタルサーバーやマネージドコンテナなどさまざまですが、Broker とその基盤となるストレージは障害の影響を受けやすいため、Partition データを安全に保つには、複数の Broker へコピーしておく必要があります。こうしたコピーを Follower レプリカ、メインの Partition を Leader レプリカとそれぞれ呼びます。Leader へデータを送信すると (読み込みと書き出しは通常 Leader で行われます)、その Leader と Follower が共同で新しい書き込みを Follower へ複製します。

この動作は自動で行われます。Producer の設定を一部調整してさまざまなレベルの耐久性保証を作り出すことはできますが、Kafka でシステムを構築する開発者がこのプロセスについて考える必要は通常ありません。開発者としては、データが安全に保護されていることと、クラスタの1つのノードに障害が発生しても別のノードがその役割を引き継いでくれることを押さえていれば十分です。

クライアントアプリケーション

では、Kafka Cluster そのものから離れ、Kafka を使用するアプリケーションである Producers と Consumers に移りましょう。これらは、コードを含むクライアントアプリケーションで、Topics へメッセージを入れたり、Topics からメッセージを読み出す機能を果たします。Kafka プラットフォーム内のコンポーネントで Kafka Broker に相当しないものはすべて、Producer と Consumer のいずれか、またその両方となります。この生成と消費がクラスタとのインターフェイスとなります。

Kafka Producer

Producer ライブラリの API サーフェスは非常に軽量で、Java に KafkaProducer というクラスがあり、これを使ってクラスタに接続します。このクラスに、クラスタ内の一部の Broker のアドレス、適切なセキュリティ設定、Producer のネットワーク動作を決定するその他の設定を始めとする構成パラメータのマップを渡します。この他にも、ProducerRecord というクラスがあり、これはクラスタに送信したいキーと値のペアを保持するために使用します。

一次近似的には、これがメッセージ送信のためのすべての API サーフェスとなります。このライブラリは、コネクションプールの管理、ネットワークのバッファリング、Broker によるメッセージ確認の待機、必要に応じたメッセージの再送など、アプリケーションプログラマーが気にかけないようなさまざまな詳細情報の管理を見えないところで行っています。

Producer の例

幸い、こうしたライブラリをすでに他の開発者が用意してくれています。 次を試してみましょう。

(KafkaProducerString,<Payment> producer = new KafkaProducer&lt;String, Payment&gt;(props)) {

    for (long i = 0; i &lt; 10; i++) {
        final String orderId = &quot;id&quot; + Long.toString(i);
        final Payment payment = new Payment(orderId, 1000.00d);
final ProducerRecord&lt;String, Payment&gt; record = 
           new ProducerRecord&lt;String, Payment&gt;(&quot;transactions&quot;, 
                                        payment.getId().toString(), 
payment);
        producer.send(record);
}
} catch (final InterruptedException e) {
      e.printStackTrace();
 }

注 : 上述の説明でもあったように、Partition とは、1つの Topic を多数のログに分割して、異なる Broker でホストできるようにするためのものです。キーのないメッセージをラウンドロビン方式で送信する、送信先の Partition をキーのハッシュ化で計算する、(あまり一般的ではないですが) カスタム設定されたスキームを適用するなど、方法はいろいろですが、ともあれ各メッセージをどの Partition に送信するかを決めるのは Producer です。つまりある意味、パーティショニングは Producer の中に存在するといえます。

Kafka Consumer

Consumer API の使い方は、基本的に Producer と同様です。KafkaConsumer と呼ばれるクラスを使用してクラスタに接続し (ここでクラスタのアドレス、セキュリティなどのパラメータを指定する構成マップを渡します)、その接続を使用して1つまたは複数の Topic を購読します。これらの Topic でメッセージが利用可能となると、メッセージは ConsumerRecord と呼ばれるコレクションに戻ってきます。このコレクションには、ConsumerRecord オブジェクトの形式でメッセージの個々のインスタンスが含まれます。ConsumerRecord オブジェクトは、単一の Kafka メッセージのキー/値のペアを表します。

try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) { consumer.subscribe(Collections.singletonList(TOPIC));

while (true) {
    ConsumerRecords&lt;String, Payment&gt; records = consumer.poll(100);

for (ConsumerRecord<String, Payment> record : records) { String key = record.key(); Payment value = record.value(); System.out.printf(“key = %s, value = %s%n”, key, value); } } }

KafkaConsumer は、KafkaProducer と同様に接続プーリングやネットワークプロトコルを管理しますが、読み出し側での操作には単なるネットワーク接続以上の意義があります。まず第一に、Kafka は従来のメッセージキューとは異なり、メッセージを読み出してもそれを破棄することはなく、そのメッセージに関心を持つ他の Consumer による読み出しがその後も可能です。実際、Kafka では、多数の Consumer が1つの Topic から読み出しを行うのはごく当たり前のことです。この小さな事実が、Kafka を中心に展開されるソフトウェアアーキテクチャに大きな影響を与えており、各所でもよく紹介されています。

また、Consumer は、Topic からのメッセージ消費量と1つのメッセージの処理のための計算コストが共に高すぎ、アプリケーションの1つのインスタンスでは対応しきれないというシナリオにも対応する必要があります。つまり、Consumer にスケーリングは必須です。Kafka では、Consumer グループのスケーリングはほぼ自動的に行われます。

Kafka のエコシステム

もし御社が、Broker でパーティショニング・複製された Topic を管理し、増え続ける Producer と Consumer の集合でイベントの書き込みや読み出しを行っているだけだとしても、実際にはかなり便利なシステムが実現していると言えます。ただ、Kafka コミュニティのこれまでの経験からは、コアとなる Kafka の周辺に開発者チームが似たような機能を繰り返し構築することになるというある種のパターンが現実にはよく見られます。

最終的に、アプリケーション機能の共通レイヤーを構築し、差別化につながらないタスクを繰り返すことになりかねません。これは確かに重要な働きをするコードではありますが、実際のビジネスには一切関係なく、顧客に直接価値を提供するものでもありません。これはつまりインフラであり、コミュニティやインフラベンダーによって提供されるべきものです。

こうしたコードを開発者が自分で書いてしまうこともできますが、おすすめできません。Kafka Connect、Confluent Schema Registry、Kafka Streams、ksqlDB は、こうした類のインフラストラクチャコードの一例です。それぞれを順に見ていきましょう。

Kafka Connect

情報の保存と検索の世界には、Kafka を使用していないシステムもあるため、他のシステムのデータを Kafka Topic へ取り込んだり、Kafka Topic のデータを他のシステムに取り込むというニーズが生じることもあります。こうした中継点としての役割を果たすのが、Apache Kafka の統合API、Kafka Connect です。

Kafka Connectの機能

Kafka Connect とは、プラグイン式の Connector のエコシステムであり、同時にクライアントアプリケーションでもあります。クライアントアプリケーションとしての Connect は、Kafka Broker 自体からは独立したハードウェア上で実行されるサーバープロセスで、スケーラブルかつフォールトトレラントなため、単一の Connect ワーカーだけでなく、Connect ワーカーのクラスタを実行し、Kafka と外部システム間でデータを出し入れする際の負荷を分担することが可能です。また、Kafka Connect は、ユーザーによるコード作成を必要とせず、JSON の設定だけで実行できるようになっています。例えば、Kafka から Elasticsearch へのデータのストリーミングは以下の方法で行います。

  {
     &quot;connector.class&quot;:&quot;io.confluent.connect.elasticsearch.ElasticsearchSinkConnector&quot;,
     &quot;topics&quot;                          : &quot;my_topic&quot;,
    &quot;connection.url&quot;: &quot;http://elasticsearch:9200&quot;,
    &quot;type.name&quot;: &quot;_doc&quot;,
    &quot;key.ignore&quot;: &quot;true&quot;,
    &quot;schema.ignore&quot;: &quot;true&quot;
 }

Kafka Connect の仕組み

Connect ワーカーは、1つまたは複数の Connector を実行します。Connector とは、外部システムとのインターフェイスとなるプラグイン式のコンポーネントです。ソース Connector は外部システムからデータを読み込み、Kafka Topic へと送信します。シンク Connector は1つまたは複数の Kafka トピックをサブスクライブし、読み取ったメッセージを外部システムに書き込みます。Connector はそれぞれソースまたはシンクのどちらかとなりますが、Kafka Cluster はどちらの場合でも Producer または Consumer しか認識していないことを覚えておくとよいでしょう。Broker でないものはすべて、Producer または Consumer とみなされます。

Kafka Connect のメリット

Kafka Connect の主なメリットとして、大規模な Connector のエコシステムが挙げられます。クラウドの Blob ストアにデータを移動するコードを書いたり、Elasticsearch へ書き込んだり、リレーショナルデータベースへレコードを挿入する際に使用するコードがビジネスによって変わることはまずありません。同様に、リレーショナルデータベース、Salesforce やレガシー HDFS ファイルシステムからの読み出しは、アプリケーションが違っても皆同じ操作です。もちろんこのコードを書くこともできますが、そこに時間をかけても、顧客にユニークな価値を与えたりビジネスの競争優位性を高めることに直接つながるわけではありません。

こうした操作はいずれも、厳選されたあらゆる種類、ライセンスとサポートレベルの Connector が揃うコレクション、Confluent Hub の Kafka Connector で行うことができます。Connector には、商業ライセンスされているものと無料で使えるものがあります。Connect Hub では、あらゆる種類のソース/シンク Connector を検索でき、各 Connector のライセンスが明確に表示されます。もちろん、この Hub 以外の GitHub やその他のマーケットプレイスなどの Connector も使えます。もしやりたいことに適した Connector が見つからない場合には、簡単な API で自作も可能です。

こうした機能を自力で構築するのは簡単に思えるかもしれません。外部のソースシステムが読み出しやすいものであれば、そこから読み込んで目的の Topic に送信するのは容易でしょうし、外部のシンクシステムが書き込みやすいものであれば、Topic から取得してそのシステムに書き込むこともしやすいはずです。ただ、フェイルオーバー処理、水平スケーリング、送受信データに関する一般的な変換操作の管理、共通 Connector コードの配布、標準インターフェイスによる設定や操作など、複雑な問題が発生することはよくあります。

Connect は一見、非常にシンプルに見えますが、実際には複雑な分散システムであり、それ自体がプラグイン式のエコシステムになっています。そのプラグインエコシステムに必要なものが見つからない場合は、オープンソースの Connect フレームワークを使って独自のコネクタを簡単に構築し、Connect の提供する拡張性とフォールトトレランスの特性をすべて受け継ぐことができます。Kafka Connect に関する詳しい説明は、Kafka Connect 101 コース を参照してください。

Schema Registry

アプリケーションが Kafka との間でメッセージを頻繁に送受信するようになると、2つのことが起こります。まず、既存の Topic の新しい Consumer が現れます。これらは新しいアプリケーション (おそらく元のメッセージを作成したチームやその他のチームが書いたアプリケーション) で、その Topic に含まれるメッセージの形式を理解できる必要があります。2つ目に、これらのメッセージの形式はビジネスの進化に合わせて進化していきます。注文オブジェクトに新しいステータスフィールドが追加され、ユーザー名が氏名から名と姓に分割されるなど、さまざまな機能が追加されています。ドメインオブジェクトのスキーマは常に変化するため、特定のトピックにおけるメッセージのスキーマを決めておく手段が必要となります。

この問題を解決するのが Confluent Schema Registry です。

Schema Registry とは?

Schema Registry は、Kafka Broker の外部のマシンで動作するスタンドアロンのサーバープロセスで、担当するクラスタの Topic に書き込まれたすべてのスキーマのデータベースを維持する役割を担っています。この「データベース」は、内部の Kafka Topic で永続化され、低レイテンシでアクセスできるよう Schema Registry にキャッシュされます。Schema Registry は冗長化された高可用性構成で実行できるため、1つのインスタンスに障害が発生しても稼働を継続できます。

Schema Registry は、Producer や Consumer が送信または消費しようとしているメッセージに以前のバージョンと互換性があるかを予測するための API としても機能します。Schema Registry を使用するよう設定された Producer は、Schema Registry REST エンドポイントで API を呼び出し、新しいメッセージのスキーマを提示します。そのスキーマが最後に送信したメッセージのものと同じであれば、送信が成功する可能性があります。最後のメッセージのものと異なっていても、Topic に定義された互換性ルールに合致していれば、送信が成功することもあります。しかし、互換性ルールに違反するような相違がある場合には、送信に失敗します。これは、アプリケーションコードにより検出されます。

同様に、消費側でも、Consumer コードに予測されるバージョンと互換性のないスキーマを持つメッセージを Consumer が読み込んだ場合には、Schema Registry はメッセージを消費しないように指示します。スキーマの進化はどのようなツールを選択しても問題となりますが、Schema Registry はこれを完全に自動化するものではありません。ただ、可能な限りランタイムエラーの発生を防ぐことで、問題をはるかに容易なものとします。

ここまでの内容では、永続的にイベントを保存するシステム、これらのイベントを書き出しまたは読み込む能力、データ統合ネットワーク、進化するスキーマを管理するためのツールについて説明しました。残りは、単にストリーミング処理の計算領域となります。

Kafka Streams

成長する Kafka ベースのアプリケーションでは、Consumer が複雑となる傾向があります。単純なステートレス変換 (社内のスキーマ要件を充足するための個人識別情報のマスキングやメッセージ形式の変更など) として始まったものが、たちまち複雑な集約や充実化などに発展することもままあります。上述の Consumer コードを思い返してみれば、API にはこうした操作をサポートする要素があまりありません。時間枠、遅れて到達したメッセージ、ルックアップテーブル、キーによる集約などの処理に対し、フレームワークとなるコードを大量に構築する必要が出てきます。また、集約や充実化などの操作は一般的にステートフルなものです。

この「ステート」、つまり状態はプログラムのヒープ内のメモリとなり、フォールトトレランスの影響を受けます。ストリーム処理アプリケーションがダウンすると、他のどこかに保存しておかない限り、その状態も一緒に消えてしまいます。こうした状況への対応には大規模かつ非常に複雑な記述やデバッグが必要となり、ユーザビリティの解消には直接結びつきません。こうした理由から、Apache Kafka はストリーム処理 API、Kafka Streams を提供しています。

Kafka Streams とは?

Kafka Streams は、Consumer API の上にフレームワークコードを書く必要なしに、フィルタリング、グループ化、集約や結合など、ストリーム処理のあらゆる基本的な計算要素への手軽なアクセスを可能にする Java API で、ストリーム処理計算から発生する可能性のある大量のステート (状態) にも対応しています。もし、一意の値を持つフィールドで高スループットの Topic 内のイベントをグループ化し、続いてそのグループのロールアップを1時間毎に計算するとしたら、大量のメモリが必要となる可能性があります。

確かに、大容量の Topic や複雑なストリーム処理のトポロジーといえば、通常の Consumer グループのように、ストリーム処理の負荷を分担するマシンのクラスタを展開する必要性を連想してしまうのも無理はありません。Streams API は、分散されたステートの課題を開発者に代わってすべて解決することで、これら2つの問題を解消します。ステートをローカルディスクと Kafka Cluster の内部 Topic に永続化し、クラスタへのストリーム処理ノードの追加や削除時にストリーム処理クラスタ内のノード間で状態の自動的に再割り当てするのです。

一般的なマイクロサービスでは、ストリーム処理は、アプリケーションが他の機能のついでに行うものです。例えば、発送通知サービスの場合には、発送イベントと顧客レコードを含む製品情報内の変更ログのイベントとを組み合わせて発送通知オブジェクトを生成し、他のサービスがこれをメールやテキストメッセージに変換することができます。ただ、その発送通知サービスには、モバイルアプリやウェブのフロントエンドが特定の発送ステータスを示すビューをレンダリングする際に、同期的なキー ルックアップのための REST API を公開する責務もあるかもしれません。

このサービスはイベントに反応します。この場合では、3つのストリームを結合し、場合によっては結合された結果に対して他のウィンドウで計算を実行するのに加え、REST エンドポイントに対して HTTP リクエストも届けていることにになります。一般に使われる Spring Framework や Micronaut などの Java API を使用していることでしょう。Kafka Streams は Java ライブラリであり、ストリーム処理のみを行う専用のインフラストラクチャコンポーネントではないため、他の目的 (REST エンドポイントなど) のための別のフレームワークを使用するサービスや、スケーラブルで高度かつフォールトトレラントなストリーム処理を立ち上げるのは造作もないことです。

Stream API の例

ここでは、Streams API のいくつかのコンセプトを説明するコードリストを紹介します。すべて説明することはしませんが、API のアプローチを理解することは十分できるはずです。このコードは、ストリーム内の値の平均 (raw-ratings) を計算し、その平均をテーブル (movies) に統合して、新しく集計され、充実化された Topic (rated-movies) を生成します。

StreamsBuilder builder = new StreamsBuilder();

builder.stream(&quot;raw-movies&quot;,Consumed.with(Serdes.Long(), Serdes.String()))
.mapValues(Parser::parseMovie)

.map((key,movie) -> new KeyValue<>(movie.getMovieId(), movie)) .to(“movies”,Produced.with(Serdes.Long(), movieSerde));

KTable