代码拉取完成,页面将自动刷新
同步操作将从 魏大伟/redis-replicator 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
Redis Replicator是一款rdb解析以及命令解析的工具. 此工具完整实现了redis replication协议.
支持sync,psync,psync2等三种同步命令. 还支持远程rdb文件备份以及数据同步等功能.
此文中提到的`命令`特指redis中的写命令,不包括读命令(比如`get`,`hmget`).
479688557
jdk 1.7+
maven-3.2.3+
redis 2.4 - 4.0-rc2
<!-- Maven coordinates of the redis-replicator artifact, version 2.0.0-rc3 -->
<dependency>
<groupId>com.moilioncircle</groupId>
<artifactId>redis-replicator</artifactId>
<version>2.0.0-rc3</version>
</dependency>
$mvn clean install package -Dmaven.test.skip=true
// Replicate from the Redis server at 127.0.0.1:6379 and print every KeyValuePair
// delivered to the RDB listener and every Command delivered to the command listener.
final Configuration configuration = Configuration.defaultSetting();
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, configuration);

final RdbListener rdbPrinter = new RdbListener.Adaptor() {
    @Override
    public void handle(Replicator source, KeyValuePair<?> entry) {
        System.out.println(entry);
    }
};
replicator.addRdbListener(rdbPrinter);

final CommandListener commandPrinter = new CommandListener() {
    @Override
    public void handle(Replicator source, Command received) {
        System.out.println(received);
    }
};
replicator.addCommandListener(commandPrinter);

replicator.open();
// Read the local file "dump.rdb" as FileType.RDB and print every KeyValuePair
// handed to the RDB listener.
final File dumpFile = new File("dump.rdb");
Replicator replicator = new RedisReplicator(dumpFile, FileType.RDB, Configuration.defaultSetting());

final RdbListener rdbPrinter = new RdbListener.Adaptor() {
    @Override
    public void handle(Replicator source, KeyValuePair<?> entry) {
        System.out.println(entry);
    }
};
replicator.addRdbListener(rdbPrinter);

replicator.open();
// Read the local file "appendonly.aof" as FileType.AOF and print every Command
// handed to the command listener.
final File aofFile = new File("appendonly.aof");
Replicator replicator = new RedisReplicator(aofFile, FileType.AOF, Configuration.defaultSetting());

final CommandListener commandPrinter = new CommandListener() {
    @Override
    public void handle(Replicator source, Command received) {
        System.out.println(received);
    }
};
replicator.addCommandListener(commandPrinter);

replicator.open();
[RDB file][AOF tail]
aof-use-rdb-preamble yes
// Read "appendonly.aof" as FileType.MIXED; both an RDB listener and a command
// listener are registered, and each prints whatever it receives.
final File mixedFile = new File("appendonly.aof");
final Replicator replicator = new RedisReplicator(mixedFile, FileType.MIXED, Configuration.defaultSetting());

final RdbListener rdbPrinter = new RdbListener.Adaptor() {
    @Override
    public void handle(Replicator source, KeyValuePair<?> entry) {
        System.out.println(entry);
    }
};
replicator.addRdbListener(rdbPrinter);

final CommandListener commandPrinter = new CommandListener() {
    @Override
    public void handle(Replicator source, Command received) {
        System.out.println(received);
    }
};
replicator.addCommandListener(commandPrinter);

replicator.open();
// Output file that receives the raw bytes forwarded by the listener below.
final FileOutputStream out = new FileOutputStream(new File("./dump.rdb"));
// Listener that appends every chunk of raw bytes it is given to ./dump.rdb.
final RawByteListener rawByteListener = new RawByteListener() {
    @Override
    public void handle(byte... rawBytes) {
        try {
            out.write(rawBytes);
        } catch (IOException e) {
            // A failed write leaves the backup file incomplete; report it instead of
            // silently dropping the error so the problem is visible to the operator.
            e.printStackTrace();
        }
    }
};
// Save the RDB file sent by the server
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
replicator.addRdbListener(new RdbListener() {
    @Override
    public void preFullSync(Replicator replicator) {
        // Start capturing raw bytes before the full sync begins.
        replicator.addRawByteListener(rawByteListener);
    }

    @Override
    public void handle(Replicator replicator, KeyValuePair<?> kv) {
        // Intentionally empty: this example only records raw bytes.
    }

    @Override
    public void postFullSync(Replicator replicator, long checksum) {
        // Stop capturing once the full sync is over, then release both resources.
        replicator.removeRawByteListener(rawByteListener);
        try {
            try {
                out.close();
            } finally {
                // Close the replicator even when closing the file fails.
                replicator.close();
            }
        } catch (IOException e) {
            // Report the failure rather than swallowing it.
            e.printStackTrace();
        }
    }
});
replicator.open();
// Verify the RDB file that was written: read ./dump.rdb back and print each entry.
final File writtenDump = new File("./dump.rdb");
replicator = new RedisReplicator(writtenDump, FileType.RDB, Configuration.defaultSetting());

