pipline sync

之前因为项目需要先从redis中取出key的列表,再循环这些列表操作后写入到redis中,因为使用pipline,中间出现了一些问题记录学习下。
最初の源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class JedisTest extends Sync {
protected static final Logger log = LoggerFactory.getLogger(JedisTest.class);
private static final String _SET_KEY_1 = "test1";
private static final String _SET_KEY_2 = "test2";

public void process() throws SQLException {
Set<String> appSet = getAllUserableAppkey();
ShardedJedis jedis = RedisHelper.getJedis();
ShardedJedisPipeline pipeline = jedis.pipelined();

for (String key : appSet) {
Set<String> result = jedis.smembers(_SET_KEY_1);
Set<String> result2 = jedis.smembers(_SET_KEY_2);
log.warn("result1 :{},result2:{}",result,result2);
String rangName = String.format("%s::%s","test",key);
for (int i = 0; i < 10; i++) {
pipeline.sadd(rangName, String.valueOf(i));
}

}
pipeline.sync();
try {
} finally {
if (jedis != null) {
RedisHelper.getPool().returnResource(jedis);
}
}
}

public Set<String> getAllUserableAppkey() {
Set<String> result = new HashSet<>();
result.add("test1");
result.add("test2");
return result;
}

public static void main(String[] args) throws Exception {
DbHelper.init();
RedisHelper.init();
JedisTest jedisTest = new JedisTest();
try {
jedisTest.process();
} catch (SQLException e) {
e.printStackTrace();
}
}
}

报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
17:37:23,721  WARN JedisTest:31 - result1 :[],result2:[]
Exception in thread "main" java.lang.ClassCastException: java.lang.Long cannot be cast to java.util.List
at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:221)
at redis.clients.jedis.Connection.getMultiBulkReply(Connection.java:214)
at redis.clients.jedis.Jedis.smembers(Jedis.java:1191)
at redis.clients.jedis.ShardedJedis.smembers(ShardedJedis.java:321)
at com.snda.sync.impl.test.JedisTest.process(JedisTest.java:29)
at com.snda.sync.impl.test.JedisTest.main(JedisTest.java:59)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Connection.java:221

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@SuppressWarnings("unchecked")
public List<byte[]> getBinaryMultiBulkReply() {
flush();
pipelinedCommands--;
return (List<byte[]>) readProtocolWithCheckingBroken();
}
public List<String> getMultiBulkReply() {
return (List)BuilderFactory.STRING_LIST.build(this.getBinaryMultiBulkReply());
}

/**
* Return all the members (elements) of the set value stored at key. This is just syntax glue for
* {@link #sinter(String...) SINTER}.
* <p>
* Time complexity O(N)
* @param key
* @return Multi bulk reply
*/

public Set<String> smembers(final String key) {
checkIsInMulti();
client.smembers(key);
final List<String> members = client.getMultiBulkReply();
if (members == null) {
return null;
}
return new HashSet<String>(members);
}

public Set<String> smembers(String key) {
Jedis j = (Jedis)this.getShard(key);
return j.smembers(key);
}

怀疑是不是pipeline.addjedis.smembers起了冲突,于是把pipeline.sync放到循环里面做,发现有报异常:

1
Exception in thread "main" redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out

这里报异常是因为pipeline.sync是同步并关闭通道的命令,而此时申请通道是在循环外面的,相当于只申请了一次,之后再使用就发现已经关闭了,所以放在循环里面这段代码就可跑通。代码如下:

1
2
3
4
5
6
7
8
9
10
11
for (String key : appSet) {
final ShardedJedisPipeline pipeline = jedis.pipelined();
Set<String> result = jedis.smembers(_SET_KEY_1);
Set<String> result2 = jedis.smembers(_SET_KEY_2);
//log.warn("result1 :{},result2:{}",result,result2);
String rangName = String.format("%s::%s", "test", key);
for (int i = 0; i < 10; i++) {
pipeline.sadd(rangName, String.valueOf(i));
}
pipeline.sync();
}

实际上,jedis与其pipline是不能同时使用的,其中jedis立即得到相应,而pipeline稍后得到相应,这容易混淆得到的response
只需要为jedis操作和pipeline操作分别获得一个jedis实例就可以解决这个问题,实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final ShardedJedis jedis = RedisHelper.getJedis();
final ShardedJedis jedis1 = RedisHelper.getJedis();
final ShardedJedisPipeline pipeline = jedis.pipelined();
for (String key : appSet) {
Set<String> result = jedis1.smembers(_SET_KEY_1);
Set<String> result2 = jedis1.smembers(_SET_KEY_2);
//log.warn("result1 :{},result2:{}",result,result2);
String rangName = String.format("%s::%s", "test", key);
for (int i = 0; i < 10; i++) {
pipeline.sadd(rangName, String.valueOf(i));
}

}
pipeline.sync();