Apache Ignite的流处理(Streaming)示例程序

发布于:2025-08-07 ⋅ 阅读:(27) ⋅ 点赞:(0)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.examples.streaming;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.examples.ExampleNodeStartup;
import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.examples.IgniteConstant;
import org.apache.ignite.stream.StreamVisitor;

import java.io.Serializable;
import java.util.List;
import java.util.Random;

/**
 * Streams randomly generated market ticks (symbol, price) through a {@link StreamVisitor}
 * that folds each tick into a per-symbol {@link Instrument} kept in a second cache, then
 * runs a SQL query for the three instruments whose latest price rose the most above their
 * first observed (open) price.
 * <p>
 * To start the example, you should:
 * <ul>
 *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
 *     <li>Start streaming using {@link StreamVisitorExample}.</li>
 * </ul>
 * <p>
 * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
 */
public class StreamVisitorExample {
    /** Random number generator. */
    private static final Random RAND = new Random();

    /** Total number of market ticks streamed by the example. */
    private static final int TICK_COUNT = 10_000_000;

    /** A progress line is printed every time this many ticks have been streamed. */
    private static final int PROGRESS_INTERVAL = 500_000;

    /** The list of instruments. */
    private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"};

    /** The list of initial instrument prices, index-aligned with {@link #INSTRUMENTS}. */
    private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50};

    /**
     * Starts a client node, streams {@link #TICK_COUNT} random ticks, prints the top 3
     * instruments by {@code latest - open}, and finally destroys both caches.
     *
     * @param args Command line arguments (not used).
     * @throws Exception If the example fails.
     */
    public static void main(String[] args) throws Exception {
        // Mark this cluster member as client.
        Ignition.setClientMode(true);

        try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
            // The example needs at least one server node to hold the caches.
            if (!ExamplesUtils.hasServerNodes(ignite))
                return;

            // Market data cache with default configuration (the streamer target; stays empty).
            CacheConfiguration<String, Double> mktDataCfg = new CacheConfiguration<>("marketTicks");

            // Financial instrument cache configuration.
            CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache");

            // Index key and value for querying financial instruments.
            // Note that Instrument class has @QuerySqlField annotation for secondary field indexing.
            instCfg.setIndexedTypes(String.class, Instrument.class);

            // Auto-close caches at the end of the example.
            try (
                IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(mktDataCfg);
                IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg)
            ) {
                try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) {
                    // Note that we receive market data, but do not populate 'mktCache' (it remains empty).
                    // Instead we update the instruments in the 'instCache'.
                    // Both caches are keyed by symbol, so updates are collocated
                    // (NOTE(review): assumes both caches keep the same default affinity configuration).
                    mktStmr.receiver(StreamVisitor.from((cache, e) -> {
                        String symbol = e.getKey();
                        Double tick = e.getValue();

                        // NOTE(review): this get/put pair is not atomic. If the streamer applies
                        // batches for the same symbol concurrently, an update may be lost;
                        // IgniteCache#invoke would make the read-modify-write atomic.
                        Instrument inst = instCache.get(symbol);

                        if (inst == null)
                            inst = new Instrument(symbol);

                        // Don't populate market cache, as we don't use it for querying.
                        // Update cached instrument based on the latest market tick.
                        inst.update(tick);

                        instCache.put(symbol, inst);
                    }));

                    // Stream market data ticks into the system.
                    for (int i = 1; i <= TICK_COUNT; i++) {
                        int idx = RAND.nextInt(INSTRUMENTS.length);

                        // Add standard gaussian noise to the initial price, so small deviations
                        // from it are much more likely than large ones.
                        double price = round2(INITIAL_PRICES[idx] + RAND.nextGaussian());

                        mktStmr.addData(INSTRUMENTS[idx], price);

                        if (i % PROGRESS_INTERVAL == 0)
                            System.out.println("Number of tuples streamed into Ignite: " + i);
                    }
                }
                // The streamer is closed at this point (per IgniteDataStreamer#close, remaining
                // buffered data is flushed), so the query below runs after all ticks were sent.

                // Select top 3 best performing instruments, ranked by absolute price change since open.
                SqlFieldsQuery top3qry = new SqlFieldsQuery(
                    "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3");

                // Execute query.
                List<List<?>> top3 = instCache.query(top3qry).getAll();

                System.out.println("Top performing financial instruments: ");

                // Print top 3 instruments.
                ExamplesUtils.printQueryResults(top3);
            }
            finally {
                // Distributed cache could be removed from cluster only by #destroyCache() call.
                ignite.destroyCache(mktDataCfg.getName());
                ignite.destroyCache(instCfg.getName());
            }
        }
    }

    /**
     * Rounds a double value to two decimal places (ties are rounded toward positive infinity).
     *
     * @param val Value to be rounded.
     * @return Rounded double value.
     */
    private static double round2(double val) {
        return Math.floor(100 * val + 0.5) / 100;
    }

    /**
     * Financial instrument: the first price seen for a symbol (open) and the most recent one (latest).
     */
    public static class Instrument implements Serializable {
        /** Instrument symbol. */
        @QuerySqlField(index = true)
        private final String symbol;

        /** Open price: the first price observed for this instrument. */
        @QuerySqlField(index = true)
        private double open;

        /** Latest observed price. */
        @QuerySqlField(index = true)
        private double latest;

        /**
         * Whether {@link #open} has been recorded. Tracked explicitly instead of treating
         * {@code open == 0} as "unset", so a legitimate first price of 0.0 is kept as the open.
         */
        private boolean hasOpen;

        /**
         * @param symbol Symbol.
         */
        public Instrument(String symbol) {
            this.symbol = symbol;
        }

        /**
         * Updates this instrument based on the latest market tick price.
         * The first price ever applied also becomes the open price.
         *
         * @param price Latest price.
         */
        public void update(double price) {
            if (!hasOpen) {
                open = price;
                hasOpen = true;
            }

            this.latest = price;
        }
    }
}

