# flume-smart **Repository Path**: shaowewang/flume-smart ## Basic Information - **Project Name**: flume-smart - **Description**: flume jdbc source and sink - **Primary Language**: Java - **License**: AGPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2023-08-07 - **Last Updated**: 2023-08-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # FLUME-SMART 自定义JDBC Source和与之配套的JDBC Sink。 用于数据库之间同步数据。 # 系统要求 - Language: Java 8(jdk版本建议1.8.202) - Environment: Windows # 特性 - 1、支持数据库:SqlSever、MySQL、Oralce; - 2、一个JDBCSource 同步多个张表的数据,每个张表的轮询询间隔允由配置文件定义。 - 3、支持Source和Sink直接的表名称、字段名称映射 # Windows 开始使用: ##### 打包该项目 下载本项目 `git clone https://gitee.com/shaowewang/flume-smart ` 执行打包命令 `mvn clean package` ##### 下载Appache Flume 在Apache Flume官方下载 [apache-flume-1.11.0-bin.tar.gz](https://www.apache.org/dyn/closer.lua/flume/1.11.0/apache-flume-1.11.0-bin.tar.gz) 解压 apache-flume-1.11.0-bin.tar.gz 把flume放在合适的路径中(比如D:\apache-flume-1.11.0) ##### 添加到Flume 拷贝下面的jar包到flume安装路径的lib文件夹 1. flume-smart-1.0.0.jar 2. HikariCP-4.0.3.jar 3. hutool-all-4.5.15.jar 4. mysql-connector-j-8.0.31.jar 5. sqljdbc4-4.0.jar 如果想要支持其他数据库记得下载对应的jdbc驱动jar包 ##### 编辑配置文件 下面是一份配置文件的参考: ``` agent.sources = SqlSource agent.sinks = MySqlSink agent.channels = MemChannel agent.sources.SqlSource.type = cn.buhler.source.RichJDBCSource agent.sources.SqlSource.driverClass = com.microsoft.sqlserver.jdbc.SQLServerDriver agent.sources.SqlSource.url = jdbc:sqlserver://localhost:1433;DatabaseName=your_database agent.sources.SqlSource.username = sa agent.sources.SqlSource.password = your_password ## 暂未实现 agent.sources.SqlSource.connectionTimeout = 30000 agent.sources.SqlSource.maxPoolSize = 30000 ## 暂未实现 agent.sources.SqlSource.retryInterval = 60000 ## 暂未实现 agent.sources.SqlSource.maxRetryCount = 5 agent.sources.SqlSource.tableMapping = * agent.sources.SqlSource.table.Recipe.columnMapping = OID>id,Ident>code,Name>name,Version>version,Category>category,Job>job_id,ModificationDate>ModificationDate agent.sources.SqlSource.table.Recipe.query = SELECT OID,Ident,Name,Version,Category,Job,ModificationDate FROM Recipe WHERE ModificationDate >= DATEADD(MINUTE, -1, GETDATE()) agent.sources.SqlSource.table.Recipe.poll.interval = 20 agent.sources.SqlSource.table.JobLog.columnMapping = * agent.sources.SqlSource.table.JobLog.query = SELECT * FROM Joblog WHERE LogDate >=DATEADD(MINUTE, -5, GETDATE()) agent.sources.SqlSource.table.JobLog.poll.interval = 20 agent.sinks.MySqlSink.type = cn.buhler.sink.RichJDBCSink agent.sinks.MySqlSink.driverClass = com.mysql.cj.jdbc.Driver agent.sinks.MySqlSink.url = jdbc:mysql://127.0.0.1:3306/your_database?characterEncoding=utf8&useSSL=false agent.sinks.MySqlSink.username = root agent.sinks.MySqlSink.password = your_password agent.channels.MemChannel.type = memory agent.channels.MemChannel.capacity = 10000 agent.channels.MemChannel.transactionCapacity = 1000 agent.sources.SqlSource.channels = MemChannel agent.sinks.MySqlSink.channel = MemChannel ``` 把上面的信息拷贝在flume-demo.conf文件中,flume-demo.conf文件建议放在flume安装路径的conf文件夹 ##### 启动Flume flume安装路径打开powershell 执行下面的命令启动flume ``` .\bin\flume-ng.cmd agent -n agent -f conf\flume-demo.conf ``` 注意:网络上的文档大多数不适合windows # 配置文件说明 ### JDBC Source 配置 ``` agent.sources.SqlSource.type #描述:定义 JDBC Source 的类型。 #示例值:cn.buhler.source.RichJDBCSource agent.sources.SqlSource.driverClass #描述:定义 JDBC 驱动的类名。 #示例值:com.microsoft.sqlserver.jdbc.SQLServerDriver agent.sources.SqlSource.url #描述:定义数据库的连接 URL。 #示例值:jdbc:sqlserver://localhost:1433;DatabaseName=your_database agent.sources.SqlSource.username #描述:定义数据库的用户名。 #示例值:sa agent.sources.SqlSource.password #描述:定义数据库的密码。 #示例值:your_password agent.sources.SqlSource.tableMapping #描述:定义表映射规则。 #示例值:* agent.sources.SqlSource.table.Recipe.columnMapping #描述:定义特定表的列映射规则。 #示例值:OID=>id,Ident=>code,Name=>name,Version=>version,Category=>category,Job=>job_id,ModificationDate=>ModificationDate agent.sources.SqlSource.table.Recipe.query #描述:定义特定表的查询语句。 #示例值:SELECT OID,Ident,Name,Version,Category,Job,ModificationDate FROM Recipe WHERE ModificationDate >= DATEADD(MINUTE, -1, GETDATE()) agent.sources.SqlSource.table.Recipe.poll.interval #描述:定义特定表的轮询间隔(单位:秒)。 #示例值:20 ``` ### JDBC Sink 配置 ``` agent.sinks.MySqlSink.type #描述:定义 JDBC Sink 的类型。 #示例值:cn.buhler.sink.RichJDBCSink agent.sinks.MySqlSink.driverClass #描述:定义 JDBC 驱动的类名。 #示例值:com.mysql.cj.jdbc.Driver agent.sinks.MySqlSink.url #描述:定义数据库的连接 URL。 #示例值:jdbc:mysql://127.0.0.1:3306/your_database?characterEncoding=utf8&useSSL=false agent.sinks.MySqlSink.username #描述:定义数据库的用户名。 #示例值:root agent.sinks.MySqlSink.password #描述:定义数据库的密码。 #示例值:your_password ``` ### Channel 配置 ``` agent.channels.MemChannel.type #描述:定义 Channel 的类型。 #示例值:memory agent.channels.MemChannel.capacity #描述:定义 Channel 的容量。 #示例值:10000 agent.channels.MemChannel.transactionCapacity #描述:定义 Channel 的事务容量。 #示例值:1000 ``` ### 表映射和字段映射 #### 表映射 ``` agent.sources.SqlSource.tableMapping #描述:定义源数据库和目标数据库之间的表映射关系。 #格式:源表1>目标表1,源表2>目标表2,... #示例: #当源表和目标表名称完全相同: agent.sources.SqlSource.tableMapping=* #当部分源表和目标表名称相同: agent.sources.SqlSource.tableMapping=t1,t2>t4 #当所有源表和目标表名称都不同: agent.sources.SqlSource.tableMapping=t1>t3,t2>t4 ``` #### 字段映射 ``` agent.sources.SqlSource.table.[TableName].columnMapping #描述:定义特定表中的字段映射关系。 #格式:源字段1>目标字段1,源字段2>目标字段2,... #示例: #当源字段和目标字段名称完全相同: agent.sources.SqlSource.table.[TableName].columnMapping=* #当部分源字段和目标字段名称相同: agent.sources.SqlSource.table.[TableName].columnMapping=field1,field2>targetField2 #当所有源字段和目标字段名称都不同: agent.sources.SqlSource.table.[TableName].columnMapping=field1>targetField1,field2>targetField2 ``` # 更新内容 ## 1.0.1 改进在数据库连接失败时进行适当的重试,并在每次失败后都正确地释放资源。此外连接池的连接超时和最大重试次数都是可配置的 ## 1.0.0 第一个版本