PROVISIONAL IMPLEMENTATION of an evented Postgres client. This implements version 3 of the Postgres wire protocol, which will work with any Postgres version from roughly 7.4 onward.
Objective: we want to access Postgres databases without requiring threads. Until now this has been a problem because the Postgres client implementations have all made use of blocking I/O calls, which is incompatible with a thread-free evented model.
But rather than re-implement the Postgres Wire3 protocol, we’re taking advantage of the existing postgres-pr library, which was originally written by Michael Neumann but (at this writing) appears to be no longer maintained. Still, it’s in basically a production-ready state, and the wire protocol isn’t that complicated anyway.
We need to monkeypatch StringIO because it lacks the # method needed by postgres-pr.
We’re tucking in a bunch of require statements that may not be present in garden-variety EM installations. Until we find a good way to only require these if a program requires postgres, this file will need to be required explicitly.
The StringIO monkeypatch is lifted verbatim from the standard library readbytes.rb, which adds method # directly to class IO. But StringIO is not a subclass of IO.
We cloned the handling of postgres messages from lib/postgres-pr/connection.rb in the postgres-pr library, and modified it for event-handling.
TODO: The password handling in dispatch_conn_message is totally incomplete.
We return Deferrables from the user-level operations surfaced by this interface. Experimentally, we’re using the pattern of always returning a boolean value as the first argument of a deferrable callback to indicate success or failure. This is instead of the traditional pattern of calling Deferrable#succeed or #, and requiring the user to define both a callback and an errback function.
EM.run { db = EM.connect_unix_domain( "/tmp/.s.PGSQL.5432", EM::P::Postgres3 ) db.connect( dbname, username, psw ).callback do |status| if status db.query( "select * from some_table" ).callback do |status, result, errors| if status result.rows.each do |row| p row end end end end end }
# File lib/em/protocols/postgres3.rb, line 115 115: def connect db, user, psw=nil 116: d = EM::DefaultDeferrable.new 117: d.timeout 15 118: 119: if @pending_query || @pending_conn 120: d.succeed false, "Operation already in progress" 121: else 122: @pending_conn = d 123: prms = {"user"=>user, "database"=>db} 124: @user = user 125: if psw 126: @password = psw 127: #prms["password"] = psw 128: end 129: send_data PostgresPR::StartupMessage.new( 3 << 16, prms ).dump 130: end 131: 132: d 133: end
Cloned and modified from the postgres-pr.
# File lib/em/protocols/postgres3.rb, line 180 180: def dispatch_conn_message msg 181: case msg 182: when AuthentificationClearTextPassword 183: raise ArgumentError, "no password specified" if @password.nil? 184: send_data PasswordMessage.new(@password).dump 185: 186: when AuthentificationCryptPassword 187: raise ArgumentError, "no password specified" if @password.nil? 188: send_data PasswordMessage.new(@password.crypt(msg.salt)).dump 189: 190: when AuthentificationMD5Password 191: raise ArgumentError, "no password specified" if @password.nil? 192: require 'digest/md5' 193: 194: m = Digest::MD5.hexdigest(@password + @user) 195: m = Digest::MD5.hexdigest(m + msg.salt) 196: m = 'md5' + m 197: send_data PasswordMessage.new(m).dump 198: 199: when AuthentificationKerberosV4, AuthentificationKerberosV5, AuthentificationSCMCredential 200: raise "unsupported authentification" 201: 202: when AuthentificationOk 203: when ErrorResponse 204: raise msg.field_values.join("\t") 205: when NoticeResponse 206: @notice_processor.call(msg) if @notice_processor 207: when ParameterStatus 208: @params[msg.key] = msg.value 209: when BackendKeyData 210: # TODO 211: #p msg 212: when ReadyForQuery 213: # TODO: use transaction status 214: pc,@pending_conn = @pending_conn,nil 215: pc.succeed true 216: else 217: raise "unhandled message type" 218: end 219: end
Cloned and modified from the postgres-pr.
# File lib/em/protocols/postgres3.rb, line 222 222: def dispatch_query_message msg 223: case msg 224: when DataRow 225: @r.rows << msg.columns 226: when CommandComplete 227: @r.cmd_tag = msg.cmd_tag 228: when ReadyForQuery 229: pq,@pending_query = @pending_query,nil 230: pq.succeed true, @r, @e 231: when RowDescription 232: @r.fields = msg.fields 233: when CopyInResponse 234: when CopyOutResponse 235: when EmptyQueryResponse 236: when ErrorResponse 237: # TODO 238: @e << msg 239: when NoticeResponse 240: @notice_processor.call(msg) if @notice_processor 241: else 242: # TODO 243: end 244: end
# File lib/em/protocols/postgres3.rb, line 135 135: def query sql 136: d = EM::DefaultDeferrable.new 137: d.timeout 15 138: 139: if @pending_query || @pending_conn 140: d.succeed false, "Operation already in progress" 141: else 142: @r = PostgresPR::Connection::Result.new 143: @e = [] 144: @pending_query = d 145: send_data PostgresPR::Query.dump(sql) 146: end 147: 148: d 149: end
# File lib/em/protocols/postgres3.rb, line 152 152: def receive_data data 153: @data << data 154: while @data.length >= 5 155: pktlen = @data[1...5].unpack("N").first 156: if @data.length >= (1 + pktlen) 157: pkt = @data.slice!(0...(1+pktlen)) 158: m = StringIO.open( pkt, "r" ) {|io| PostgresPR::Message.read( io ) } 159: if @pending_conn 160: dispatch_conn_message m 161: elsif @pending_query 162: dispatch_query_message m 163: else 164: raise "Unexpected message from database" 165: end 166: else 167: break # very important, break out of the while 168: end 169: end 170: end
Disabled; run with --debug to generate this.
Generated with the Darkfish Rdoc Generator 1.1.6.