这段代码是一个使用 Apache Ignite 的流处理(Streaming)示例程序,主要功能是:

模拟生成金融市场的实时股价数据,通过流式方式输入到 Ignite 集群中,并根据这些数据动态更新金融产品(如股票)的表现,最后查询出表现最好的前三只股票。


我们来逐步解析这个程序的结构和逻辑。


🔹 一、整体功能概述

  • 模拟 1000 万次市场报价(ticks),每个报价包含一个股票代码(如 “IBM”)和当前价格。
  • 使用 IgniteDataStreamer 将这些报价“流”入系统。
  • 不直接将数据存入缓存 mktCache,而是用 StreamVisitor 在接收数据时:
    • 根据股票符号从另一个缓存 instCache 中获取或创建 Instrument 对象;
    • 更新该股票的最新价格(首次收到该股票的报价时,同时把它记录为开盘价);
    • 把更新后的对象保存回 instCache
  • 最后用 SQL 查询找出收益最高的前三只股票(最新价 - 开盘价 最大的三个)。

🔹 二、关键类与配置说明

1. Instrument 类(金融产品)
public static class Instrument implements Serializable {
    @QuerySqlField(index = true)
    private final String symbol;

    @QuerySqlField(index = true)
    private double open;

    @QuerySqlField(index = true)
    private double latest;
}
  • 表示一只股票,包含:
    • symbol: 股票代码(如 IBM)
    • open: 开盘价(第一次出现的价格)
    • latest: 最新价格
  • 所有字段都加了 @QuerySqlField(index = true),表示可以被 SQL 查询,并为每个字段各自建立二级索引。注意:本例的查询按表达式 (latest - open) 排序,这类单列索引通常无法直接加速这种按表达式的排序。
2. 缓存配置
CacheConfiguration<String, Double> mktDataCfg = new CacheConfiguration<>("marketTicks");
CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache");
  • marketTicks: 名义上存放市场报价(key=股票代码, value=当前价格),但实际上并没有写入任何数据。
  • instCache: 存放 Instrument 对象,记录每只股票的开盘价和最新价。

⚠️ 注意:虽然定义了 mktCache,但在 StreamVisitor 中并没有 put 数据进去,只是“路过”数据。


🔹 三、流式处理核心:IgniteDataStreamerStreamVisitor

