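Preface
CQEngine (Collection Query Engine) is a high-performance Java collection that can be searched with SQL-like queries at very low latency. According to the project, it lets you:
- run millions of queries per second, with query latencies measured in microseconds
- offload query traffic from the database and scale the application tier
- outperform databases by a factor of thousands, even on low-end hardware
It supports on-heap, off-heap, and disk persistence, as well as MVCC transaction isolation.
CQEngine trades space for time: it builds indexes on the fields of the objects stored in the collection and applies algorithms based on the rules of set theory to reduce the time complexity of accessing them. This addresses the scalability and latency problems of plain iteration.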
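The space-for-time idea can be illustrated with plain JDK collections. The sketch below is not CQEngine's implementation; the class `IndexSketch`, its methods, and the English feature labels are hypothetical names chosen for illustration. It keeps one map per field from value to object ids, then answers AND and OR queries with set intersection and union instead of scanning every object.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class IndexSketch {
    // value -> ids of the objects that carry that value (the extra memory paid up front)
    private final Map<String, Set<Integer>> roleIndex = new HashMap<>();
    private final Map<String, Set<Integer>> featureIndex = new HashMap<>();

    /** Registers one object in both indexes. */
    void add(int id, String role, String... features) {
        roleIndex.computeIfAbsent(role, k -> new HashSet<>()).add(id);
        for (String feature : features) {
            featureIndex.computeIfAbsent(feature, k -> new HashSet<>()).add(id);
        }
    }

    /** role = ? AND feature = ?, answered as the intersection of two index lookups. */
    Set<Integer> roleAndFeature(String role, String feature) {
        Set<Integer> result = new TreeSet<>(roleIndex.getOrDefault(role, Collections.emptySet()));
        result.retainAll(featureIndex.getOrDefault(feature, Collections.emptySet()));
        return result;
    }

    /** role = ? OR feature = ?, answered as the union of two index lookups. */
    Set<Integer> roleOrFeature(String role, String feature) {
        Set<Integer> result = new TreeSet<>(roleIndex.getOrDefault(role, Collections.emptySet()));
        result.addAll(featureIndex.getOrDefault(feature, Collections.emptySet()));
        return result;
    }

    /** Four sample objects, loosely modelled on the users in this article. */
    static IndexSketch sample() {
        IndexSketch idx = new IndexSketch();
        idx.add(1, "SUPER", "big-face", "nearsighted");
        idx.add(2, "ADMIN", "small-face", "normal");
        idx.add(3, "NORMAL", "small-face", "nearsighted");
        idx.add(4, "SUPER", "big-face", "normal");
        return idx;
    }

    public static void main(String[] args) {
        IndexSketch idx = sample();
        System.out.println(idx.roleAndFeature("SUPER", "nearsighted")); // [1]
        System.out.println(idx.roleOrFeature("SUPER", "nearsighted"));  // [1, 3, 4]
    }
}
```

Each lookup touches only the matching ids, so the cost depends on the size of the result sets rather than on the size of the collection.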
GitHub: https://github.com/npgall/cqengine
Maven dependency (pom.xml):
<dependency>
<groupId>com.googlecode.cqengine</groupId>
<artifactId>cqengine</artifactId>
<version>3.6.0</version>
</dependency>
1. Basic usage
1.1 POJO
CQEngine indexes the fields of a POJO class to speed up queries. To make a field queryable, define an Attribute for it:
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {
public enum Role {ADMIN, NORMAL, SUPER}
private int userId;
private String name;
private Role role;
private double height;
private List<String> features;
public static final Attribute<User, Integer> USER_ID = attribute("userId", User::getUserId);
public static final Attribute<User, String> NAME = attribute("name", User::getName);
public static final Attribute<User, Role> ROLE = attribute("role", User::getRole);
public static final Attribute<User, Double> HEIGHT = attribute("height", User::getHeight);
public static final Attribute<User, String> FEATURES = attribute(String.class, "features", User::getFeatures);
}
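1.2 Indexes
Indexes are normally built on Attributes. Different index types serve different purposes; see the documentation for details.
Commonly used ones:
- HashIndex, backed by a ConcurrentHashMap
- NavigableIndex, backed by a ConcurrentSkipListMap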
IndexedCollection<User> users = new ConcurrentIndexedCollection<>();
users.addIndex(NavigableIndex.onAttribute(User.USER_ID));
users.addIndex(InvertedRadixTreeIndex.onAttribute(User.NAME));
users.addIndex(HashIndex.onAttribute(User.NAME));
users.addIndex(NavigableIndex.onAttribute(User.HEIGHT));
users.addIndex(HashIndex.onAttribute(User.FEATURES));
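To show why a sorted map suits range queries, here is a minimal JDK-only sketch. It is not CQEngine code: `NavigableSketch`, its methods, and the romanized names are hypothetical. It stores names under their height in a ConcurrentSkipListMap and answers a "between" query through a `subMap` view, so keys outside the range are never visited.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class NavigableSketch {
    // height -> names of the users with that height, kept sorted by height
    private final ConcurrentNavigableMap<Double, List<String>> byHeight = new ConcurrentSkipListMap<>();

    /** Adds one user; the ArrayList value is not thread-safe, which is acceptable for a sketch. */
    void add(String name, double height) {
        byHeight.computeIfAbsent(height, k -> new ArrayList<>()).add(name);
    }

    /** Returns all names whose height lies in [low, high], in ascending height order. */
    List<String> between(double low, double high) {
        List<String> result = new ArrayList<>();
        // subMap is a view over the sorted keys, bounded inclusively on both ends
        for (List<String> names : byHeight.subMap(low, true, high, true).values()) {
            result.addAll(names);
        }
        return result;
    }

    /** Sample data using the heights from the example users in this article. */
    static NavigableSketch sample() {
        NavigableSketch sketch = new NavigableSketch();
        sketch.add("zhangsan", 180.0);
        sketch.add("lisi", 171.1);
        sketch.add("wangwu", 160.5);
        sketch.add("zhanger", 190.9);
        return sketch;
    }

    public static void main(String[] args) {
        System.out.println(sample().between(150.0, 180.0)); // [wangwu, lisi, zhangsan]
    }
}
```

A hash-based map cannot serve this query without visiting every key, which is one reason to pick a navigable index for attributes that are queried by range.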
1.3 Basic queries
The QueryFactory class provides many static methods that help build queries. Import them with:
import static com.googlecode.cqengine.query.QueryFactory.*;
public static void main(String[] args) {
IndexedCollection<User> users = new ConcurrentIndexedCollection<>();
User user1 = User.builder().userId(1).name("张三").role(User.Role.SUPER).height(180.0)
.features(Lists.newArrayList("脸大", "近视")).build();
User user2 = User.builder().userId(2).name("李四").role(User.Role.ADMIN).height(171.1)
.features(Lists.newArrayList("脸小", "正常")).build();
User user3 = User.builder().userId(3).name("王五").role(User.Role.NORMAL).height(160.5)
.features(Lists.newArrayList("脸小", "近视")).build();
User user4 = User.builder().userId(4).name("张二").role(User.Role.SUPER).height(190.9)
.features(Lists.newArrayList("脸大", "正常")).build();
users.addAll(Lists.newArrayList(user1, user2, user3, user4));
users.addIndex(NavigableIndex.onAttribute(User.USER_ID));
users.addIndex(InvertedRadixTreeIndex.onAttribute(User.NAME));
users.addIndex(HashIndex.onAttribute(User.NAME));
users.addIndex(NavigableIndex.onAttribute(User.HEIGHT));
users.addIndex(HashIndex.onAttribute(User.FEATURES));
Query<User> nameQuery = contains(User.NAME, "张");
display(users, nameQuery);
Query<User> heightQuery = between(User.HEIGHT, 150.0, 180.0);
display(users, heightQuery);
Query<User> roleQuery = equal(User.ROLE, User.Role.SUPER);
display(users, roleQuery);
Query<User> andQuery = and(roleQuery, heightQuery);
display(users, andQuery);
Query<User> userQuery = in(User.FEATURES, "近视", "脸大");
display(users, userQuery);
Query<User> andUserQuery = and (in(User.FEATURES, "近视"),
in(User.FEATURES, "脸大"));
display(users, andUserQuery);
}
private static void display(IndexedCollection<User> users, Query<User> query) {
ResultSet<User> r = users.retrieve(query);
for (User user : r) {
System.out.println(user);
}
System.out.println("---------");
}
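2. Advanced features
2.1 Null values
The attributes defined above throw a NullPointerException when a query encounters a null value. To avoid this, replace the attribute method with nullableAttribute in the POJO: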
public static final Attribute<User, String> FEATURES = nullableAttribute(String.class, "features", User::getFeatures);
Add a record with a null value to test:
User user5 = User.builder().userId(5).name("李七").role(User.Role.NORMAL).height(170.0)
.features(null).build();
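The difference between the two kinds of attribute can be sketched without the library. The code below is an illustration under an assumption: that a nullable attribute treats a null field as "no values" rather than dereferencing it. `NullableSketch` and its methods are hypothetical and are not part of CQEngine.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class NullableSketch {
    /** Strict accessor: assumes the list is never null, as a non-nullable attribute would. */
    static boolean hasFeatureStrict(List<String> features, String wanted) {
        return features.contains(wanted); // throws NullPointerException when features == null
    }

    /** Null-tolerant accessor: a missing list is treated as "no values", so nothing matches. */
    static boolean hasFeatureNullable(List<String> features, String wanted) {
        List<String> safe = (features == null) ? Collections.<String>emptyList() : features;
        return safe.contains(wanted);
    }

    public static void main(String[] args) {
        System.out.println(hasFeatureNullable(Arrays.asList("nearsighted"), "nearsighted")); // true
        System.out.println(hasFeatureNullable(null, "nearsighted"));                         // false
        try {
            hasFeatureStrict(null, "nearsighted");
        } catch (NullPointerException e) {
            System.out.println("strict accessor failed on null");
        }
    }
}
```

With the null-tolerant accessor, an object whose features field is null simply never matches a query on that field.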
2.2 SQL-like queries
CQEngine provides a parser for SQL-like statements that can be used to query the collection. The parser is built on ANTLR.
SQLParser<User> parser = new SQLParser<User>(User.class) {{
registerAttribute(User.USER_ID);
registerAttribute(User.NAME);
registerAttribute(User.ROLE);
registerAttribute(User.HEIGHT);
registerAttribute(User.FEATURES);
}};
ResultSet<User> results = parser.retrieve(users, "SELECT * FROM users WHERE " +
"(height <= 180.0 " +
"AND role IN ('SUPER', 'NORMAL')) " +
"ORDER BY height ASC, userId DESC");
results.forEach(System.out::println);
2.3 Persistence
Object collection persistence
A CQEngine IndexedCollection can be configured to store its objects on-heap, off-heap, or on disk.
On-heap
// On-heap is the default
IndexedCollection<User> users = new ConcurrentIndexedCollection<>();
Off-heap
IndexedCollection<User> users = new ConcurrentIndexedCollection<>(
    OffHeapPersistence.onPrimaryKey((SimpleAttribute<User, Integer>) User.USER_ID));
Disk
IndexedCollection<User> users1 = new ConcurrentIndexedCollection<>(
    DiskPersistence.onPrimaryKeyInFile((SimpleAttribute<User, Integer>) User.USER_ID, new File("cars.dat")));
Composite
IndexedCollection<User> users = new ConcurrentIndexedCollection<>(
    CompositePersistence.of(
        OnHeapPersistence.onPrimaryKey((SimpleAttribute<User, Integer>) User.USER_ID),
        DiskPersistence.onPrimaryKeyInFile((SimpleAttribute<User, Integer>) User.USER_ID, new File("cars.dat"))));
Index persistence
Most indexes implement OnHeapTypeIndex and are therefore on-heap indexes.
On-heap
users.addIndex(NavigableIndex.onAttribute(User.USER_ID));
Off-heap
users.addIndex(OffHeapIndex.onAttribute(User.NAME));
Disk
users.addIndex(DiskIndex.onAttribute(User.FEATURES));
Note: data persisted to disk is stored in SQLite file format by default.
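2.4 Statistics
CQEngine can report statistics about the data in an IndexedCollection:
- Frequency distribution (the count of each attribute value stored in the index)
- Distinct keys (the distinct attribute values in the index, optionally within a range between x and y)
- Streams of the attribute values stored in the index and their associated objects (ascending or descending, optionally within a range between x and y)
- Count of distinct keys (how many distinct attribute values the index holds)
- Count for a specific key (the number of objects that match a specific attribute value)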
MetadataEngine<User> engine = users.getMetadataEngine();
AttributeMetadata<String, User> attributeMetadata = engine.getAttributeMetadata(User.NAME);
System.out.println(attributeMetadata.getFrequencyDistribution().collect(Collectors.toList()));
System.out.println(attributeMetadata.getCountOfDistinctKeys());
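To make the meaning of these statistics concrete, the following JDK-only sketch computes three of them over a plain list of keys. It only illustrates the definitions and does not show how CQEngine derives them from its indexes; `MetadataSketch` and its method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

class MetadataSketch {
    /** Frequency distribution: each distinct key mapped to its number of occurrences, keys ascending. */
    static Map<String, Long> frequencyDistribution(List<String> keys) {
        return keys.stream()
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    /** Count of distinct keys: how many different values occur at all. */
    static int countOfDistinctKeys(List<String> keys) {
        return frequencyDistribution(keys).size();
    }

    /** Count for a specific key: how many entries carry exactly this value. */
    static long countForKey(List<String> keys, String key) {
        return frequencyDistribution(keys).getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        List<String> roles = Arrays.asList("SUPER", "ADMIN", "NORMAL", "SUPER");
        System.out.println(frequencyDistribution(roles));  // {ADMIN=1, NORMAL=1, SUPER=2}
        System.out.println(countOfDistinctKeys(roles));    // 3
        System.out.println(countForKey(roles, "SUPER"));   // 2
    }
}
```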
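3. Performance comparison
For the official benchmarks, see: https://github.com/npgall/cqengine/blob/master/documentation/Benchmark.md
3.1 HashIndex
Test data is generated with the datafaker library. Four cases are benchmarked: a plain for loop, a stream, CQEngine with a hash index, and CQEngine without an index.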
@Warmup(iterations = 2, time = 5)
@Measurement(iterations = 5, time = 5)
@Fork(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@BenchmarkMode(value = Mode.Throughput)
@Timeout(time = 60)
public class JmhDemo {
private List<User> users = new ArrayList<>();
private String find = "";
private IndexedCollection<User> indexedCollection;
private IndexedCollection<User> noIndexedCollection;
Query<User> query;
@Param({"100", "1000", "10000"})
int originLen;
@Setup
public void setUp() {
Faker faker = new Faker(Locale.CHINA);
List<String> n = faker.collection(() ->faker.name().name()).len(originLen / 10).generate();
users = faker.collection(() -> User.builder().name(n.get(faker.random().nextInt(0, originLen / 10 - 1)))
.height(faker.random().nextDouble(1.50, 1.90))
.features(faker.collection(() -> faker.food().fruit()).len(1, 5).generate())
.build())
.len(originLen).generate();
find = n.get(faker.random().nextInt(0, n.size() - 1));
query = equal(User.NAME, find);
indexedCollection = new ConcurrentIndexedCollection<>();
indexedCollection.addAll(users);
indexedCollection.addIndex(HashIndex.onAttribute(User.NAME));
noIndexedCollection = new ConcurrentIndexedCollection<>();
noIndexedCollection.addAll(users);
}
@Benchmark
public void iterationNaiveQuery(Blackhole bh) {
int count = 0;
for (User user : users) {
if (user.getName().equals(find)) count++;
}
bh.consume(count);
}
@Benchmark
public void streamQuery(Blackhole bh) {
long count = users.stream().filter(u -> u.getName().equals(find)).count();
bh.consume(count);
}
@Benchmark
public void cqengineHashIndexQuery(Blackhole bh) {
ResultSet<User> retrieve = indexedCollection.retrieve(query);
bh.consume(retrieve.size());
}
@Benchmark
public void cqengineNoIndexQuery(Blackhole bh) {
ResultSet<User> retrieve = noIndexedCollection.retrieve(query);
bh.consume(retrieve.size());
}
}
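The results are shown in the figure below. They show that:
- With an index, CQEngine's throughput changes little as the data size grows, whereas the throughput of the plain loop and of the stream falls roughly in inverse proportion to the data size.
- CQEngine without an index is the slowest, slower even than the plain loop.
- The plain loop and the stream have similar throughput.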
3.2 NavigableIndex
@Warmup(iterations = 2, time = 5)
@Measurement(iterations = 5, time = 5)
@Fork(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@BenchmarkMode(value = Mode.Throughput)
@Timeout(time = 60)
public class JmhBetweenDemo {
private List<User> users = new ArrayList<>();
private double min = 1.60;
private double max = 1.70;
private IndexedCollection<User> indexedCollection;
private IndexedCollection<User> noIndexedCollection;
Query<User> query;
@Param({"100", "1000", "10000"})
int originLen;
@Setup
public void setUp() {
Faker faker = new Faker(Locale.CHINA);
users = faker.collection(() -> User.builder().name(faker.name().name())
.height(faker.random().nextInt(150, 200) / 100.0)
.features(faker.collection(() -> faker.food().fruit()).len(1, 5).generate())
.build())
.len(originLen).generate();
query = between(User.HEIGHT, min, max);
indexedCollection = new ConcurrentIndexedCollection<>();
indexedCollection.addAll(users);
indexedCollection.addIndex(NavigableIndex.onAttribute(User.HEIGHT));
noIndexedCollection = new ConcurrentIndexedCollection<>();
noIndexedCollection.addAll(users);
}
@Benchmark
public void iterationNaiveQuery(Blackhole bh) {
int count = 0;
for (User user : users) {
double height = user.getHeight();
if (height >= min && height <= max) count++;
}
bh.consume(count);
}
@Benchmark
public void streamQuery(Blackhole bh) {
long count = users.stream().filter(u -> u.getHeight() >= min && u.getHeight() <= max).count();
bh.consume(count);
}
@Benchmark
public void cqengineWithoutIndexedCollectionQuery(Blackhole bh) {
ResultSet<User> retrieve = noIndexedCollection.retrieve(query);
bh.consume(retrieve.size());
}
@Benchmark
public void cqengineNavigableIndexedCollectionQuery(Blackhole bh) {
ResultSet<User> retrieve = indexedCollection.retrieve(query);
bh.consume(retrieve.size());
}
}
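The results are shown in the figure below. They show that:
- On small data sets, both the plain loop and the stream are fast, but their throughput drops quickly as the data size grows.
- With an index, CQEngine is slower than the plain loop on small data sets, but its throughput stays roughly constant as the data size grows.
- CQEngine without an index is the slowest, slower even than the plain loop.
- The plain loop and the stream have similar throughput.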