@@ -19,12 +19,13 @@ package zio.redis
19
19
import zio ._
20
20
import zio .redis .ClusterExecutor ._
21
21
import zio .redis .api .Cluster .AskingCommand
22
+ import zio .redis .internal .{RedisConnection , RespCommand , RespCommandArgument , RespValue }
22
23
import zio .redis .options .Cluster ._
23
24
24
25
import java .io .IOException
25
26
26
- final case class ClusterExecutor (
27
- clusterConnectionRef : Ref .Synchronized [ClusterConnection ],
27
+ final class ClusterExecutor private (
28
+ clusterConnection : Ref .Synchronized [ClusterConnection ],
28
29
config : RedisClusterConfig ,
29
30
scope : Scope .Closeable
30
31
) extends RedisExecutor {
@@ -56,26 +57,26 @@ final case class ClusterExecutor(
56
57
}
57
58
58
59
for {
59
- keyOpt <- ZIO .succeed(command.args.collectFirst { case key : RespArgument .Key => key })
60
+ keyOpt <- ZIO .succeed(command.args.collectFirst { case key : RespCommandArgument .Key => key })
60
61
keySlot = keyOpt.fold(Slot .Default )(key => Slot ((key.asCRC16 & (SlotsAmount - 1 )).toLong))
61
62
result <- executeSafe(keySlot)
62
63
} yield result
63
64
}
64
65
65
66
private def executor (slot : Slot ): IO [RedisError .IOError , RedisExecutor ] =
66
- clusterConnectionRef .get.map(_.executor(slot)).flatMap(ZIO .fromOption(_).orElseFail(CusterKeyExecutorError ))
67
+ clusterConnection .get.map(_.executor(slot)).flatMap(ZIO .fromOption(_).orElseFail(CusterKeyExecutorError ))
67
68
68
69
// TODO introduce max connection amount
69
70
private def executor (address : RedisUri ): IO [RedisError .IOError , RedisExecutor ] =
70
- clusterConnectionRef .modifyZIO { cc =>
71
+ clusterConnection .modifyZIO { cc =>
71
72
val executorOpt = cc.executors.get(address).map(es => (es.executor, cc))
72
73
val enrichedClusterIO =
73
74
scope.extend[Any ](connectToNode(address)).map(es => (es.executor, cc.addExecutor(address, es)))
74
75
ZIO .fromOption(executorOpt).catchAll(_ => enrichedClusterIO)
75
76
}
76
77
77
78
private def refreshConnect : IO [RedisError , Unit ] =
78
- clusterConnectionRef .updateZIO { connection =>
79
+ clusterConnection .updateZIO { connection =>
79
80
val addresses = connection.partitions.flatMap(_.addresses)
80
81
for {
81
82
cluster <- scope.extend[Any ](initConnectToCluster(addresses))
@@ -110,11 +111,11 @@ object ClusterExecutor {
110
111
scope : Scope .Closeable
111
112
): ZIO [Scope , RedisError , ClusterExecutor ] =
112
113
for {
113
- clusterConnection <- initConnectToCluster(config.addresses)
114
- clusterConnectionRef <- Ref .Synchronized .make(clusterConnection )
115
- clusterExec = ClusterExecutor (clusterConnectionRef , config, scope)
116
- _ <- logScopeFinalizer(" Cluster executor is closed" )
117
- } yield clusterExec
114
+ connection <- initConnectToCluster(config.addresses)
115
+ ref <- Ref .Synchronized .make(connection )
116
+ executor = new ClusterExecutor (ref , config, scope)
117
+ _ <- logScopeFinalizer(" Cluster executor is closed" )
118
+ } yield executor
118
119
119
120
private def initConnectToCluster (addresses : Chunk [RedisUri ]): ZIO [Scope , RedisError , ClusterConnection ] =
120
121
ZIO
@@ -141,15 +142,15 @@ object ClusterExecutor {
141
142
private def connectToNode (address : RedisUri ) =
142
143
for {
143
144
closableScope <- Scope .make
144
- connection <- closableScope.extend[Any ](RedisConnectionLive .create(RedisConfig (address.host, address.port)))
145
+ connection <- closableScope.extend[Any ](RedisConnection .create(RedisConfig (address.host, address.port)))
145
146
executor <- closableScope.extend[Any ](SingleNodeExecutor .create(connection))
146
147
layerScope <- ZIO .scope
147
148
_ <- layerScope.addFinalizerExit(closableScope.close(_))
148
149
} yield ExecutorScope (executor, closableScope)
149
150
150
151
private def redis (address : RedisUri ) = {
151
- val executorLayer = ZLayer .succeed(RedisConfig (address.host, address.port)) >>> RedisExecutor .layer
152
- val codecLayer = ZLayer .succeed[CodecSupplier ](CodecSupplier .utf8string )
152
+ val executorLayer = ZLayer .succeed(RedisConfig (address.host, address.port)) >>> SingleNodeExecutor .layer
153
+ val codecLayer = ZLayer .succeed[CodecSupplier ](CodecSupplier .utf8 )
153
154
val redisLayer = executorLayer ++ codecLayer >>> Redis .layer
154
155
for {
155
156
closableScope <- Scope .make
0 commit comments