diff --git a/.github/workflows/ruby.yml b/.github/workflows/ruby.yml index 7e6e776b..d1ce060d 100644 --- a/.github/workflows/ruby.yml +++ b/.github/workflows/ruby.yml @@ -28,6 +28,10 @@ jobs: timeout-minutes: 5 # Typically ends within 1-2 min steps: - uses: actions/checkout@v4 + - name: Install system dependencies + run: | + sudo apt-get update + sudo apt-get install -y libcurl4-openssl-dev - name: Set up Ruby uses: ruby/setup-ruby@v1 with: diff --git a/Gemfile b/Gemfile index 205723e7..1ed971b6 100644 --- a/Gemfile +++ b/Gemfile @@ -6,3 +6,7 @@ group :development, :test do gem "faraday", ENV.fetch("FARADAY_VERSION", "~> 2") gem "tiny-presto", "~> 0.0.10" end + +group :test do + gem "faraday-net_http_persistent" +end diff --git a/README.md b/README.md index 7a6dadbe..98fcf4ce 100644 --- a/README.md +++ b/README.md @@ -116,6 +116,7 @@ $ bundle exec rake modelgen:latest * **http_timeout** sets timeout in seconds to read data from a server. * **gzip** enables gzip compression. * **follow_redirect** enables HTTP redirection support. +* **faraday_adapter** sets the Faraday adapter to use. Default is `Faraday.default_adapter`. * **model_version** set the Trino version to which a job is submitted. Supported versions are 351, 316, 303, 0.205, 0.178, 0.173, 0.153 and 0.149. Default is 351. See [RDoc](http://www.rubydoc.info/gems/presto-client/) for the full documentation. diff --git a/lib/trino/client/client.rb b/lib/trino/client/client.rb index b5d48825..a8add3b5 100644 --- a/lib/trino/client/client.rb +++ b/lib/trino/client/client.rb @@ -21,10 +21,11 @@ module Trino::Client class Client def initialize(options) @options = options + @faraday = Trino::Client.faraday_client(options) end def query(query, &block) - q = Query.start(query, @options) + q = Query.start(query, @faraday, @options) if block begin yield q @@ -37,15 +38,15 @@ def query(query, &block) end def resume_query(next_uri) - return Query.resume(next_uri, @options) + return Query.resume(next_uri, @faraday, @options) end def kill(query_id) - return Query.kill(query_id, @options) + return Query.kill(query_id, @faraday, @options) end def run(query) - q = Query.start(query, @options) + q = Query.start(query, @faraday, @options) begin columns = q.columns if columns.empty? diff --git a/lib/trino/client/faraday_client.rb b/lib/trino/client/faraday_client.rb index 397049ab..c8f82d78 100644 --- a/lib/trino/client/faraday_client.rb +++ b/lib/trino/client/faraday_client.rb @@ -77,14 +77,6 @@ def self.faraday_client(options) faraday_options[:ssl] = ssl if ssl faraday = Faraday.new(faraday_options) do |faraday| - if options[:user] && options[:password] - # https://lostisland.github.io/faraday/middleware/authentication - if FARADAY1_USED - faraday.request(:basic_auth, options[:user], options[:password]) - else - faraday.request :authorization, :basic, options[:user], options[:password] - end - end if options[:follow_redirect] faraday.response :follow_redirects end @@ -92,11 +84,10 @@ def self.faraday_client(options) faraday.request :gzip end faraday.response :logger if options[:http_debug] - faraday.adapter Faraday.default_adapter + faraday.adapter(options[:faraday_adapter] || Faraday.default_adapter) end faraday.headers.merge!(HEADERS) - faraday.headers.merge!(optional_headers(options)) return faraday end @@ -129,71 +120,75 @@ def self.faraday_ssl_options(options) return ssl end - def self.optional_headers(options) - usePrestoHeader = false - if options[:model_version] && options[:model_version] < 351 - usePrestoHeader = true + def self.build_query_headers(options) + use_presto_headers = false + if options[:model_version] && options[:model_version].to_i < 351 + use_presto_headers = true end headers = {} + + if options[:user] && options[:password] + headers['Authorization'] = "Basic #{Base64.strict_encode64("#{options[:user]}:#{options[:password]}")}" + end if v = options[:user] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_USER] = v else headers[TrinoHeaders::TRINO_USER] = v end end if v = options[:source] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_SOURCE] = v else headers[TrinoHeaders::TRINO_SOURCE] = v end end if v = options[:catalog] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_CATALOG] = v else headers[TrinoHeaders::TRINO_CATALOG] = v end end if v = options[:schema] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_SCHEMA] = v else headers[TrinoHeaders::TRINO_SCHEMA] = v end end if v = options[:time_zone] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_TIME_ZONE] = v else headers[TrinoHeaders::TRINO_TIME_ZONE] = v end end if v = options[:language] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_LANGUAGE] = v else headers[TrinoHeaders::TRINO_LANGUAGE] = v end end if v = options[:properties] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_SESSION] = encode_properties(v) else headers[TrinoHeaders::TRINO_SESSION] = encode_properties(v) end end if v = options[:client_info] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_CLIENT_INFO] = encode_client_info(v) else headers[TrinoHeaders::TRINO_CLIENT_INFO] = encode_client_info(v) end end if v = options[:client_tags] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_CLIENT_TAGS] = encode_client_tags(v) else headers[TrinoHeaders::TRINO_CLIENT_TAGS] = encode_client_tags(v) @@ -245,6 +240,5 @@ def self.encode_client_tags(tags) Array(tags).join(",") end - private_class_method :faraday_ssl_options, :optional_headers, :encode_properties, :encode_client_info, :encode_client_tags - + private_class_method :faraday_ssl_options, :encode_properties, :encode_client_info, :encode_client_tags end diff --git a/lib/trino/client/query.rb b/lib/trino/client/query.rb index 8b94f325..3d6cdfa0 100644 --- a/lib/trino/client/query.rb +++ b/lib/trino/client/query.rb @@ -25,26 +25,22 @@ module Trino::Client require 'trino/client/statement_client' class Query - def self.start(query, options) - new StatementClient.new(faraday_client(options), query, options) + def self.start(query, faraday, options) + new StatementClient.new(faraday, query, options) end - def self.resume(next_uri, options) - new StatementClient.new(faraday_client(options), nil, options, next_uri) + def self.resume(next_uri, faraday, options) + new StatementClient.new(faraday, nil, options, next_uri) end - def self.kill(query_id, options) - faraday = faraday_client(options) + def self.kill(query_id, faraday, options) response = faraday.delete do |req| + req.headers.merge!(Trino::Client.build_query_headers(options)) req.url "/v1/query/#{query_id}" end return response.status / 100 == 2 end - def self.faraday_client(options) - Trino::Client.faraday_client(options) - end - def self.transform_row(column_value_parsers, row) row_object = {} diff --git a/lib/trino/client/statement_client.rb b/lib/trino/client/statement_client.rb index b8ea2902..642efae6 100644 --- a/lib/trino/client/statement_client.rb +++ b/lib/trino/client/statement_client.rb @@ -28,6 +28,7 @@ class StatementClient def initialize(faraday, query, options, next_uri=nil) @faraday = faraday + @headers = Trino::Client.build_query_headers(options) @options = options @query = query @@ -68,6 +69,7 @@ def post_query_request! uri = "/v1/statement" response = @faraday.post do |req| req.url uri + req.headers.merge!(@headers) req.body = @query init_request(req) @@ -195,7 +197,9 @@ def faraday_get_with_retry(uri, &block) loop do begin - response = @faraday.get(uri) + response = @faraday.get(uri) do |req| + req.headers.merge!(@headers) + end rescue Faraday::TimeoutError, Faraday::ConnectionFailed # temporally error to retry response = nil @@ -258,6 +262,7 @@ def raise_timeout_error! def cancel_leaf_stage if uri = @results.partial_cancel_uri @faraday.delete do |req| + req.headers.merge!(@headers) req.url uri end end @@ -271,10 +276,11 @@ def close begin if uri = @results.next_uri @faraday.delete do |req| + req.headers.merge!(@headers) req.url uri end end - rescue => e + rescue end nil diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 41196caa..ebab3282 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Trino::Client::Client do - let(:client) { Trino::Client.new({}) } + let(:client) { Trino::Client.new({server: 'localhost:8080'}) } describe 'rehashes' do let(:columns) do @@ -66,7 +66,7 @@ # For this test, we'll use scalar_parser to add 2 to every integer query.scalar_parser = ->(data, type) { (type == 'integer') ? data + 2 : data } - columns, rows = client.run('fake query') + _columns, rows = client.run('fake query') transformed_rows = query.transform_rows expect(transformed_rows[0]).to eq({ diff --git a/spec/statement_client_spec.rb b/spec/statement_client_spec.rb index add752f4..8038efc2 100644 --- a/spec/statement_client_spec.rb +++ b/spec/statement_client_spec.rb @@ -1,5 +1,5 @@ require 'spec_helper' - +require 'faraday/net_http_persistent' describe Trino::Client::StatementClient do let :options do { @@ -346,7 +346,7 @@ it "forbids using basic auth when ssl is disabled" do expect do - Query.__send__(:faraday_client, { + Trino::Client.faraday_client({ server: 'localhost', password: 'abcd' }) @@ -354,16 +354,26 @@ end end + describe "faraday adapter" do + it "sets the adapter to the one specified in the options" do + f = Trino::Client.faraday_client({ + server: "localhost", + faraday_adapter: :net_http_persistent + }) + expect(f.adapter).to eq Faraday::Adapter::NetHttpPersistent + end + end + describe "ssl" do it "is disabled by default" do - f = Query.__send__(:faraday_client, { + f = Trino::Client.faraday_client({ server: "localhost", }) expect(f.url_prefix.to_s).to eq "http://localhost/" end it "is enabled with ssl: true" do - f = Query.__send__(:faraday_client, { + f = Trino::Client.faraday_client({ server: "localhost", ssl: true, }) @@ -372,7 +382,7 @@ end it "is enabled with ssl: {verify: false}" do - f = Query.__send__(:faraday_client, { + f = Trino::Client.faraday_client({ server: "localhost", ssl: {verify: false} }) @@ -382,7 +392,7 @@ it "rejects invalid ssl: verify: object" do expect do - f = Query.__send__(:faraday_client, { + Trino::Client.faraday_client({ server: "localhost", ssl: {verify: "??"} }) @@ -400,7 +410,7 @@ client_key: OpenSSL::PKey::DSA.new, } - f = Query.__send__(:faraday_client, { + f = Trino::Client.faraday_client({ server: "localhost", ssl: ssl, }) @@ -416,7 +426,7 @@ it "rejects an invalid string" do expect do - Query.__send__(:faraday_client, { + Trino::Client.faraday_client({ server: "localhost", ssl: '??', }) @@ -425,7 +435,7 @@ it "rejects an integer" do expect do - Query.__send__(:faraday_client, { + Trino::Client.faraday_client({ server: "localhost", ssl: 3, }) @@ -452,7 +462,7 @@ let :nested_json do nested_stats = {createTime: Time.now} # JSON max nesting default value is 100 - for i in 0..100 do + 100.times do nested_stats = {stats: nested_stats} end {