class GRPC::RpcDesc
RpcDesc
is a Descriptor of an RPC method.
Public Instance Methods
arity_error(mth, want, msg)
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 191 def arity_error(mth, want, msg) "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}" end
assert_arity_matches(mth)
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 156 def assert_arity_matches(mth) # A bidi handler function can optionally be passed a second # call object parameter for access to metadata, cancelling, etc. if bidi_streamer? if mth.arity != 2 && mth.arity != 1 fail arity_error(mth, 2, "should be #{mth.name}(req, call) or " \ "#{mth.name}(req)") end elsif request_response? || server_streamer? if mth.arity != 2 fail arity_error(mth, 2, "should be #{mth.name}(req, call)") end else if mth.arity != 1 fail arity_error(mth, 1, "should be #{mth.name}(call)") end end end
bidi_streamer?()
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 187 def bidi_streamer? input.is_a?(Stream) && output.is_a?(Stream) end
client_streamer?()
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 179 def client_streamer? input.is_a?(Stream) && !output.is_a?(Stream) end
handle_bidi_streamer(active_call, mth, inter_ctx)
click to toggle source
@param [GRPC::ActiveCall] active_call @param [Method] mth @param [Array<GRPC::InterceptionContext>] inter_ctx
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 105 def handle_bidi_streamer(active_call, mth, inter_ctx) active_call.run_server_bidi(mth, inter_ctx) send_status(active_call, OK, 'OK', active_call.output_metadata) end
handle_client_streamer(active_call, mth, inter_ctx)
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 68 def handle_client_streamer(active_call, mth, inter_ctx) call = active_call.multi_req_view inter_ctx.intercept!( :client_streamer, method: mth, call: call ) do resp = mth.call(call) active_call.server_unary_response( resp, trailing_metadata: active_call.output_metadata ) end end
handle_request_response(active_call, mth, inter_ctx)
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 50 def handle_request_response(active_call, mth, inter_ctx) req = active_call.read_unary_request call = active_call.single_req_view inter_ctx.intercept!( :request_response, method: mth, call: call, request: req ) do resp = mth.call(req, call) active_call.server_unary_response( resp, trailing_metadata: active_call.output_metadata ) end end
handle_server_streamer(active_call, mth, inter_ctx)
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 84 def handle_server_streamer(active_call, mth, inter_ctx) req = active_call.read_unary_request call = active_call.single_req_view inter_ctx.intercept!( :server_streamer, method: mth, call: call, request: req ) do replies = mth.call(req, call) replies.each { |r| active_call.remote_send(r) } send_status(active_call, OK, 'OK', active_call.output_metadata) end end
marshal_proc()
click to toggle source
@return [Proc] { |instance| marshalled(instance) }
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 34 def marshal_proc proc { |o| o.class.send(marshal_method, o).to_s } end
request_response?()
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 175 def request_response? !input.is_a?(Stream) && !output.is_a?(Stream) end
run_server_method(active_call, mth, inter_ctx = InterceptionContext.new)
click to toggle source
@param [GRPC::ActiveCall] active_call The current active call object
for the request
@param [Method] mth The current RPC method being called @param [GRPC::InterceptionContext] inter_ctx The interception context
being executed
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 117 def run_server_method(active_call, mth, inter_ctx = InterceptionContext.new) # While a server method is running, it might be cancelled, its deadline # might be reached, the handler could throw an unknown error, or a # well-behaved handler could throw a StatusError. if request_response? handle_request_response(active_call, mth, inter_ctx) elsif client_streamer? handle_client_streamer(active_call, mth, inter_ctx) elsif server_streamer? handle_server_streamer(active_call, mth, inter_ctx) else # is a bidi_stream handle_bidi_streamer(active_call, mth, inter_ctx) end rescue BadStatus => e # this is raised by handlers that want GRPC to send an application error # code and detail message and some additional app-specific metadata. GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}") send_status(active_call, e.code, e.details, e.metadata) rescue Core::CallError => e # This is raised by GRPC internals but should rarely, if ever happen. # Log it, but don't notify the other endpoint.. GRPC.logger.warn("failed call: #{active_call}\n#{e}") rescue Core::OutOfTime # This is raised when active_call#method.call exceeds the deadline # event. Send a status of deadline exceeded GRPC.logger.warn("late call: #{active_call}") send_status(active_call, DEADLINE_EXCEEDED, 'late') rescue StandardError, NotImplementedError => e # This will usuaally be an unhandled error in the handling code. # Send back a UNKNOWN status to the client # # Note: this intentionally does not map NotImplementedError to # UNIMPLEMENTED because NotImplementedError is intended for low-level # OS interaction (e.g. syscalls) not supported by the current OS. GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN") GRPC.logger.warn(e) send_status(active_call, UNKNOWN, "#{e.class}: #{e.message}") end
send_status(active_client, code, details, metadata = {})
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 195 def send_status(active_client, code, details, metadata = {}) details = 'Not sure why' if details.nil? GRPC.logger.debug("Sending status #{code}:#{details}") active_client.send_status(code, details, code == OK, metadata: metadata) rescue StandardError => e GRPC.logger.warn("Could not send status #{code}:#{details}") GRPC.logger.warn(e) end
server_streamer?()
click to toggle source
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 183 def server_streamer? !input.is_a?(Stream) && output.is_a?(Stream) end
unmarshal_proc(target)
click to toggle source
@param [:input, :output] target determines whether to produce the an
unmarshal Proc for the rpc input parameter or its output parameter
@return [Proc] An unmarshal proc { |marshalled(instance)| instance }
# File src/ruby/lib/grpc/generic/rpc_desc.rb, line 43 def unmarshal_proc(target) fail ArgumentError unless [:input, :output].include?(target) unmarshal_class = send(target) unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream proc { |o| unmarshal_class.send(unmarshal_method, o) } end