final RdbListener dumpPrinter = new RdbListener.Adaptor() {
    @Override
    public void handle(Replicator source, KeyValuePair<?> entry) {
        System.out.println(entry);
    }
};
replicator.addRdbListener(dumpPrinter);

replicator.open();
// Output file that receives the raw bytes forwarded by the listener below.
final FileOutputStream out = new FileOutputStream(new File("./appendonly.aof"));
// Listener that appends every chunk of raw bytes it is given to ./appendonly.aof.
final RawByteListener rawByteListener = new RawByteListener() {
    @Override
    public void handle(byte... rawBytes) {
        try {
            out.write(rawBytes);
        } catch (IOException e) {
            // A failed write leaves the saved file incomplete; report it instead of
            // silently dropping the error so the problem is visible to the operator.
            e.printStackTrace();
        }
    }
};
// Save 1000 commands
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
replicator.addRdbListener(new RdbListener() {
    @Override
    public void preFullSync(Replicator replicator) {
        // Nothing to do before the full sync in this example.
    }

    @Override
    public void handle(Replicator replicator, KeyValuePair<?> kv) {
        // RDB entries are not recorded in this example.
    }

    @Override
    public void postFullSync(Replicator replicator, long checksum) {
        // Begin capturing raw bytes only after the full sync has finished.
        replicator.addRawByteListener(rawByteListener);
    }
});
// Number of commands seen so far by the command listener.
final AtomicInteger acc = new AtomicInteger(0);
replicator.addCommandListener(new CommandListener() {
    @Override
    public void handle(Replicator replicator, Command command) {
        if (acc.incrementAndGet() == 1000) {
            // Detach the byte listener first so nothing is written to the stream
            // once it has been closed.
            replicator.removeRawByteListener(rawByteListener);
            try {
                try {
                    out.close();
                } finally {
                    // Close the replicator even when closing the file fails.
                    replicator.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
});
replicator.open();
// Verify the AOF file that was written: read ./appendonly.aof back and print each command.
final File writtenAof = new File("./appendonly.aof");
replicator = new RedisReplicator(writtenAof, FileType.AOF, Configuration.defaultSetting());

final CommandListener aofPrinter = new CommandListener() {
    @Override
    public void handle(Replicator source, Command received) {
        System.out.println(received);
    }
};
replicator.addCommandListener(aofPrinter);

replicator.open();
/**
 * Immutable command object holding the key and value of a custom-parsed command.
 */
public static class YourAppendCommand implements Command {
    private final String key;
    private final String value;

    /**
     * @param key   the key carried by the command
     * @param value the value carried by the command
     */
    public YourAppendCommand(String key, String value) {
        this.key = key;
        this.value = value;
    }

    /** @return the key given at construction */
    public String getKey() {
        return key;
    }

    /** @return the value given at construction */
    public String getValue() {
        return value;
    }

    /** @return a text form such as {@code YourAppendCommand{key='k', value='v'}} */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("YourAppendCommand{");
        text.append("key='").append(key).append('\'');
        text.append(", value='").append(value).append('\'');
        text.append('}');
        return text.toString();
    }
}
}
/**
 * Command parser that builds a {@link YourAppendCommand} from a raw command array.
 */
public class YourAppendParser implements CommandParser<YourAppendCommand> {
    /**
     * @param command raw command array; elements 1 and 2 are cast to {@code String}
     *                and used as key and value respectively
     * @return the command object built from those two elements
     */
    @Override
    public YourAppendCommand parse(Object[] command) {
        final String key = (String) command[1];
        final String value = (String) command[2];
        return new YourAppendCommand(key, value);
    }
}
// Register YourAppendParser for the APPEND command and react to the
// YourAppendCommand objects that reach the command listener.
final Configuration configuration = Configuration.defaultSetting();
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, configuration);
replicator.addCommandParser(CommandName.name("APPEND"), new YourAppendParser());
replicator.addCommandListener(new CommandListener() {
    @Override
    public void handle(Replicator source, Command received) {
        if (!(received instanceof YourAppendCommand)) {
            return;
        }
        YourAppendCommand appendCommand = (YourAppendCommand) received;
        // your business logic goes here
    }
});
$cd /path/to/redis-4.0-rc2/src/modules
$make
loadmodule /path/to/redis-4.0-rc2/src/modules/hellotype.so
/**
 * Module parser that reads a {@link HelloTypeModule} from an RDB input stream.
 */
public class HelloTypeModuleParser implements ModuleParser<HelloTypeModule> {
    /**
     * Reads an unsigned element count followed by that many signed values.
     *
     * @param in stream to read from
     * @return a module wrapping the values in the order they were read
     * @throws IOException declared by the interface; propagated from the reads
     */
    @Override
    public HelloTypeModule parse(RedisInputStream in) throws IOException {
        final DefaultRdbModuleParser reader = new DefaultRdbModuleParser(in);
        final int count = (int) reader.loadUnSigned();
        final long[] values = new long[count];
        for (int index = 0; index < count; index++) {
            values[index] = reader.loadSigned();
        }
        return new HelloTypeModule(values);
    }
}
/**
 * Module value object wrapping an array of longs.
 */
public class HelloTypeModule implements Module {
    private final long[] value;

    /** @param value the array to wrap (stored as given, not copied) */
    public HelloTypeModule(long[] value) {
        this.value = value;
    }

    /** @return the wrapped array (the same instance passed to the constructor) */
    public long[] getValue() {
        return value;
    }

    /** @return a text form such as {@code HelloTypeModule{value=[1, 2]}} */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("HelloTypeModule{");
        text.append("value=").append(Arrays.toString(value));
        text.append('}');
        return text.toString();
    }
}
/**
 * Command parser that builds a {@link HelloTypeCommand} from a raw command array.
 */