try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) {
    mktStmr.receiver(StreamVisitor.from((cache, e) -> {
        String symbol = e.getKey();
        Double tick = e.getValue();

        Instrument inst = instCache.get(symbol);
        if (inst == null)
            inst = new Instrument(symbol);

        inst.update(tick); // 更新开盘价或最新价
        instCache.put(symbol, inst); // 写回缓存
    }));
StreamVisitor 的作用
  • 这是一个“边接收边处理”的机制。
  • 每来一条 (symbol, price) 数据:
    • instCache 查找对应的 Instrument
    • 如果没有,就新建一个;
    • 调用 inst.update(price)
      • 如果是第一次更新(此前还没有开盘价),则先记录 open = price
      • 无论是否第一次,都会更新 latest = price
    • 然后把更新后的对象写回 instCache

💡 所以:虽然数据流向的是 mktCache,但我们拦截它,并把处理结果写到另一个缓存 instCache 中。


🔹 四、数据生成逻辑

for (int i = 1; i <= 10_000_000; i++) {
    int idx = RAND.nextInt(INSTRUMENTS.length);
    double price = round2(INITIAL_PRICES[idx] + RAND.nextGaussian());
    mktStmr.addData(INSTRUMENTS[idx], price);
}
  • 循环 1000 万次,模拟市场报价。
  • 随机选择一个股票(INSTRUMENTS 数组)。
  • 价格 = 初始价 + 高斯噪声(RAND.nextGaussian() 返回均值 0、标准差 1 的标准正态分布值,模拟小幅波动)
  • round2(...):保留两位小数

示例:IBM 初始价 194.9,加上随机波动后变成 195.12 或 194.67 等。


🔹 五、最终查询:找出表现最好的三只股票

SqlFieldsQuery top3qry = new SqlFieldsQuery(
    "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3"
);

List<List<?>> top3 = instCache.query(top3qry).getAll();
ExamplesUtils.printQueryResults(top3);
  • SQL 查询 Instrument 类型(即 instCache 中的对象)
  • 计算每只股票的收益:latest - open
  • 按收益降序排序,取前 3 名(注意:每条报价都是“初始价 + 独立噪声”,所以这里的“表现最好”本质上是随机结果,仅用于演示查询)

输出示例(可能):

Top performing financial instruments:
[IBM, 2.34]
[GOOG, 1.87]
[AMZN, 0.95]

🔹 六、其他细节

内容 说明
Ignition.setClientMode(true) 当前节点作为客户端连接集群,不存储数据
ignite.getOrCreateCache(...) 如果缓存不存在就创建
ignite.destroyCache(...) 示例结束时清理资源
@QuerySqlField(index = true) 支持 SQL 查询的关键注解
round2(...) 四舍五入到两位小数

🔹 七、运行前提

To start the example, you should:
 - Start a few nodes using ExampleNodeStartup or by starting remote nodes.
 - Start streaming using StreamVisitorExample.
  • 必须先启动至少一个服务端节点(运行 ExampleNodeStartup)。
  • 然后运行这个 StreamVisitorExample 作为客户端进行流式写入。

否则会提示没有服务端节点而退出。


🔹 八、这个例子展示了什么?

特性 展示情况
🌐 分布式缓存 使用 IgniteCache 跨节点共享数据
⚡ 流处理 使用 IgniteDataStreamer 高吞吐写入
🧠 边处理边更新 StreamVisitor 实现“流式 ETL”逻辑
🔍 SQL 查询 在内存数据上执行 SQL 分析
📈 实时分析 实时更新股票表现并查询结果

✅ 总结一句话:

这个程序模拟了一个实时金融数据分析系统:不断接收股价流,动态维护每只股票的涨跌情况,并最终通过 SQL 快速查出上涨最多(最新价 - 开盘价 最大)的股票。


💡 类比理解

想象你在看实时股票行情:

  • 每秒钟都有成千上万条价格变动(这就是“流”);
  • 你不想保存所有历史价格,只想知道:
    • 每只股票今天开盘价是多少?
    • 现在价格是多少?
    • 涨了多少?
  • 所以你设计一个程序:
    • 看到第一条价格 → 记为“开盘价”
    • 后续价格 → 更新为“最新价”
    • 最后按“涨跌额”(最新价 - 开盘价,而不是百分比涨跌幅)生成排行榜

👉 这个例子就是这样一个“内存中的实时排行榜系统”。


如果你还想扩展:

  • 可以加入时间窗口(比如最近1小时的涨幅)
  • 加入更多指标(成交量、波动率等)
  • 实时推送结果给前端

需要我帮你改造或加功能吗?


网站公告

今日签到

点亮在社区的每一天
去签到