class Redis::Client
Constants
- DEFAULTS
Attributes
command_map[R]
connection[R]
logger[RW]
Public Class Methods
new(options = {})
click to toggle source
# File lib/redis/client.rb, line 73 def initialize(options = {}) @options = _parse_options(options) @reconnect = true @logger = @options[:logger] @connection = nil @command_map = {} @pending_reads = 0 if options.include?(:sentinels) @connector = Connector::Sentinel.new(@options) else @connector = Connector.new(@options) end end
Public Instance Methods
call(command, &block)
click to toggle source
# File lib/redis/client.rb, line 111 def call(command, &block) reply = process([command]) { read } raise reply if reply.is_a?(CommandError) if block block.call(reply) else reply end end
call_loop(command) { |reply| ... }
click to toggle source
# File lib/redis/client.rb, line 122 def call_loop(command) error = nil result = without_socket_timeout do process([command]) do loop do reply = read if reply.is_a?(CommandError) error = reply break else yield reply end end end end # Raise error when previous block broke out of the loop. raise error if error # Result is set to the value that the provided block used to break. result end
call_pipeline(pipeline)
click to toggle source
# File lib/redis/client.rb, line 146 def call_pipeline(pipeline) with_reconnect pipeline.with_reconnect? do begin pipeline.finish(call_pipelined(pipeline.commands)).tap do self.db = pipeline.db if pipeline.db end rescue ConnectionError => e return nil if pipeline.shutdown? # Assume the pipeline was sent in one piece, but execution of # SHUTDOWN caused none of the replies for commands that were executed # prior to it from coming back around. raise e end end end
call_pipelined(commands)
click to toggle source
# File lib/redis/client.rb, line 162 def call_pipelined(commands) return [] if commands.empty? # The method #ensure_connected (called from #process) reconnects once on # I/O errors. To make an effort in making sure that commands are not # executed more than once, only allow reconnection before the first reply # has been read. When an error occurs after the first reply has been # read, retrying would re-execute the entire pipeline, thus re-issuing # already successfully executed commands. To circumvent this, don't retry # after the first reply has been read successfully. result = Array.new(commands.size) reconnect = @reconnect begin process(commands) do result[0] = read @reconnect = false (commands.size - 1).times do |i| result[i + 1] = read end end ensure @reconnect = reconnect end result end
call_with_timeout(command, timeout, &blk)
click to toggle source
# File lib/redis/client.rb, line 193 def call_with_timeout(command, timeout, &blk) with_socket_timeout(timeout) do call(command, &blk) end rescue ConnectionError retry end
call_without_timeout(command, &blk)
click to toggle source
# File lib/redis/client.rb, line 201 def call_without_timeout(command, &blk) call_with_timeout(command, 0, &blk) end
connect()
click to toggle source
# File lib/redis/client.rb, line 89 def connect @pid = Process.pid # Don't try to reconnect when the connection is fresh with_reconnect(false) do establish_connection call [:auth, password] if password call [:select, db] if db != 0 @connector.check(self) end self end
connected?()
click to toggle source
# File lib/redis/client.rb, line 222 def connected? !! (connection && connection.connected?) end
db()
click to toggle source
# File lib/redis/client.rb, line 53 def db @options[:db] end
db=(db)
click to toggle source
# File lib/redis/client.rb, line 57 def db=(db) @options[:db] = db.to_i end
disconnect()
click to toggle source
# File lib/redis/client.rb, line 226 def disconnect connection.disconnect if connected? end
driver()
click to toggle source
# File lib/redis/client.rb, line 61 def driver @options[:driver] end
host()
click to toggle source
# File lib/redis/client.rb, line 33 def host @options[:host] end
id()
click to toggle source
# File lib/redis/client.rb, line 103 def id @options[:id] || "redis://#{location}/#{db}" end
inherit_socket?()
click to toggle source
# File lib/redis/client.rb, line 65 def inherit_socket? @options[:inherit_socket] end
io() { || ... }
click to toggle source
# File lib/redis/client.rb, line 235 def io yield rescue TimeoutError => e1 # Add a message to the exception without destroying the original stack e2 = TimeoutError.new("Connection timed out") e2.set_backtrace(e1.backtrace) raise e2 rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EBADF, Errno::EINVAL => e raise ConnectionError, "Connection lost (%s)" % [e.class.name.split("::").last] end
location()
click to toggle source
# File lib/redis/client.rb, line 107 def location path || "#{host}:#{port}" end
options()
click to toggle source
# File lib/redis/client.rb, line 25 def options Marshal.load(Marshal.dump(@options)) end
password()
click to toggle source
# File lib/redis/client.rb, line 49 def password @options[:password] end
path()
click to toggle source
# File lib/redis/client.rb, line 41 def path @options[:path] end
port()
click to toggle source
# File lib/redis/client.rb, line 37 def port @options[:port] end
process(commands) { || ... }
click to toggle source
# File lib/redis/client.rb, line 205 def process(commands) logging(commands) do ensure_connected do commands.each do |command| if command_map[command.first] command = command.dup command[0] = command_map[command.first] end write(command) end yield if block_given? end end end
read()
click to toggle source
# File lib/redis/client.rb, line 246 def read io do value = connection.read @pending_reads -= 1 value end end
reconnect()
click to toggle source
# File lib/redis/client.rb, line 230 def reconnect disconnect connect end
scheme()
click to toggle source
# File lib/redis/client.rb, line 29 def scheme @options[:scheme] end
timeout()
click to toggle source
# File lib/redis/client.rb, line 45 def timeout @options[:timeout] end
with_reconnect(val=true) { || ... }
click to toggle source
# File lib/redis/client.rb, line 276 def with_reconnect(val=true) begin original, @reconnect = @reconnect, val yield ensure @reconnect = original end end
with_socket_timeout(timeout) { || ... }
click to toggle source
# File lib/redis/client.rb, line 261 def with_socket_timeout(timeout) connect unless connected? begin connection.timeout = timeout yield ensure connection.timeout = self.timeout if connected? end end
without_reconnect(&blk)
click to toggle source
# File lib/redis/client.rb, line 285 def without_reconnect(&blk) with_reconnect(false, &blk) end
without_socket_timeout(&blk)
click to toggle source
# File lib/redis/client.rb, line 272 def without_socket_timeout(&blk) with_socket_timeout(0, &blk) end
write(command)
click to toggle source
# File lib/redis/client.rb, line 254 def write(command) io do @pending_reads += 1 connection.write(command) end end
Protected Instance Methods
_parse_driver(driver)
click to toggle source
# File lib/redis/client.rb, line 460 def _parse_driver(driver) driver = driver.to_s if driver.is_a?(Symbol) if driver.kind_of?(String) begin require "redis/connection/#{driver}" driver = Connection.const_get(driver.capitalize) rescue LoadError, NameError raise RuntimeError, "Cannot load driver #{driver.inspect}" end end driver end
_parse_options(options)
click to toggle source
# File lib/redis/client.rb, line 368 def _parse_options(options) return options if options[:_parsed] defaults = DEFAULTS.dup options = options.dup defaults.keys.each do |key| # Fill in defaults if needed if defaults[key].respond_to?(:call) defaults[key] = defaults[key].call end # Symbolize only keys that are needed options[key] = options[key.to_s] if options.has_key?(key.to_s) end url = options[:url] || defaults[:url] # Override defaults from URL if given if url require "uri" uri = URI(url) if uri.scheme == "unix" defaults[:path] = uri.path elsif uri.scheme == "redis" # Require the URL to have at least a host raise ArgumentError, "invalid url: #{uri}" unless uri.host defaults[:scheme] = uri.scheme defaults[:host] = uri.host defaults[:port] = uri.port if uri.port defaults[:password] = CGI.unescape(uri.password) if uri.password defaults[:db] = uri.path[1..-1].to_i if uri.path defaults[:role] = :master else raise ArgumentError, "invalid uri scheme '#{uri.scheme}'" end end # Use default when option is not specified or nil defaults.keys.each do |key| options[key] = defaults[key] if options[key].nil? end if options[:path] # Unix socket options[:scheme] = "unix" options.delete(:host) options.delete(:port) else # TCP socket options[:host] = options[:host].to_s options[:port] = options[:port].to_i end options[:timeout] = options[:timeout].to_f options[:connect_timeout] = if options[:connect_timeout] options[:connect_timeout].to_f else options[:timeout] end options[:db] = options[:db].to_i options[:driver] = _parse_driver(options[:driver]) || Connection.drivers.last case options[:tcp_keepalive] when Hash [:time, :intvl, :probes].each do |key| unless options[:tcp_keepalive][key].is_a?(Fixnum) raise "Expected the #{key.inspect} key in :tcp_keepalive to be a Fixnum" end end when Fixnum if options[:tcp_keepalive] >= 60 options[:tcp_keepalive] = {:time => options[:tcp_keepalive] - 20, :intvl => 10, :probes => 2} elsif options[:tcp_keepalive] >= 30 options[:tcp_keepalive] = {:time => options[:tcp_keepalive] - 10, :intvl => 5, :probes => 2} elsif options[:tcp_keepalive] >= 5 options[:tcp_keepalive] = {:time => options[:tcp_keepalive] - 2, :intvl => 2, :probes => 1} end end options[:_parsed] = true options end
ensure_connected() { || ... }
click to toggle source
# File lib/redis/client.rb, line 334 def ensure_connected disconnect if @pending_reads > 0 attempts = 0 begin attempts += 1 if connected? unless inherit_socket? || Process.pid == @pid raise InheritedError, "Tried to use a connection from a child process without reconnecting. " + "You need to reconnect to Redis after forking " + "or set :inherit_socket to true." end else connect end yield rescue BaseConnectionError disconnect if attempts <= @options[:reconnect_attempts] && @reconnect retry else raise end rescue Exception disconnect raise end end
establish_connection()
click to toggle source
# File lib/redis/client.rb, line 316 def establish_connection server = @connector.resolve.dup @options[:host] = server[:host] @options[:port] = Integer(server[:port]) if server.include?(:port) @connection = @options[:driver].connect(@options) @pending_reads = 0 rescue TimeoutError, Errno::ECONNREFUSED, Errno::EHOSTDOWN, Errno::EHOSTUNREACH, Errno::ENETUNREACH, Errno::ETIMEDOUT raise CannotConnectError, "Error connecting to Redis on #{location} (#{$!.class})" end
logging(commands) { || ... }
click to toggle source
# File lib/redis/client.rb, line 291 def logging(commands) return yield unless @logger && @logger.debug? begin commands.each do |name, *args| logged_args = args.map do |a| case when a.respond_to?(:inspect) then a.inspect when a.respond_to?(:to_s) then a.to_s else # handle poorly-behaved descendants of BasicObject klass = a.instance_exec { (class << self; self end).superclass } "\#<#{klass}:#{a.__id__}>" end end @logger.debug("[Redis] command=#{name.to_s.upcase} args=#{logged_args.join(' ')}") end t1 = Time.now yield ensure @logger.debug("[Redis] call_time=%0.2f ms" % ((Time.now - t1) * 1000)) if t1 end end