EventMachine::Protocols::Postgres3

PROVISIONAL IMPLEMENTATION of an evented Postgres client. This implements version 3 of the Postgres wire protocol, which will work with any Postgres version from roughly 7.4 onward.

Objective: we want to access Postgres databases without requiring threads. Until now this has been a problem because the Postgres client implementations have all made use of blocking I/O calls, which is incompatible with a thread-free evented model.

But rather than re-implement the Postgres Wire3 protocol, we’re taking advantage of the existing postgres-pr library, which was originally written by Michael Neumann but (at this writing) appears to be no longer maintained. Still, it’s in basically a production-ready state, and the wire protocol isn’t that complicated anyway.

We need to monkeypatch StringIO because it lacks the #readbytes method needed by postgres-pr.

We’re tucking in a bunch of require statements that may not be present in garden-variety EM installations. Until we find a good way to only require these if a program requires postgres, this file will need to be required explicitly.

The StringIO monkeypatch is lifted verbatim from the standard library readbytes.rb, which adds the method #readbytes directly to class IO. But StringIO is not a subclass of IO, so it does not inherit that method.
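
As a rough illustration of the kind of patch described above, the sketch below adds a readbytes method to StringIO. It is modeled on the behavior of the old readbytes.rb (read exactly n bytes or raise) and is not necessarily the exact text used in postgres3.rb; the TruncatedDataError class shown here is an assumption about how the partial-read case is reported.

```ruby
require "stringio"

# Assumed error class: raised when fewer bytes than requested remain.
# It carries the partial data that was read.
class TruncatedDataError < IOError
  attr_reader :data

  def initialize(mesg, data)
    @data = data
    super(mesg)
  end
end

class StringIO
  # Read exactly n bytes, or raise if the stream ends first.
  def readbytes(n)
    str = read(n)
    raise EOFError, "end of file reached" if str.nil?
    raise TruncatedDataError.new("data truncated", str) if str.size < n
    str
  end
end
```

With this in place, StringIO.new("abcdef").readbytes(4) returns the first four bytes, and a second call asking for four more raises because only two remain.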

We cloned the handling of postgres messages from lib/postgres-pr/connection.rb in the postgres-pr library, and modified it for event-handling.

TODO: The password handling in dispatch_conn_message is totally incomplete.

We return Deferrables from the user-level operations surfaced by this interface. Experimentally, we’re using the pattern of always returning a boolean value as the first argument of a deferrable callback to indicate success or failure. This is instead of the traditional pattern of calling Deferrable#succeed or #fail, and requiring the user to define both a callback and an errback function.
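
The status-first callback pattern can be sketched without EventMachine. TinyDeferrable below is a hypothetical stand-in written only for this illustration (it is not EM::DefaultDeferrable and omits timeouts and errbacks); it shows how a single callback receives the boolean status first and branches on it.

```ruby
# Hypothetical, minimal stand-in for a deferrable; illustration only.
class TinyDeferrable
  def initialize
    @callbacks = []
    @args = nil
  end

  # Register a callback; run it immediately if a result is already set.
  def callback(&blk)
    if @args
      blk.call(*@args)
    else
      @callbacks << blk
    end
    self
  end

  # Record the result and fire any waiting callbacks.
  def succeed(*args)
    @args = args
    @callbacks.each { |cb| cb.call(*args) }
    @callbacks.clear
    self
  end
end

# Demonstrates one callback handling both outcomes via the first argument.
def run_status_first_demo
  log = []
  ok = TinyDeferrable.new
  ok.callback { |status, detail| log << (status ? "ok" : "failed: #{detail}") }
  ok.succeed true

  busy = TinyDeferrable.new
  busy.callback { |status, detail| log << (status ? "ok" : "failed: #{detail}") }
  busy.succeed false, "Operation already in progress"
  log
end
```

The trade-off is that failures arrive through succeed, so callers must check the status argument themselves rather than rely on an errback.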

Usage

 EM.run {
   db = EM.connect_unix_domain( "/tmp/.s.PGSQL.5432", EM::P::Postgres3 )
   db.connect( dbname, username, psw ).callback do |status|
     if status
       db.query( "select * from some_table" ).callback do |status, result, errors|
         if status
           result.rows.each do |row|
             p row
           end
         end
       end
     end
   end
 }

Public Class Methods

new()
     # File lib/em/protocols/postgres3.rb, line 110
     def initialize
       @data = ""
       @params = {}
     end

Public Instance Methods

connect(db, user, psw=nil)
     # File lib/em/protocols/postgres3.rb, line 115
     def connect db, user, psw=nil
       d = EM::DefaultDeferrable.new
       d.timeout 15

       if @pending_query || @pending_conn
         d.succeed false, "Operation already in progress"
       else
         @pending_conn = d
         prms = {"user"=>user, "database"=>db}
         @user = user
         if psw
           @password = psw
           #prms["password"] = psw
         end
         send_data PostgresPR::StartupMessage.new( 3 << 16, prms ).dump
       end

       d
     end
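
The 3 << 16 passed to StartupMessage encodes protocol version 3.0 (major version in the high 16 bits, minor in the low 16). As a rough sketch of what a version 3 startup packet looks like on the wire, assuming the standard layout of a length word, the version word, and null-terminated key/value pairs, the hypothetical helper below builds one by hand. It is not postgres-pr's implementation.

```ruby
# Hypothetical helper: builds a protocol v3 startup packet by hand.
# Layout assumed: Int32 length (including itself), Int32 version,
# then "key\0value\0" pairs, then a final "\0".
def startup_packet(params, major = 3, minor = 0)
  version = (major << 16) | minor
  body = [version].pack("N")
  params.each do |key, value|
    body << key.to_s.b << "\0".b << value.to_s.b << "\0".b
  end
  body << "\0".b
  [body.bytesize + 4].pack("N") + body
end
```

For example, startup_packet("user" => "u", "database" => "d") starts with a four-byte length followed by the four-byte value of 3 << 16.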

dispatch_conn_message(msg)

Cloned and modified from postgres-pr.

     # File lib/em/protocols/postgres3.rb, line 180
     def dispatch_conn_message msg
       case msg
       when AuthentificationClearTextPassword
         raise ArgumentError, "no password specified" if @password.nil?
         send_data PasswordMessage.new(@password).dump

       when AuthentificationCryptPassword
         raise ArgumentError, "no password specified" if @password.nil?
         send_data PasswordMessage.new(@password.crypt(msg.salt)).dump

       when AuthentificationMD5Password
         raise ArgumentError, "no password specified" if @password.nil?
         require 'digest/md5'

         m = Digest::MD5.hexdigest(@password + @user)
         m = Digest::MD5.hexdigest(m + msg.salt)
         m = 'md5' + m
         send_data PasswordMessage.new(m).dump

       when AuthentificationKerberosV4, AuthentificationKerberosV5, AuthentificationSCMCredential
         raise "unsupported authentification"

       when AuthentificationOk
       when ErrorResponse
         raise msg.field_values.join("\t")
       when NoticeResponse
         @notice_processor.call(msg) if @notice_processor
       when ParameterStatus
         @params[msg.key] = msg.value
       when BackendKeyData
         # TODO
         #p msg
       when ReadyForQuery
         # TODO: use transaction status
         pc,@pending_conn = @pending_conn,nil
         pc.succeed true
       else
         raise "unhandled message type"
       end
     end
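
The MD5 branch above can be pulled out into a small standalone function to make the hashing steps easier to follow. The name md5_password_response is hypothetical; the computation mirrors the three lines in the AuthentificationMD5Password case.

```ruby
require "digest/md5"

# Hypothetical helper mirroring the MD5 branch above:
# "md5" + md5hex( md5hex(password + user) + salt )
def md5_password_response(user, password, salt)
  inner = Digest::MD5.hexdigest(password + user)
  "md5" + Digest::MD5.hexdigest(inner + salt)
end
```

The result is always the literal prefix "md5" followed by 32 hexadecimal characters, and it changes with the salt the server sends.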

dispatch_query_message(msg)

Cloned and modified from postgres-pr.

     # File lib/em/protocols/postgres3.rb, line 222
     def dispatch_query_message msg
       case msg
       when DataRow
         @r.rows << msg.columns
       when CommandComplete
         @r.cmd_tag = msg.cmd_tag
       when ReadyForQuery
         pq,@pending_query = @pending_query,nil
         pq.succeed true, @r, @e
       when RowDescription
         @r.fields = msg.fields
       when CopyInResponse
       when CopyOutResponse
       when EmptyQueryResponse
       when ErrorResponse
         # TODO
         @e << msg
       when NoticeResponse
         @notice_processor.call(msg) if @notice_processor
       else
         # TODO
       end
     end

query(sql)
     # File lib/em/protocols/postgres3.rb, line 135
     def query sql
       d = EM::DefaultDeferrable.new
       d.timeout 15

       if @pending_query || @pending_conn
         d.succeed false, "Operation already in progress"
       else
         @r = PostgresPR::Connection::Result.new
         @e = []
         @pending_query = d
         send_data PostgresPR::Query.dump(sql)
       end

       d
     end
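
For orientation, a simple-query message in protocol version 3 is a 'Q' type byte, a four-byte length that counts itself plus the payload, and the SQL text terminated by a null byte. The hypothetical helper below builds such a packet by hand under that assumption; it is a sketch of the wire format, not the code behind PostgresPR::Query.dump.

```ruby
# Hypothetical helper: hand-built simple-query packet.
# Layout assumed: "Q", Int32 length (self + payload), SQL text, "\0".
def simple_query_packet(sql)
  payload = sql.b + "\0".b
  "Q".b + [payload.bytesize + 4].pack("N") + payload
end
```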

receive_data(data)
     # File lib/em/protocols/postgres3.rb, line 152
     def receive_data data
       @data << data
       while @data.length >= 5
         pktlen = @data[1...5].unpack("N").first
         if @data.length >= (1 + pktlen)
           pkt = @data.slice!(0...(1+pktlen))
           m = StringIO.open( pkt, "r" ) {|io| PostgresPR::Message.read( io ) }
           if @pending_conn
             dispatch_conn_message m
           elsif @pending_query
             dispatch_query_message m
           else
             raise "Unexpected message from database"
           end
         else
           break # very important, break out of the while
         end
       end
     end
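
The framing logic in receive_data can be isolated from EventMachine to show why the early break matters. The sketch below uses a hypothetical extract_packets function: it removes every complete packet (one type byte plus a length-prefixed body) from a byte buffer and leaves any partial packet in place until more data arrives.

```ruby
# Hypothetical helper: pull complete packets out of a growing byte buffer.
# Each packet is 1 type byte + Int32 length (which counts itself) + body.
def extract_packets(buffer)
  buffer.force_encoding(Encoding::BINARY) # index by bytes, not characters
  packets = []
  while buffer.bytesize >= 5
    pktlen = buffer.byteslice(1, 4).unpack1("N")
    # Not enough bytes yet for the whole packet: stop and wait for more.
    break if buffer.bytesize < 1 + pktlen
    packets << buffer.slice!(0, 1 + pktlen)
  end
  packets
end
```

Without the break, a buffer holding an incomplete packet would satisfy the loop condition forever, since nothing is consumed from it.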

unbind()
     # File lib/em/protocols/postgres3.rb, line 173
     def unbind
       if o = (@pending_query || @pending_conn)
         o.succeed false, "lost connection"
       end
     end

Generated with the Darkfish Rdoc Generator 1.1.6.