1. 新增 Gradle 依赖
// Spring Data Elasticsearch starter; no version is given on this line.
implementation('org.springframework.boot:spring-boot-starter-data-elasticsearch')
// Elasticsearch Java API client, pinned to 8.15.0.
implementation('co.elastic.clients:elasticsearch-java:8.15.0')
2. application.yml配置
# Elasticsearch connection settings. The keys must be nested under
# spring.elasticsearch so that placeholders such as
# ${spring.elasticsearch.uris} resolve.
# The credentials below are sample values; do not commit real passwords.
spring:
  elasticsearch:
    uris: http://localhost:9200
    username: elastic
    password: elasticsearch
3. 配置相关 Bean
@Configuration
public class ElasticsearchConfig {
@Value("${spring.elasticsearch.uris}")
private String elasticsearchUrl;
@Bean
public RestClient restClient() {
SSLContext sslContext;
try {
sslContext = SSLContexts.custom()
.loadTrustMaterial(null, (chain, authType) -> true)
.build();
} catch (Exception e) {
throw new RuntimeException("Failed to create SSL context", e);
}
return RestClient.builder(HttpHost.create(elasticsearchUrl))
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
.setSSLContext(sslContext)
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE))
.build();
}
@Bean
public ElasticsearchTransport elasticsearchTransport(RestClient restClient) {
return new RestClientTransport(restClient, new JacksonJsonpMapper());
}
@Bean
public ElasticsearchClient elasticsearchClient(ElasticsearchTransport transport) {
return new ElasticsearchClient(transport);
}
}
4. 查询示例
/**
 * Reads Dogecoin address snapshots from the {@code doge} Elasticsearch index and logs
 * large balance movements.
 *
 * <p>The per-address history exposed by this class is always ordered by {@code c_date}
 * ascending (oldest first, newest last); {@link #checkDogeBalance()} relies on that order.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class DogeServiceImpl implements DogeService {

    /** Minimum absolute balance change that triggers a log entry. */
    private static final long THRESHOLD = 1_000_000;
    /** Number of addresses fetched by the top-address query. */
    private static final int TOP_ADDRESS_LIMIT = 100;
    /**
     * Maximum number of history documents fetched in one query.
     * NOTE(review): the history query sorts by c_date ascending, so if more documents than
     * this match, the newest snapshots are the ones cut off - confirm the index stays below it.
     */
    private static final int HISTORY_QUERY_LIMIT = 10000;

    /** Balance seen for each address on the previous run of {@link #checkDogeBalance()}. */
    private static final Map<String, Long> balanceHistory = new HashMap<>();

    private final ElasticsearchClient elasticsearchClient;

    /**
     * Returns the balance history of the top-ranked addresses.
     *
     * @param dogeQueryDto query options; passed through to the history lookup, which does not
     *                     currently apply them to the Elasticsearch query
     * @return one entry per top address that has at least one history document, in rank order
     * @throws RuntimeException if either Elasticsearch query fails with an {@link IOException}
     */
    @Override
    public List<DogeDto> getTop100DogeBalanceHistory(DogeQueryDto dogeQueryDto) {
        try {
            List<Doge> topAddresses = getCurrentTop100Addresses();
            return getHistoryForAddresses(topAddresses, dogeQueryDto);
        } catch (IOException e) {
            throw new RuntimeException("查询Doge历史余额失败", e);
        }
    }

    /**
     * Scheduled check (cron {@code 0 33 18 * * ?}) that logs balance changes of at least
     * {@link #THRESHOLD}: first against the balance remembered from the previous run, then,
     * for addresses with a wallet label, between consecutive history snapshots.
     */
    @Override
    @Scheduled(cron = "0 33 18 * * ?")
    public void checkDogeBalance() {
        List<DogeDto> doges = getTop100DogeBalanceHistory(new DogeQueryDto());
        if (doges == null || doges.isEmpty()) {
            log.warn("未获取到 Dogecoin 数据");
            return;
        }
        for (DogeDto dto : doges) {
            List<DogeDto.History> histories = dto.getHistory();
            if (histories == null || histories.isEmpty()) {
                // Nothing to compare for this address.
                continue;
            }
            String address = dto.getAddress();
            String wallet = dto.getWallet();
            // History is sorted oldest-first, so the current balance is the LAST entry.
            long currentBalance = histories.get(histories.size() - 1).getBalance();

            logChangeSincePreviousRun(address, wallet, currentBalance);
            balanceHistory.put(address, currentBalance);

            if (StringUtils.isNotBlank(wallet)) {
                logHistoricalChanges(address, wallet, histories);
            }
        }
    }

    /** Logs a warning line when the balance moved by at least THRESHOLD since the previous run. */
    private void logChangeSincePreviousRun(String address, String wallet, long currentBalance) {
        Long previousBalance = balanceHistory.get(address);
        if (previousBalance == null) {
            // First time this address is seen: there is no baseline to compare with.
            return;
        }
        long difference = currentBalance - previousBalance;
        if (Math.abs(difference) < THRESHOLD) {
            return;
        }
        String action = difference > 0 ? "增加" : "减少";
        String walletNote = StringUtils.isNotBlank(wallet) ? " [重点监控钱包: " + wallet + "]" : "";
        log.info("[{}] 警告: 地址 {} 的余额{}了 {} Dogecoin (之前: {}, 现在: {}){}",
                java.time.LocalDate.now(), address, action, Math.abs(difference),
                previousBalance, currentBalance, walletNote);
    }

    /** Logs every step between consecutive snapshots whose size is at least THRESHOLD. */
    private void logHistoricalChanges(String address, String wallet, List<DogeDto.History> histories) {
        long earlierBalance = histories.get(0).getBalance();
        for (int i = 1; i < histories.size(); i++) {
            DogeDto.History snapshot = histories.get(i);
            long laterBalance = snapshot.getBalance();
            // Positive means the balance dropped between the earlier and the later snapshot.
            long drop = earlierBalance - laterBalance;
            if (Math.abs(drop) >= THRESHOLD) {
                String action = drop > 0 ? "减少" : "增加";
                log.info("[{}] 历史警告: 地址 {} 的余额{}了 {} Dogecoin [重点监控钱包: {}]",
                        snapshot.getDate(), address, action, Math.abs(drop), wallet);
            }
            earlierBalance = laterBalance;
        }
    }

    /**
     * Fetches up to {@link #TOP_ADDRESS_LIMIT} documents from the {@code doge} index, sorted by
     * {@code rank} ascending and collapsed on {@code address}; only the address, wallet and
     * rank fields are loaded.
     *
     * <p>NOTE(review): if the index stores several dated snapshots per address, this selects by
     * the best rank across all snapshots rather than the latest one - confirm against the index.
     *
     * @return the hits with a non-null source, ordered by rank
     * @throws IOException if the search request fails
     */
    private List<Doge> getCurrentTop100Addresses() throws IOException {
        SearchResponse<Doge> response = elasticsearchClient.search(s -> s
                        .index("doge")
                        .size(TOP_ADDRESS_LIMIT)
                        .sort(so -> so.field(f -> f.field("rank").order(SortOrder.Asc)))
                        .collapse(c -> c.field("address"))
                        .source(sc -> sc.filter(f -> f.includes("address", "wallet", "rank"))),
                Doge.class);
        return response.hits().hits().stream()
                .map(Hit::source)
                .filter(Objects::nonNull)
                .sorted(Comparator.comparing(Doge::getRank))
                .collect(Collectors.toList());
    }

    /**
     * Loads the history documents of the given addresses and groups them per address.
     *
     * @param topAddresses addresses to look up; the result keeps their order
     * @param queryDto     query options (not applied to the query at present)
     * @return one DTO per address that has at least one history document
     * @throws IOException if the search request fails
     */
    private List<DogeDto> getHistoryForAddresses(List<Doge> topAddresses, DogeQueryDto queryDto) throws IOException {
        List<String> addresses = topAddresses.stream()
                .map(Doge::getAddress)
                .collect(Collectors.toList());
        if (addresses.isEmpty()) {
            // Nothing to look up; skip the round trip for a terms query with no terms.
            return Collections.emptyList();
        }

        Query baseQuery = Query.of(q -> q
                .bool(b -> b
                        .must(m -> m.terms(t -> t
                                .field("address")
                                .terms(tv -> tv.value(addresses.stream()
                                        .map(FieldValue::of)
                                        .collect(Collectors.toList())))))));

        SearchResponse<Doge> response = elasticsearchClient.search(s -> s
                        .index("doge")
                        .size(HISTORY_QUERY_LIMIT)
                        .query(baseQuery)
                        .sort(so -> so.field(f -> f.field("c_date").order(SortOrder.Asc))),
                Doge.class);

        Map<String, List<Doge>> groupedByAddress = response.hits().hits().stream()
                .map(Hit::source)
                .filter(Objects::nonNull)
                .collect(Collectors.groupingBy(Doge::getAddress));

        return topAddresses.stream()
                .map(addressDoc -> convertToDogeDto(
                        addressDoc,
                        groupedByAddress.getOrDefault(addressDoc.getAddress(), Collections.emptyList())))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    /**
     * Builds the DTO for one address, with its history sorted by {@code c_date} ascending.
     *
     * @return the DTO, or {@code null} when the address has no history documents
     */
    private DogeDto convertToDogeDto(Doge addressDoc, List<Doge> historyDocs) {
        if (historyDocs.isEmpty()) {
            return null;
        }
        DogeDto dto = new DogeDto();
        dto.setAddress(addressDoc.getAddress());
        dto.setWallet(addressDoc.getWallet());
        dto.setHistory(historyDocs.stream()
                .sorted(Comparator.comparing(Doge::getC_date))
                .map(this::convertToHistoryDto)
                .collect(Collectors.toList()));
        return dto;
    }

    /** Copies date, balance, rank and percent from an index document into a history entry. */
    private DogeDto.History convertToHistoryDto(Doge doc) {
        DogeDto.History history = new DogeDto.History();
        history.setDate(doc.getC_date());
        history.setBalance(doc.getBalance());
        history.setRank(doc.getRank());
        history.setPercent(doc.getPercent());
        return history;
    }
}