引言
`kafka-connect-jdbc` 是一个 Kafka Connector,用于在任何兼容 JDBC 的数据库和 Kafka 之间加载数据。它既支持从数据库中读取数据并写入 Kafka 主题(Source),也支持将 Kafka 主题中的数据写入数据库(Sink)。
核心类与组件
1. JdbcSourceConnector
- 功能:这是 Kafka Connect 的源连接器实现类,负责监控 JDBC 数据库并生成任务以摄取数据库内容。
- 关键代码分析:
public class JdbcSourceConnector extends SourceConnector {
  // ... (other members omitted)

  /**
   * Starts the connector.
   *
   * <p>Parses the configuration, resolves the database dialect from the connection URL, creates
   * the cached connection provider and opens a connection, then builds the table monitor thread.
   * The monitor thread is only started in table mode, i.e. when no custom query is configured.
   *
   * @param properties raw connector properties supplied by the Connect framework
   * @throws ConnectException if the configuration cannot be parsed
   */
  @Override
  public void start(Map<String, String> properties) throws ConnectException {
    log.info("Starting JDBC Source Connector");
    try {
      configProperties = properties;
      config = new JdbcSourceConnectorConfig(configProperties);
    } catch (ConfigException e) {
      throw new ConnectException("Couldn't start JdbcSourceConnector due to configuration error", e);
    }

    // Set up the database connection.
    final String dbUrl = config.getString(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG);
    final int maxConnectionAttempts = config.getInt(JdbcSourceConnectorConfig.CONNECTION_ATTEMPTS_CONFIG);
    final long connectionRetryBackoff = config.getLong(JdbcSourceConnectorConfig.CONNECTION_BACKOFF_CONFIG);
    dialect = DatabaseDialects.findBestFor(dbUrl, config);
    cachedConnectionProvider = connectionProvider(maxConnectionAttempts, connectionRetryBackoff);
    // Open a connection eagerly; presumably so connectivity problems surface during start()
    // rather than later (TODO confirm against connectionProvider()).
    cachedConnectionProvider.getConnection();

    // Build the table monitor thread. Empty white/black lists are passed on as null.
    long tablePollMs = config.getLong(JdbcSourceConnectorConfig.TABLE_POLL_INTERVAL_MS_CONFIG);
    long tableStartupLimitMs = config.getLong(JdbcSourceConnectorConfig.TABLE_MONITORING_STARTUP_POLLING_LIMIT_MS_CONFIG);
    List<String> whitelist = config.getList(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG);
    Set<String> whitelistSet = whitelist.isEmpty() ? null : new HashSet<>(whitelist);
    List<String> blacklist = config.getList(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG);
    Set<String> blacklistSet = blacklist.isEmpty() ? null : new HashSet<>(blacklist);
    tableMonitorThread = new TableMonitorThread(
        dialect,
        cachedConnectionProvider,
        context,
        tableStartupLimitMs,
        tablePollMs,
        whitelistSet,
        blacklistSet,
        Time.SYSTEM
    );
    // Only table mode needs table discovery; a custom query targets no specific table list.
    if (config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG).isEmpty()) {
      tableMonitorThread.start();
      log.info("Starting Table Monitor Thread");
    }
  }

  /**
   * Returns the task implementation class.
   *
   * @return {@code JdbcSourceTask.class}
   */
  @Override
  public Class<? extends Task> taskClass() {
    return JdbcSourceTask.class;
  }

  /**
   * Builds per-task configurations.
   *
   * <ul>
   *   <li>Custom query mode: exactly one task, with an empty table list.</li>
   *   <li>Table mode, no tables known yet (or none found): one task with an empty table list.
   *       {@code TABLES_FETCHED} is "false" when the monitor has not produced a list yet and
   *       "true" when it produced an empty one.</li>
   *   <li>Table mode with tables: tables are split into {@code min(#tables, maxTasks)} groups,
   *       one task per group.</li>
   * </ul>
   *
   * @param maxTasks upper bound on the number of task configurations to return
   * @return one property map per task
   */
  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
    String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
    List<Map<String, String>> taskConfigs;
    if (!query.isEmpty()) {
      // Custom query: a single task.
      Map<String, String> taskProps = new HashMap<>(configProperties);
      taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, "");
      taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, "true");
      taskConfigs = Collections.singletonList(taskProps);
    } else {
      // Table mode.
      List<TableId> currentTables = tableMonitorThread.tables();
      if (currentTables == null || currentTables.isEmpty()) {
        // No tables available (yet).
        taskConfigs = new ArrayList<>(1);
        Map<String, String> taskProps = new HashMap<>(configProperties);
        taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, "");
        taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, currentTables == null ? "false" : "true");
        taskConfigs.add(taskProps);
      } else {
        // Distribute tables across tasks.
        int numGroups = Math.min(currentTables.size(), maxTasks);
        List<List<TableId>> tablesGrouped = ConnectorUtils.groupPartitions(currentTables, numGroups);
        taskConfigs = new ArrayList<>(tablesGrouped.size());
        for (List<TableId> taskTables : tablesGrouped) {
          Map<String, String> taskProps = new HashMap<>(configProperties);
          ExpressionBuilder builder = dialect.expressionBuilder();
          builder.appendList().delimitedBy(",").of(taskTables);
          taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, builder.toString());
          taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, "true");
          taskConfigs.add(taskProps);
        }
      }
    }
    return taskConfigs;
  }

  /**
   * Stops the table monitor thread and releases the connection provider and the dialect.
   *
   * <p>Each resource is released even if an earlier step fails. Fields that {@link #start} never
   * assigned, for example because of a configuration error, are skipped.
   *
   * @throws ConnectException propagated from the underlying resources, if any
   */
  @Override
  public void stop() throws ConnectException {
    log.info("Stopping table monitoring thread");
    try {
      if (tableMonitorThread != null) {
        tableMonitorThread.shutdown();
        tableMonitorThread.join(MAX_TIMEOUT);
      }
    } catch (InterruptedException e) {
      // Don't lose the interrupt: restore the flag so the calling thread can still observe it.
      Thread.currentThread().interrupt();
    } finally {
      try {
        if (cachedConnectionProvider != null) {
          cachedConnectionProvider.close(true);
        }
      } finally {
        // Close the dialect even if closing the connection provider threw.
        if (dialect != null) {
          dialect.close();
        }
      }
    }
  }
}
- 详细解释:
  - `start` 方法:初始化连接器配置,获取数据库连接,并在未配置自定义查询时启动表监控线程。
  - `taskClass` 方法:返回任务类 `JdbcSourceTask`。
  - `taskConfigs` 方法:根据配置生成任务配置。如果配置了自定义查询,只生成一个任务配置。否则,按表的数量与最大任务数中的较小值,将表分组分配到各个任务。若尚未发现任何表,则生成一个不包含表的任务配置。
  - `stop` 方法:停止表监控线程,关闭数据库连接和方言。
2. JdbcSourceTask
- 功能:Kafka Connect 的源任务实现类,负责从 JDBC 数据库中读取数据并生成 Kafka Connect 记录。
- 关键代码分析:
public class JdbcSourceTask extends SourceTask {
  // ... (other members omitted)

  /**
   * Starts the task.
   *
   * <p>The steps are:
   * <ol>
   *   <li>Validate the assignment: exactly one of tables or query must be set.</li>
   *   <li>Resolve the dialect.</li>
   *   <li>Apply the transaction isolation mode.</li>
   *   <li>In incrementing/timestamp modes, load stored offsets.</li>
   *   <li>Create one {@code TableQuerier} per table, or a single one for the query.</li>
   * </ol>
   *
   * <p>If neither tables nor a query is assigned and the connector has not fetched tables yet,
   * the task records its thread id and returns without doing anything else.
   *
   * @param properties task properties produced by the connector
   * @throws ConfigException if the properties are invalid or the assignment is inconsistent
   */
  @Override
  public void start(Map<String, String> properties) {
    log.info("Starting JDBC source task");
    try {
      config = new JdbcSourceTaskConfig(properties);
    } catch (ConfigException e) {
      // ConfigException has no (message, cause) constructor. A two-argument call would resolve
      // to ConfigException(name, value) and garble the message, so attach the cause explicitly.
      ConfigException wrapped = new ConfigException(
          "Couldn't start JdbcSourceTask due to configuration error: " + e.getMessage());
      wrapped.initCause(e);
      throw wrapped;
    }

    List<String> tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG);
    Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED);
    String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG);
    if (tables.isEmpty() && query.isEmpty()) {
      if (!tablesFetched) {
        // Table discovery is still in progress on the connector side; idle for now.
        // NOTE(review): presumably the task is reconfigured once tables are known — confirm.
        taskThreadId.set(Thread.currentThread().getId());
        log.info("Started JDBC source task. Waiting for DB tables to be fetched.");
        return;
      }
      // Tables were fetched, yet nothing was assigned to this task.
      throw new ConfigException("Task is being killed because it was not assigned a table nor a query to execute.");
    }
    if (!tables.isEmpty() && !query.isEmpty()) {
      // A task may be assigned tables or a query, never both.
      throw new ConfigException("Invalid configuration: a JdbcSourceTask cannot have both a table and a query assigned to it");
    }

    // Resolve the dialect: an explicit dialect.name wins, otherwise infer it from the URL.
    final String url = config.getString(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG);
    final int maxConnAttempts = config.getInt(JdbcSourceConnectorConfig.CONNECTION_ATTEMPTS_CONFIG);
    final long retryBackoff = config.getLong(JdbcSourceConnectorConfig.CONNECTION_BACKOFF_CONFIG);
    final String dialectName = config.getString(JdbcSourceConnectorConfig.DIALECT_NAME_CONFIG);
    if (dialectName != null && !dialectName.trim().isEmpty()) {
      dialect = DatabaseDialects.create(dialectName, config);
    } else {
      dialect = DatabaseDialects.findBestFor(url, config);
    }
    cachedConnectionProvider = connectionProvider(maxConnAttempts, retryBackoff);

    // Apply the configured transaction isolation level to the connection.
    dialect.setConnectionIsolationMode(
        cachedConnectionProvider.getConnection(),
        TransactionIsolationMode.valueOf(config.getString(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG))
    );

    // Decide between table mode and custom-query mode.
    TableQuerier.QueryMode queryMode = query.isEmpty() ? TableQuerier.QueryMode.TABLE : TableQuerier.QueryMode.QUERY;
    List<String> tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY ? Collections.singletonList(query) : tables;
    // The single source partition used in custom-query mode.
    final Map<String, String> queryPartition = Collections.singletonMap(
        JdbcSourceConnectorConstants.QUERY_NAME_KEY, JdbcSourceConnectorConstants.QUERY_NAME_VALUE);

    // Load stored offsets; only the incrementing/timestamp modes track them.
    String mode = config.getString(JdbcSourceTaskConfig.MODE_CONFIG);
    Map<String, List<Map<String, String>>> partitionsByTableFqn = new HashMap<>();
    Map<Map<String, String>, Map<String, Object>> offsets = null;
    if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)
        || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)
        || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) {
      List<Map<String, String>> partitions = new ArrayList<>(tables.size());
      switch (queryMode) {
        case TABLE:
          for (String table : tables) {
            List<Map<String, String>> tablePartitions = possibleTablePartitions(table);
            partitions.addAll(tablePartitions);
            partitionsByTableFqn.put(table, tablePartitions);
          }
          break;
        case QUERY:
          partitions.add(queryPartition);
          break;
        default:
          throw new ConfigException("Unknown query mode: " + queryMode);
      }
      offsets = context.offsetStorageReader().offsets(partitions);
    }

    // Create one querier per table (or one for the query), seeded with the first stored offset
    // found among its candidate partitions.
    for (String tableOrQuery : tablesOrQuery) {
      final List<Map<String, String>> tablePartitionsToCheck;
      switch (queryMode) {
        case TABLE:
          tablePartitionsToCheck = partitionsByTableFqn.get(tableOrQuery);
          break;
        case QUERY:
          tablePartitionsToCheck = Collections.singletonList(queryPartition);
          break;
        default:
          throw new ConfigException("Unexpected query mode: " + queryMode);
      }
      Map<String, Object> offset = null;
      if (offsets != null && tablePartitionsToCheck != null) {
        for (Map<String, String> toCheckPartition : tablePartitionsToCheck) {
          offset = offsets.get(toCheckPartition);
          if (offset != null) {
            break;
          }
        }
      }
      TableQuerier querier = new TableQuerier(
          queryMode,
          tableOrQuery,
          mode,
          config.getString(JdbcSourceTaskConfig.INCREMENTING_COLUMN_NAME_CONFIG),
          config.getList(JdbcSourceTaskConfig.TIMESTAMP_COLUMN_NAME_CONFIG),
          config.getLong(JdbcSourceTaskConfig.TIMESTAMP_DELAY_INTERVAL_MS_CONFIG),
          config.getBoolean(JdbcSourceTaskConfig.VALIDATE_NON_NULL_CONFIG),
          config.timeZone(),
          config.getString(JdbcSourceTaskConfig.QUERY_SUFFIX_CONFIG).trim(),
          offset
      );
      tableQueue.add(querier);
    }
  }

  /**
   * Returns the next batch of records.
   *
   * <p>Takes queriers from the queue until one of these happens:
   * <ul>
   *   <li>at least one record has been produced;</li>
   *   <li>the task is stopped;</li>
   *   <li>{@code CONSECUTIVE_EMPTY_RESULTS_BEFORE_RETURN} consecutive empty polls occur.</li>
   * </ul>
   * An empty queue counts as an empty poll and sleeps 100 ms. A querier is re-queued exactly once
   * per attempt while the task is running. When it fails with an {@link SQLException} after the
   * retry budget is exhausted, it is dropped.
   *
   * @return the records produced; empty if none were available
   * @throws InterruptedException declared by the {@code SourceTask} contract
   */
  @Override
  public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> records = new ArrayList<>();
    int consecutiveEmptyResults = 0;
    while (running.get() && records.isEmpty() && consecutiveEmptyResults < CONSECUTIVE_EMPTY_RESULTS_BEFORE_RETURN) {
      TableQuerier querier = tableQueue.poll();
      if (querier == null) {
        consecutiveEmptyResults++;
        time.sleep(100);
        continue;
      }
      // Re-queue in `finally` unless the querier is deliberately dropped below. Re-queuing only
      // there guarantees a querier is never added to the queue twice.
      boolean requeue = true;
      try {
        List<SourceRecord> querierRecords = querier.poll(cachedConnectionProvider.getConnection(), time);
        records.addAll(querierRecords);
        if (querierRecords.isEmpty()) {
          consecutiveEmptyResults++;
        } else {
          consecutiveEmptyResults = 0;
        }
      } catch (SQLException e) {
        log.error("Error polling for table {}: {}", querier.getTableOrQuery(), e.getMessage(), e);
        // NOTE(review): maxRetriesPerQuerier is declared outside this view. It appears to be a
        // task-wide budget shared by all queriers and never reset — confirm whether a
        // per-querier budget was intended.
        maxRetriesPerQuerier--;
        if (maxRetriesPerQuerier > 0) {
          // Back off before the querier is retried.
          time.sleep(1000);
        } else {
          log.error("Max retries exceeded for table {}", querier.getTableOrQuery());
          requeue = false;
        }
      } finally {
        if (requeue && running.get()) {
          tableQueue.add(querier);
        }
      }
    }
    return records;
  }

  /**
   * Stops the task and releases the connection provider and the dialect.
   *
   * <p>Fields that were never assigned are skipped. That happens when {@link #start} returned
   * early while waiting for tables, so {@code cachedConnectionProvider} was never created. The
   * dialect is closed even if closing the connection provider throws.
   */
  @Override
  public void stop() {
    running.set(false);
    try {
      if (cachedConnectionProvider != null) {
        cachedConnectionProvider.close(true);
      }
    } finally {
      if (dialect != null) {
        dialect.close();
      }
    }
  }
}
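- 详细解释:
  - `start` 方法:初始化任务配置并检查其有效性(表和查询必须二选一;若表信息尚未获取则直接返回等待),随后依次完成以下步骤:
    - 获取数据库连接,设置事务隔离级别;
    - 确定查询模式(表模式或自定义查询模式);
    - 在增量/时间戳模式下读取已存储的偏移量;
    - 为每个表(或自定义查询)创建表查询器。
  - `poll` 方法:从表查询器队列中取出查询器,执行查询,将查询结果转换为 `SourceRecord` 列表返回。如果查询失败,会等待一段时间后重试。
  - `stop` 方法:停止任务,关闭数据库连接和方言。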
3. JdbcSourceConnectorConfig
- 功能:配置类,用于管理 `JdbcSourceConnector` 的配置信息。
- 关键代码分析:
public class JdbcSourceConnectorConfig extends AbstractConfig {
  // ... (other members omitted)

  // --- Database connection ---------------------------------------------------------------
  public static final String CONNECTION_URL_CONFIG = CONNECTION_PREFIX + "url";
  public static final String CONNECTION_USER_CONFIG = CONNECTION_PREFIX + "user";
  public static final String CONNECTION_PASSWORD_CONFIG = CONNECTION_PREFIX + "password";
  public static final String CONNECTION_ATTEMPTS_CONFIG = CONNECTION_PREFIX + "attempts";
  public static final String CONNECTION_BACKOFF_CONFIG = CONNECTION_PREFIX + "backoff.ms";

  // --- Polling, batching and type mapping ------------------------------------------------
  public static final String POLL_INTERVAL_MS_CONFIG = "poll.interval.ms";
  public static final String BATCH_MAX_ROWS_CONFIG = "batch.max.rows";
  public static final String NUMERIC_PRECISION_MAPPING_CONFIG = "numeric.precision.mapping";
  public static final String NUMERIC_MAPPING_CONFIG = "numeric.mapping";
  public static final String DIALECT_NAME_CONFIG = "dialect.name";

  // --- Query mode and change-tracking columns --------------------------------------------
  public static final String MODE_CONFIG = "mode";
  public static final String INCREMENTING_COLUMN_NAME_CONFIG = "incrementing.column.name";
  public static final String TIMESTAMP_COLUMN_NAME_CONFIG = "timestamp.column.name";
  public static final String TIMESTAMP_DELAY_INTERVAL_MS_CONFIG = "timestamp.delay.interval.ms";
  public static final String VALIDATE_NON_NULL_CONFIG = "validate.non.null";
  public static final String QUERY_CONFIG = "query";

  // --- Table selection and monitoring ----------------------------------------------------
  public static final String TABLE_WHITELIST_CONFIG = "table.whitelist";
  public static final String TABLE_BLACKLIST_CONFIG = "table.blacklist";
  public static final String TABLE_POLL_INTERVAL_MS_CONFIG = "table.poll.interval.ms";
  public static final String TABLE_MONITORING_STARTUP_POLLING_LIMIT_MS_CONFIG = "table.monitoring.startup.polling.limit.ms";

  // --- Output and transactions -----------------------------------------------------------
  public static final String TOPIC_PREFIX_CONFIG = "topic.prefix";
  public static final String TRANSACTION_ISOLATION_MODE_CONFIG = "transaction.isolation.mode";

  /**
   * Creates a configuration from raw properties, interpreted against {@link #configDef()}.
   *
   * @param originals raw (unparsed) configuration properties
   */
  public JdbcSourceConnectorConfig(Map<?, ?> originals) {
    super(configDef(), originals);
  }

  /**
   * Builds the {@link ConfigDef} describing every supported setting.
   *
   * <p>For each setting it records the name, type, default value, importance, documentation,
   * group (none here), order in group, display width and display name.
   *
   * @return a freshly built definition
   */
  public static ConfigDef configDef() {
    final ConfigDef definition = new ConfigDef()
        .define(
            CONNECTION_URL_CONFIG,
            Type.STRING,
            CONNECTION_URL_DEFAULT,
            Importance.HIGH,
            CONNECTION_URL_DOC,
            null,
            -1,
            Width.LONG,
            CONNECTION_URL_DISPLAY)
        .define(
            CONNECTION_USER_CONFIG,
            Type.STRING,
            null,
            Importance.MEDIUM,
            CONNECTION_USER_DOC,
            null,
            -1,
            Width.MEDIUM,
            CONNECTION_USER_DISPLAY);
    // ... (remaining config definitions omitted)
    return definition;
  }
}
- 详细解释:
  - 定义了一系列配置项,如数据库连接 URL、用户名、密码、轮询间隔、批量最大行数等。
  - `configDef` 方法用于定义各配置项的元信息,包括名称、类型、默认值、重要性、文档说明,以及显示宽度和显示名称等。
工作流程
1. 启动连接器:调用 `JdbcSourceConnector` 的 `start` 方法,初始化配置,获取数据库连接,并在未配置自定义查询时启动表监控线程。
2. 生成任务配置:`JdbcSourceConnector` 的 `taskConfigs` 方法根据配置生成任务配置。表模式下将表分配到不同的任务中,自定义查询模式下只生成一个任务。
3. 启动任务:调用 `JdbcSourceTask` 的 `start` 方法,初始化任务配置,获取数据库连接,设置事务隔离级别,创建表查询器。
4. 轮询数据:Kafka Connect 框架反复调用 `JdbcSourceTask` 的 `poll` 方法。该方法从表查询器队列中取出查询器,执行查询,将查询结果转换为 `SourceRecord` 列表返回。
5. 停止任务和连接器:调用 `JdbcSourceTask` 和 `JdbcSourceConnector` 的 `stop` 方法,关闭数据库连接和方言。连接器还会停止表监控线程。
总结
`kafka-connect-jdbc` 通过 `JdbcSourceConnector` 和 `JdbcSourceTask` 实现了从 JDBC 数据库到 Kafka 的数据摄取:
- `JdbcSourceConnector` 负责管理连接器的生命周期和任务分配;
- `JdbcSourceTask` 负责实际的数据读取和转换;
- `JdbcSourceConnectorConfig` 用于管理连接器的配置信息。

深入理解这些核心类和组件的工作原理,有助于更好地使用和扩展 `kafka-connect-jdbc`。