一、前言
基于beangle-db-replication源码包(2.2.1)中的数据库db同步复制replication实现原理源码分析,主要定义org.beangle.db.replication.Replicator同步接口,然后分别有关于索引IndexReplicator实现、序列SequenceReplicator实现、数据DataReplicator实现、混合类型CompositeReplicator实现、公共约束ConstraintReplicator实现等,从而对常用的数据库同步replicator基于设计实现有可见深刻的认识,具体参见下方源码示例。
二、源码说明
1.Replicator接口、DataWrapper数据接口
package org.beangle.db.replication;@b@@b@public abstract interface Replicator@b@{@b@ public abstract void setTarget(DataWrapper paramDataWrapper);@b@@b@ public abstract void setSource(DataWrapper paramDataWrapper);@b@@b@ public abstract void reset();@b@@b@ public abstract void start();@b@}
package org.beangle.db.replication;@b@@b@import java.util.List;@b@import org.beangle.db.jdbc.meta.Table;@b@@b@public abstract interface DataWrapper@b@{@b@ public abstract List<Object> getData(String paramString);@b@@b@ public abstract List<Object> getData(Table paramTable);@b@@b@ public abstract int pushData(Table paramTable, List<Object> paramList);@b@@b@ public abstract void close();@b@@b@ public abstract int count(Table paramTable);@b@}
2.主要实现类有CompositeReplicator、ConstraintReplicator..SequenceReplicator
package org.beangle.db.replication.impl;@b@@b@import java.util.List;@b@import org.beangle.commons.collection.CollectUtils;@b@import org.beangle.db.replication.DataWrapper;@b@import org.beangle.db.replication.Replicator;@b@@b@public class CompositeReplicator@b@ implements Replicator@b@{@b@ List<Replicator> replicators = CollectUtils.newArrayList();@b@@b@ public CompositeReplicator(Replicator[] replicators)@b@ {@b@ this.replicators = CollectUtils.newArrayList(replicators);@b@ }@b@@b@ public void setTarget(DataWrapper source) {@b@ for (Replicator replicator : this.replicators)@b@ replicator.setTarget(source);@b@ }@b@@b@ public void setSource(DataWrapper source)@b@ {@b@ for (Replicator replicator : this.replicators)@b@ replicator.setSource(source);@b@ }@b@@b@ public void reset()@b@ {@b@ for (Replicator replicator : this.replicators)@b@ replicator.reset();@b@ }@b@@b@ public void start()@b@ {@b@ for (Replicator replicator : this.replicators)@b@ replicator.start();@b@ }@b@}
package org.beangle.db.replication.impl;@b@@b@import java.util.Collection;@b@import java.util.Collections;@b@import java.util.List;@b@import org.apache.commons.lang.time.StopWatch;@b@import org.beangle.commons.collection.CollectUtils;@b@import org.beangle.db.jdbc.meta.Constraint;@b@import org.beangle.db.jdbc.meta.Database;@b@import org.beangle.db.jdbc.meta.ForeignKey;@b@import org.beangle.db.replication.DataWrapper;@b@import org.beangle.db.replication.Replicator;@b@import org.beangle.db.replication.wrappers.DatabaseWrapper;@b@import org.slf4j.Logger;@b@import org.slf4j.LoggerFactory;@b@@b@public class ConstraintReplicator@b@ implements Replicator@b@{@b@ private static final Logger logger = LoggerFactory.getLogger(ConstraintReplicator.class);@b@ private DatabaseWrapper source;@b@ private DatabaseWrapper target;@b@ private List<Constraint> contraints = CollectUtils.newArrayList();@b@@b@ public ConstraintReplicator(DatabaseWrapper source, DatabaseWrapper target)@b@ {@b@ this.source = source;@b@ this.target = target;@b@ }@b@@b@ public void addAll(Collection<? extends Constraint> newContraints) {@b@ this.contraints.addAll(newContraints);@b@ }@b@@b@ public void reset()@b@ {@b@ }@b@@b@ public void start() {@b@ Collections.sort(this.contraints);@b@ StopWatch watch = new StopWatch();@b@ watch.start();@b@ logger.info("Start constraint replication...");@b@ String targetSchema = this.target.getDatabase().getSchema();@b@ for (Constraint contraint : this.contraints)@b@ if (contraint instanceof ForeignKey) {@b@ ForeignKey fk = (ForeignKey)contraint;@b@ String sql = fk.getAlterSql(this.target.getDialect(), targetSchema);@b@ try {@b@ this.target.execute(sql);@b@ logger.info("Apply constaint {}", fk.getName());@b@ } catch (Exception e) {@b@ logger.warn("Cannot execute {}", sql);@b@ }@b@ }@b@@b@ logger.info("End constraint replication,using {}", Long.valueOf(watch.getTime()));@b@ }@b@@b@ public void setSource(DataWrapper source) {@b@ this.source = ((DatabaseWrapper)source);@b@ }@b@@b@ public void setTarget(DataWrapper target) {@b@ this.target = ((DatabaseWrapper)target);@b@ }@b@}
package org.beangle.db.replication.impl;@b@@b@import java.util.Collection;@b@import java.util.Collections;@b@import java.util.List;@b@import org.apache.commons.collections.Buffer;@b@import org.apache.commons.collections.BufferUnderflowException;@b@import org.apache.commons.collections.BufferUtils;@b@import org.apache.commons.collections.buffer.UnboundedFifoBuffer;@b@import org.apache.commons.lang.StringUtils;@b@import org.apache.commons.lang.time.StopWatch;@b@import org.beangle.commons.collection.CollectUtils;@b@import org.beangle.commons.collection.page.PageLimit;@b@import org.beangle.db.jdbc.meta.Database;@b@import org.beangle.db.jdbc.meta.Table;@b@import org.beangle.db.replication.DataWrapper;@b@import org.beangle.db.replication.Replicator;@b@import org.beangle.db.replication.wrappers.DatabaseWrapper;@b@import org.slf4j.Logger;@b@import org.slf4j.LoggerFactory;@b@@b@public class DataReplicator@b@ implements Replicator@b@{@b@ private static final Logger logger = LoggerFactory.getLogger(DataReplicator.class);@b@ List<Table> tables = CollectUtils.newArrayList();@b@ DatabaseWrapper source;@b@ DatabaseWrapper target;@b@@b@ public DataReplicator()@b@ {@b@ }@b@@b@ public DataReplicator(DatabaseWrapper source, DatabaseWrapper target)@b@ {@b@ this.source = source;@b@ this.target = target;@b@ }@b@@b@ protected void addTable(Table table) {@b@ this.tables.add(table);@b@ }@b@@b@ public void addAll(Collection<? extends Table> newTables) {@b@ this.tables.addAll(newTables);@b@ }@b@@b@ public boolean addTables(String[] tables) {@b@ boolean success = true;@b@ for (int i = 0; i < tables.length; ++i) {@b@ String newTable = tables[i];@b@ if ((!(StringUtils.contains(tables[i], '.'))) && (null != this.source.getDatabase().getSchema()))@b@ newTable = Table.qualify(this.source.getDatabase().getSchema(), newTable);@b@@b@ Table tm = this.source.getDatabase().getTable(newTable);@b@ if (null == tm)@b@ logger.error("cannot find metadata for {}", newTable);@b@ else@b@ addTable(tm);@b@@b@ success &= tm != null;@b@ }@b@ return success;@b@ }@b@@b@ public void reset()@b@ {@b@ }@b@@b@ public void setSource(DataWrapper source) {@b@ this.source = ((DatabaseWrapper)source);@b@ }@b@@b@ public void setTarget(DataWrapper target) {@b@ this.target = ((DatabaseWrapper)target);@b@ }@b@@b@ public void start() {@b@ Collections.sort(this.tables);@b@ UnboundedFifoBuffer fifoBuffer = new UnboundedFifoBuffer();@b@ for (Table table : this.tables)@b@ fifoBuffer.add(table);@b@@b@ Buffer tableBuffer = BufferUtils.synchronizedBuffer(fifoBuffer);@b@ StopWatch watch = new StopWatch();@b@ watch.start();@b@ logger.info("Start data replication...");@b@ List tasks = CollectUtils.newArrayList();@b@ for (int i = 0; i < 9; ++i) {@b@ Thread thread = new Thread(new ReplicatorTask(this.source, this.target, tableBuffer));@b@ tasks.add(thread);@b@ thread.start();@b@ }@b@ for (Thread task : tasks)@b@ try {@b@ task.join();@b@ } catch (InterruptedException e) {@b@ e.printStackTrace();@b@ }@b@@b@ logger.info("End data replication,using {}", Long.valueOf(watch.getTime()));@b@ }@b@@b@ public static class ReplicatorTask implements Runnable {@b@ DatabaseWrapper source;@b@ DatabaseWrapper target;@b@ Buffer buffer;@b@@b@ public void run() {@b@ try {@b@ while (!(this.buffer.isEmpty())) {@b@ Table table = (Table)this.buffer.remove();@b@ replicate(table);@b@ }@b@ } catch (BufferUnderflowException e) {@b@ return;@b@ }@b@ }@b@@b@ public ReplicatorTask(DatabaseWrapper source, DatabaseWrapper target, Buffer buffer)@b@ {@b@ this.source = source;@b@ this.target = target;@b@ this.buffer = buffer;@b@ }@b@@b@ private boolean createOrReplaceTable(Table table) {@b@ if (this.target.drop(table)) {@b@ if (this.target.create(table)) {@b@ DataReplicator.access$000().info("Create table {}", table.getName());@b@ return true;@b@ }@b@ DataReplicator.access$000().error("Create table {} failure.", table.getName());@b@ }@b@@b@ return false;@b@ }@b@@b@ public void replicate(Table table) {@b@ String tableName = table.identifier();@b@@b@ table.setSchema(this.target.getDatabase().getSchema());@b@ try {@b@ if (!(createOrReplaceTable(table))) return;@b@ Table srcTable = this.source.getDatabase().getTable(tableName);@b@ int count = this.source.count(srcTable);@b@ if (count == 0) {@b@ this.target.pushData(table, Collections.emptyList());@b@ DataReplicator.access$000().info("Replicate {}(0)", table);@b@ } else {@b@ int curr = 0;@b@ PageLimit limit = new PageLimit(0, 1000);@b@ while (curr < count) {@b@ limit.setPageNo(limit.getPageNo() + 1);@b@ List data = this.source.getData(srcTable, limit);@b@ if (data.isEmpty()) {@b@ DataReplicator.access$000().error("Cannot fetch limit data in {} with page size {}", Integer.valueOf(limit.getPageNo()), Integer.valueOf(limit.getPageSize()));@b@ }@b@@b@ int successed = this.target.pushData(table, data);@b@ curr += data.size();@b@ if (successed == count)@b@ DataReplicator.access$000().info("Replicate {}({})", table, Integer.valueOf(successed));@b@ else if (successed == data.size())@b@ DataReplicator.access$000().info("Replicate {}({}/{})", new Object[] { table, Integer.valueOf(curr), Integer.valueOf(count) });@b@ else@b@ DataReplicator.access$000().warn("Replicate {}({}/{})", new Object[] { table, Integer.valueOf(successed), Integer.valueOf(data.size()) });@b@ }@b@ }@b@ }@b@ catch (Exception e) {@b@ DataReplicator.access$000().error("Replicate error " + table.identifier(), e);@b@ }@b@ }@b@ }@b@}
package org.beangle.db.replication.impl;@b@@b@import org.beangle.db.replication.DataWrapper;@b@import org.beangle.db.replication.Replicator;@b@import org.beangle.db.replication.wrappers.DatabaseWrapper;@b@@b@public class IndexReplicator@b@ implements Replicator@b@{@b@ DatabaseWrapper source;@b@ DatabaseWrapper target;@b@@b@ public void reset()@b@ {@b@ }@b@@b@ public void start()@b@ {@b@ }@b@@b@ public void setSource(DataWrapper source)@b@ {@b@ this.source = ((DatabaseWrapper)source);@b@ }@b@@b@ public void setTarget(DataWrapper target) {@b@ this.target = ((DatabaseWrapper)target);@b@ }@b@}
package org.beangle.db.replication.impl;@b@@b@import java.util.Collection;@b@import java.util.Collections;@b@import java.util.List;@b@import org.apache.commons.lang.time.StopWatch;@b@import org.beangle.commons.collection.CollectUtils;@b@import org.beangle.db.jdbc.dialect.Dialect;@b@import org.beangle.db.jdbc.meta.Database;@b@import org.beangle.db.jdbc.meta.Sequence;@b@import org.beangle.db.replication.DataWrapper;@b@import org.beangle.db.replication.Replicator;@b@import org.beangle.db.replication.wrappers.DatabaseWrapper;@b@import org.slf4j.Logger;@b@import org.slf4j.LoggerFactory;@b@@b@public class SequenceReplicator@b@ implements Replicator@b@{@b@ private static final Logger logger = LoggerFactory.getLogger(SequenceReplicator.class);@b@ DatabaseWrapper source;@b@ DatabaseWrapper target;@b@ List<Sequence> sequences = CollectUtils.newArrayList();@b@@b@ public SequenceReplicator(DatabaseWrapper source, DatabaseWrapper target)@b@ {@b@ this.source = source;@b@ this.target = target;@b@ }@b@@b@ public void setSource(DataWrapper source) {@b@ this.source = ((DatabaseWrapper)source);@b@ }@b@@b@ public void setTarget(DataWrapper target) {@b@ this.target = ((DatabaseWrapper)target);@b@ }@b@@b@ public void reset()@b@ {@b@ }@b@@b@ private boolean reCreate(Sequence sequence) {@b@ if (this.target.drop(sequence)) {@b@ if (this.target.create(sequence)) {@b@ logger.info("Recreate sequence {}", sequence.getName());@b@ return true;@b@ }@b@ logger.error("Recreate sequence {} failure.", sequence.getName());@b@ }@b@@b@ return false;@b@ }@b@@b@ public void start() {@b@ Dialect targetDialect = this.target.getDatabase().getDialect();@b@ if (null == targetDialect.getSequenceGrammar()) {@b@ logger.info("Target database {} dosen't support sequence,replication ommited.", targetDialect.getClass().getSimpleName());@b@@b@ return;@b@ }@b@ Collections.sort(this.sequences);@b@ StopWatch watch = new StopWatch();@b@ watch.start();@b@ logger.info("Start sequence replication...");@b@ for (Sequence sequence : this.sequences)@b@ reCreate(sequence);@b@@b@ logger.info("End sequence replication,using {}", Long.valueOf(watch.getTime()));@b@ }@b@@b@ public void addAll(Collection<Sequence> newSequences) {@b@ this.sequences.addAll(newSequences);@b@ }@b@}
3.ReplicatorBuilder构成类及相关
package org.beangle.db.replication;@b@@b@import java.util.Collection;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Set;@b@import javax.sql.DataSource;@b@import org.apache.commons.lang.Validate;@b@import org.beangle.commons.collection.CollectUtils;@b@import org.beangle.db.jdbc.dialect.Dialect;@b@import org.beangle.db.jdbc.dialect.Dialects;@b@import org.beangle.db.jdbc.meta.Constraint;@b@import org.beangle.db.jdbc.meta.Database;@b@import org.beangle.db.jdbc.meta.Sequence;@b@import org.beangle.db.jdbc.meta.Table;@b@import org.beangle.db.replication.impl.CompositeReplicator;@b@import org.beangle.db.replication.impl.ConstraintReplicator;@b@import org.beangle.db.replication.impl.DataReplicator;@b@import org.beangle.db.replication.impl.DefaultTableFilter;@b@import org.beangle.db.replication.impl.SequenceReplicator;@b@import org.beangle.db.replication.wrappers.DatabaseWrapper;@b@@b@public final class ReplicatorBuilder@b@{@b@ DatabaseSource source;@b@ DatabaseTarget target;@b@@b@ public DatabaseSource source(String dialectName, DataSource dataSource)@b@ {@b@ this.source = new DatabaseSource(dataSource, Dialects.getDialect(dialectName));@b@ return this.source;@b@ }@b@@b@ public DatabaseTarget target(String dialectName, DataSource dataSource) {@b@ this.target = new DatabaseTarget(this, dataSource, Dialects.getDialect(dialectName));@b@ return this.target;@b@ }@b@@b@ public Replicator build() {@b@ DatabaseWrapper sourceWrapper = this.source.buildWrapper();@b@ DatabaseWrapper targetWrapper = this.target.buildWrapper();@b@@b@ DataReplicator dataReplicator = new DataReplicator(sourceWrapper, targetWrapper);@b@ dataReplicator.addAll(this.source.filterTables());@b@@b@ ConstraintReplicator contraintRelicator = new ConstraintReplicator(sourceWrapper, targetWrapper);@b@ contraintRelicator.addAll(this.source.filterConstraints());@b@@b@ SequenceReplicator sequenceReplicator = new SequenceReplicator(sourceWrapper, targetWrapper);@b@ sequenceReplicator.addAll(this.source.filterSequences());@b@@b@ return new CompositeReplicator(new Replicator[] { dataReplicator, contraintRelicator, sequenceReplicator }); }@b@@b@ public static final class DatabaseSource { DataSource dataSource;@b@ Dialect dialect;@b@ String schema;@b@ String catelog;@b@ List<Table> tables;@b@ String[] includes;@b@ String[] excludes;@b@ boolean toLowercase = false;@b@ DatabaseWrapper wrapper = null;@b@ Collection<String> tablenames = null;@b@@b@ public DatabaseSource(DataSource dataSource, Dialect dialect) { this.dataSource = dataSource;@b@ this.dialect = dialect;@b@ }@b@@b@ public DatabaseSource schema(String schema) {@b@ this.schema = schema;@b@ return this;@b@ }@b@@b@ public DatabaseSource catelog(String catelog) {@b@ this.catelog = catelog;@b@ return this;@b@ }@b@@b@ public DatabaseSource lowercase() {@b@ this.toLowercase = true;@b@ return this;@b@ }@b@@b@ protected DatabaseWrapper buildWrapper() {@b@ if (null == this.schema) this.schema = this.dialect.defaultSchema();@b@ this.wrapper = new DatabaseWrapper(this.dataSource, this.dialect, this.catelog, this.schema);@b@ return this.wrapper;@b@ }@b@@b@ private Collection<String> filter(Set<String> finalTables) {@b@ String[] arr$;@b@ int len$;@b@ int i$;@b@ DefaultTableFilter filter = new DefaultTableFilter();@b@ if (null != this.includes) {@b@ arr$ = this.includes; len$ = arr$.length; for (i$ = 0; i$ < len$; ++i$) { String include = arr$[i$];@b@ filter.addInclude(include); }@b@ }@b@ if (null != this.excludes) {@b@ arr$ = this.excludes; len$ = arr$.length; for (i$ = 0; i$ < len$; ++i$) { String exclude = arr$[i$];@b@ filter.addExclude(exclude); }@b@ }@b@ return filter.filter(finalTables);@b@ }@b@@b@ public DatabaseSource tables(String[] includes) {@b@ this.includes = includes;@b@ return this;@b@ }@b@@b@ public DatabaseSource exclude(String[] excludes) {@b@ this.excludes = excludes;@b@ return this;@b@ }@b@@b@ public DatabaseSource indexes(String[] indexes) {@b@ return this;@b@ }@b@@b@ public DatabaseSource contraints(String[] string) {@b@ return this;@b@ }@b@@b@ public DatabaseSource sequences(String[] string) {@b@ return this;@b@ }@b@@b@ protected List<Table> filterTables() {@b@ Collection tablenames = filter(this.wrapper.getDatabase().getTables().keySet());@b@ this.tables = CollectUtils.newArrayList();@b@ for (String name : tablenames) {@b@ Table tb = this.wrapper.getDatabase().getTable(name);@b@ tb = tb.clone();@b@ if (this.toLowercase)@b@ tb.lowerCase();@b@@b@ this.tables.add(tb);@b@ }@b@ return this.tables;@b@ }@b@@b@ protected List<Constraint> filterConstraints() {@b@ Validate.notNull(this.tables, "Call filterTables first");@b@ List contraints = CollectUtils.newArrayList();@b@ for (Table table : this.tables)@b@ contraints.addAll(table.getForeignKeys().values());@b@@b@ return contraints;@b@ }@b@@b@ protected Collection<Sequence> filterSequences() {@b@ return this.wrapper.getDatabase().getSequences();@b@ }@b@ }@b@@b@ public final class DatabaseTarget@b@ {@b@ DataSource dataSource;@b@ Dialect dialect;@b@ String schema;@b@ String catelog;@b@@b@ public DatabaseTarget(, DataSource paramDataSource, Dialect paramDialect)@b@ {@b@ this.dataSource = paramDataSource;@b@ this.dialect = paramDialect;@b@ }@b@@b@ public DatabaseTarget schema() {@b@ this.schema = schema;@b@ return this;@b@ }@b@@b@ public DatabaseTarget catelog() {@b@ this.catelog = catelog;@b@ return this;@b@ }@b@@b@ protected DatabaseWrapper buildWrapper() {@b@ if (null == this.schema) this.schema = this.dialect.defaultSchema();@b@ return new DatabaseWrapper(this.dataSource, this.dialect, this.catelog, this.schema);@b@ }@b@ }@b@}
4.ReplicatorMain主入口调用
package org.beangle.db.replication;@b@@b@import java.io.PrintStream;@b@import org.apache.commons.lang.StringUtils;@b@import org.beangle.db.jdbc.util.DataSourceUtil;@b@@b@public class ReplicatorMain@b@{@b@ public static void main(String[] args)@b@ throws Exception@b@ {@b@ ReplicatorBuilder builder = new ReplicatorBuilder();@b@ if (args.length < 2) {@b@ System.out.println("Usage:ReplicatorMain datasource:dialect:schema targetsource:dialect:schema");@b@ System.exit(0);@b@ }@b@ String src = args[1];@b@ String tar = args[2];@b@ String[] source = StringUtils.split(src, ':');@b@ String[] target = StringUtils.split(tar, ':');@b@ builder.source(source[1], DataSourceUtil.getDataSource(source[0])).schema(source[2]).tables(new String[] { "*" }).contraints(new String[] { "*" }).sequences(new String[] { "*" });@b@@b@ builder.target(target[1], DataSourceUtil.getDataSource(target[0])).schema(target[2]);@b@ Replicator replicator = builder.build();@b@ replicator.start();@b@ }@b@}