public class HelloTypeParser implements CommandParser<HelloTypeCommand> {
    /**
     * @param command raw command array; element 1 is cast to {@code String} and used as the
     *                key, element 2 is cast to {@code String} and parsed as a {@code long}
     * @return the command object built from those two elements
     */
    @Override
    public HelloTypeCommand parse(Object[] command) {
        return new HelloTypeCommand((String) command[1], Long.parseLong((String) command[2]));
    }
}
/**
 * Immutable command object holding a string key and a long value.
 */
public class HelloTypeCommand implements Command {
    private final String key;
    private final long value;

    /**
     * @param key   the key carried by the command
     * @param value the numeric value carried by the command
     */
    public HelloTypeCommand(String key, long value) {
        this.key = key;
        this.value = value;
    }

    /** @return the key given at construction */
    public String getKey() {
        return key;
    }

    /** @return the value given at construction */
    public long getValue() {
        return value;
    }

    /** @return a text form such as {@code HelloTypeCommand{key='k', value=1}} */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("HelloTypeCommand{");
        text.append("key='").append(key).append('\'');
        text.append(", value=").append(value);
        text.append('}');
        return text.toString();
    }
}
/**
 * Connects to 127.0.0.1:6379, registers the hellotype command and module parsers, and
 * prints module-valued RDB entries and HelloTypeCommand events to stdout.
 *
 * @param args unused
 * @throws IOException declared for the replicator calls made here
 */
public static void main(String[] args) throws IOException {
    RedisReplicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
    replicator.addCommandParser(CommandName.name("hellotype.insert"), new HelloTypeParser());
    replicator.addModuleParser("hellotype", 0, new HelloTypeModuleParser());

    final RdbListener modulePrinter = new RdbListener.Adaptor() {
        @Override
        public void handle(Replicator source, KeyValuePair<?> entry) {
            // Only entries whose value is a module are printed.
            if (!(entry instanceof KeyStringValueModule)) {
                return;
            }
            System.out.println(entry);
        }
    };
    replicator.addRdbListener(modulePrinter);

    final CommandListener helloTypePrinter = new CommandListener() {
        @Override
        public void handle(Replicator source, Command received) {
            // Only the custom-parsed hellotype command is printed.
            if (!(received instanceof HelloTypeCommand)) {
                return;
            }
            System.out.println(received);
        }
    };
    replicator.addCommandListener(helloTypePrinter);

    replicator.open();
}
`RdbVisitor`抽象类: 通过`Replicator`的`setRdbVisitor`方法注册你自己的`RdbVisitor`.

命令 | 命令 | 命令 | 命令 | 命令 | 命令 |
---|---|---|---|---|---|
PING | APPEND | SET | SETEX | MSET | DEL |
SADD | HMSET | HSET | LSET | EXPIRE | EXPIREAT |
GETSET | HSETNX | MSETNX | PSETEX | SETNX | SETRANGE |
HDEL | UNLINK | SREM | LPOP | LPUSH | LPUSHX |
LREM | RPOP | RPUSH | RPUSHX | ZREM | RENAME |
INCR | DECR | INCRBY | PERSIST | SELECT | FLUSHALL |
FLUSHDB | HINCRBY | ZINCRBY | MOVE | SMOVE | PFADD |
PFCOUNT | PFMERGE | SDIFFSTORE | RENAMENX | PEXPIREAT | SINTERSTORE |
ZADD | BITFIELD | SUNIONSTORE | RESTORE | LINSERT | ZINTERSTORE |
GEOADD | PEXPIRE | ZUNIONSTORE | EVAL | SCRIPT | BRPOPLPUSH |
PUBLISH | BITOP | SETBIT |
client-output-buffer-limit slave 0 0 0
警告: 这个配置可能会使redis-server中的内存溢出
<!-- Logger named com.moilioncircle at debug level, attached to the appender referenced as YourAppender -->
<Logger name="com.moilioncircle" level="debug">
<AppenderRef ref="YourAppender"/>
</Logger>
// Logging: set the verbose flag on a Configuration.
// NOTE(review): every statement in this section calls Configuration.defaultSetting() anew.
// If that factory returns a fresh instance per call, each setter must instead be applied to
// the one Configuration object that is actually passed to RedisReplicator — confirm.
Configuration.defaultSetting().setVerbose(true);
// SSL: set the JVM trust-store system properties, then set the SSL flag on a Configuration.
System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "your_type");
Configuration.defaultSetting().setSsl(true);
// optional settings
Configuration.defaultSetting().setSslSocketFactory(sslSocketFactory);
Configuration.defaultSetting().setSslParameters(sslParameters);
Configuration.defaultSetting().setHostnameVerifier(hostnameVerifier);
// Authentication: set the password on a Configuration.
Configuration.defaultSetting().setAuthPassword("foobared");
repl-backlog-size
repl-backlog-ttl
repl-ping-slave-period
repl-ping-slave-period 必须小于 Configuration.getReadTimeout(), 默认的 Configuration.getReadTimeout() 是30秒.
// Count the RDB events received during a full sync and print the elapsed time
// (milliseconds since `start`) together with the event count when it completes.
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
final long start = System.currentTimeMillis();
final AtomicInteger acc = new AtomicInteger(0);

final RdbListener syncTimer = new RdbListener() {
    @Override
    public void preFullSync(Replicator source) {
        System.out.println("pre full sync");
    }

    @Override
    public void handle(Replicator source, KeyValuePair<?> entry) {
        // One increment per delivered entry.
        acc.incrementAndGet();
    }

    @Override
    public void postFullSync(Replicator source, long checksum) {
        final long elapsed = System.currentTimeMillis() - start;
        System.out.println("time elapsed:" + elapsed);
        System.out.println("rdb event count:" + acc.get());
    }
};
replicator.addRdbListener(syncTimer);

replicator.open();
// Print the raw bytes of every string-valued entry delivered to the RDB listener.
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());

final RdbListener rawBytesPrinter = new RdbListener.Adaptor() {
    @Override
    public void handle(Replicator source, KeyValuePair<?> entry) {
        // Entries of any other value type are ignored.
        if (!(entry instanceof KeyStringValueString)) {
            return;
        }
        final KeyStringValueString stringEntry = (KeyStringValueString) entry;
        System.out.println(Arrays.toString(stringEntry.getRawBytes()));
    }
};
replicator.addRdbListener(rawBytesPrinter);

replicator.open();
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违反国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。