From 915a2389eefc7843ade1cbbdf19b2f19e2d65e71 Mon Sep 17 00:00:00 2001 From: Sylvain Utard Date: Tue, 22 Jul 2025 16:58:55 +0200 Subject: [PATCH 1/5] Ensure a single Faraday client is used per Client For now, each `start`, `resume` or `kill` query would instantiate a new `Faraday` client by calling `Trino::Client.faraday_client` at the query level. This PR refactors the logic of the `Trino::Client` to ensure a single `Faraday` client is used per instance of `Trino::Client` and set all the query (and user) specific HTTP headers at query time. This avoids the need to reconnect to the Trino server dealing with all the SSL handshake processing time at each query + allow the usage of [faraday-net_http_persistent](https://github.com/lostisland/faraday-net_http_persistent) to benefit from HTTP keep-alive. While the benefit of saving a few ms at each Trino statement query might not be that visible for the "long running queries" use-cases (where the actual Trino query anyway takes up to a few seconds to answer), we've seen large improvements for use-cases where the queries are taking a few dozens of ms. --- lib/trino/client/client.rb | 9 +++--- lib/trino/client/faraday_client.rb | 42 ++++++++++++---------------- lib/trino/client/query.rb | 16 ++++------- lib/trino/client/statement_client.rb | 10 +++++-- spec/client_spec.rb | 4 +-- spec/statement_client_spec.rb | 18 ++++++------ 6 files changed, 48 insertions(+), 51 deletions(-) diff --git a/lib/trino/client/client.rb b/lib/trino/client/client.rb index b5d48825..a8add3b5 100644 --- a/lib/trino/client/client.rb +++ b/lib/trino/client/client.rb @@ -21,10 +21,11 @@ module Trino::Client class Client def initialize(options) @options = options + @faraday = Trino::Client.faraday_client(options) end def query(query, &block) - q = Query.start(query, @options) + q = Query.start(query, @faraday, @options) if block begin yield q @@ -37,15 +38,15 @@ def query(query, &block) end def resume_query(next_uri) - return Query.resume(next_uri, @options) + return Query.resume(next_uri, @faraday, @options) end def kill(query_id) - return Query.kill(query_id, @options) + return Query.kill(query_id, @faraday, @options) end def run(query) - q = Query.start(query, @options) + q = Query.start(query, @faraday, @options) begin columns = q.columns if columns.empty? diff --git a/lib/trino/client/faraday_client.rb b/lib/trino/client/faraday_client.rb index 397049ab..24c839b9 100644 --- a/lib/trino/client/faraday_client.rb +++ b/lib/trino/client/faraday_client.rb @@ -77,14 +77,6 @@ def self.faraday_client(options) faraday_options[:ssl] = ssl if ssl faraday = Faraday.new(faraday_options) do |faraday| - if options[:user] && options[:password] - # https://lostisland.github.io/faraday/middleware/authentication - if FARADAY1_USED - faraday.request(:basic_auth, options[:user], options[:password]) - else - faraday.request :authorization, :basic, options[:user], options[:password] - end - end if options[:follow_redirect] faraday.response :follow_redirects end @@ -96,7 +88,6 @@ def self.faraday_client(options) end faraday.headers.merge!(HEADERS) - faraday.headers.merge!(optional_headers(options)) return faraday end @@ -129,71 +120,75 @@ def self.faraday_ssl_options(options) return ssl end - def self.optional_headers(options) - usePrestoHeader = false - if options[:model_version] && options[:model_version] < 351 - usePrestoHeader = true + def self.build_query_headers(options) + use_presto_headers = false + if options[:model_version] && options[:model_version].to_i < 351 + use_presto_headers = true end headers = {} + + if options[:user] && options[:password] + headers['Authorization'] = "Basic #{Base64.strict_encode64("#{options[:user]}:#{options[:password]}")}" + end if v = options[:user] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_USER] = v else headers[TrinoHeaders::TRINO_USER] = v end end if v = options[:source] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_SOURCE] = v else headers[TrinoHeaders::TRINO_SOURCE] = v end end if v = options[:catalog] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_CATALOG] = v else headers[TrinoHeaders::TRINO_CATALOG] = v end end if v = options[:schema] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_SCHEMA] = v else headers[TrinoHeaders::TRINO_SCHEMA] = v end end if v = options[:time_zone] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_TIME_ZONE] = v else headers[TrinoHeaders::TRINO_TIME_ZONE] = v end end if v = options[:language] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_LANGUAGE] = v else headers[TrinoHeaders::TRINO_LANGUAGE] = v end end if v = options[:properties] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_SESSION] = encode_properties(v) else headers[TrinoHeaders::TRINO_SESSION] = encode_properties(v) end end if v = options[:client_info] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_CLIENT_INFO] = encode_client_info(v) else headers[TrinoHeaders::TRINO_CLIENT_INFO] = encode_client_info(v) end end if v = options[:client_tags] - if usePrestoHeader + if use_presto_headers headers[PrestoHeaders::PRESTO_CLIENT_TAGS] = encode_client_tags(v) else headers[TrinoHeaders::TRINO_CLIENT_TAGS] = encode_client_tags(v) @@ -245,6 +240,5 @@ def self.encode_client_tags(tags) Array(tags).join(",") end - private_class_method :faraday_ssl_options, :optional_headers, :encode_properties, :encode_client_info, :encode_client_tags - + private_class_method :faraday_ssl_options, :encode_properties, :encode_client_info, :encode_client_tags end diff --git a/lib/trino/client/query.rb b/lib/trino/client/query.rb index 8b94f325..3d6cdfa0 100644 --- a/lib/trino/client/query.rb +++ b/lib/trino/client/query.rb @@ -25,26 +25,22 @@ module Trino::Client require 'trino/client/statement_client' class Query - def self.start(query, options) - new StatementClient.new(faraday_client(options), query, options) + def self.start(query, faraday, options) + new StatementClient.new(faraday, query, options) end - def self.resume(next_uri, options) - new StatementClient.new(faraday_client(options), nil, options, next_uri) + def self.resume(next_uri, faraday, options) + new StatementClient.new(faraday, nil, options, next_uri) end - def self.kill(query_id, options) - faraday = faraday_client(options) + def self.kill(query_id, faraday, options) response = faraday.delete do |req| + req.headers.merge!(Trino::Client.build_query_headers(options)) req.url "/v1/query/#{query_id}" end return response.status / 100 == 2 end - def self.faraday_client(options) - Trino::Client.faraday_client(options) - end - def self.transform_row(column_value_parsers, row) row_object = {} diff --git a/lib/trino/client/statement_client.rb b/lib/trino/client/statement_client.rb index b8ea2902..642efae6 100644 --- a/lib/trino/client/statement_client.rb +++ b/lib/trino/client/statement_client.rb @@ -28,6 +28,7 @@ class StatementClient def initialize(faraday, query, options, next_uri=nil) @faraday = faraday + @headers = Trino::Client.build_query_headers(options) @options = options @query = query @@ -68,6 +69,7 @@ def post_query_request! uri = "/v1/statement" response = @faraday.post do |req| req.url uri + req.headers.merge!(@headers) req.body = @query init_request(req) @@ -195,7 +197,9 @@ def faraday_get_with_retry(uri, &block) loop do begin - response = @faraday.get(uri) + response = @faraday.get(uri) do |req| + req.headers.merge!(@headers) + end rescue Faraday::TimeoutError, Faraday::ConnectionFailed # temporally error to retry response = nil @@ -258,6 +262,7 @@ def raise_timeout_error! def cancel_leaf_stage if uri = @results.partial_cancel_uri @faraday.delete do |req| + req.headers.merge!(@headers) req.url uri end end @@ -271,10 +276,11 @@ def close begin if uri = @results.next_uri @faraday.delete do |req| + req.headers.merge!(@headers) req.url uri end end - rescue => e + rescue end nil diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 41196caa..5185c5fa 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Trino::Client::Client do - let(:client) { Trino::Client.new({}) } + let(:client) { Trino::Client.new({ server: 'localhost:8080' }) } describe 'rehashes' do let(:columns) do @@ -66,7 +66,7 @@ # For this test, we'll use scalar_parser to add 2 to every integer query.scalar_parser = ->(data, type) { (type == 'integer') ? data + 2 : data } - columns, rows = client.run('fake query') + _columns, rows = client.run('fake query') transformed_rows = query.transform_rows expect(transformed_rows[0]).to eq({ diff --git a/spec/statement_client_spec.rb b/spec/statement_client_spec.rb index add752f4..fbb84d6f 100644 --- a/spec/statement_client_spec.rb +++ b/spec/statement_client_spec.rb @@ -346,7 +346,7 @@ it "forbids using basic auth when ssl is disabled" do expect do - Query.__send__(:faraday_client, { + Trino::Client.faraday_client({ server: 'localhost', password: 'abcd' }) @@ -356,14 +356,14 @@ describe "ssl" do it "is disabled by default" do - f = Query.__send__(:faraday_client, { + f = Trino::Client.faraday_client({ server: "localhost", }) expect(f.url_prefix.to_s).to eq "http://localhost/" end it "is enabled with ssl: true" do - f = Query.__send__(:faraday_client, { + f = Trino::Client.faraday_client({ server: "localhost", ssl: true, }) @@ -372,7 +372,7 @@ end it "is enabled with ssl: {verify: false}" do - f = Query.__send__(:faraday_client, { + f = Trino::Client.faraday_client({ server: "localhost", ssl: {verify: false} }) @@ -382,7 +382,7 @@ it "rejects invalid ssl: verify: object" do expect do - f = Query.__send__(:faraday_client, { + Trino::Client.faraday_client({ server: "localhost", ssl: {verify: "??"} }) @@ -400,7 +400,7 @@ client_key: OpenSSL::PKey::DSA.new, } - f = Query.__send__(:faraday_client, { + f = Trino::Client.faraday_client({ server: "localhost", ssl: ssl, }) @@ -416,7 +416,7 @@ it "rejects an invalid string" do expect do - Query.__send__(:faraday_client, { + Trino::Client.faraday_client({ server: "localhost", ssl: '??', }) @@ -425,7 +425,7 @@ it "rejects an integer" do expect do - Query.__send__(:faraday_client, { + Trino::Client.faraday_client({ server: "localhost", ssl: 3, }) @@ -452,7 +452,7 @@ let :nested_json do nested_stats = {createTime: Time.now} # JSON max nesting default value is 100 - for i in 0..100 do + 100.times do nested_stats = {stats: nested_stats} end { From 58561edb9bc053ae36bc61c587d4dd48de7cce7a Mon Sep 17 00:00:00 2001 From: Sylvain Utard Date: Wed, 23 Jul 2025 17:47:03 +0200 Subject: [PATCH 2/5] Also add the faraday_adapter option --- README.md | 1 + lib/trino/client/faraday_client.rb | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 7a6dadbe..98fcf4ce 100644 --- a/README.md +++ b/README.md @@ -116,6 +116,7 @@ $ bundle exec rake modelgen:latest * **http_timeout** sets timeout in seconds to read data from a server. * **gzip** enables gzip compression. * **follow_redirect** enables HTTP redirection support. +* **faraday_adapter** sets the Faraday adapter to use. Default is `Faraday.default_adapter`. * **model_version** set the Trino version to which a job is submitted. Supported versions are 351, 316, 303, 0.205, 0.178, 0.173, 0.153 and 0.149. Default is 351. See [RDoc](http://www.rubydoc.info/gems/presto-client/) for the full documentation. diff --git a/lib/trino/client/faraday_client.rb b/lib/trino/client/faraday_client.rb index 24c839b9..c8f82d78 100644 --- a/lib/trino/client/faraday_client.rb +++ b/lib/trino/client/faraday_client.rb @@ -84,7 +84,7 @@ def self.faraday_client(options) faraday.request :gzip end faraday.response :logger if options[:http_debug] - faraday.adapter Faraday.default_adapter + faraday.adapter(options[:faraday_adapter] || Faraday.default_adapter) end faraday.headers.merge!(HEADERS) From d64e7e76d5e59b682ec37ac7d5f2cbd59e7d3792 Mon Sep 17 00:00:00 2001 From: Sylvain Utard Date: Wed, 23 Jul 2025 17:59:16 +0200 Subject: [PATCH 3/5] add tests --- Gemfile | 4 ++++ spec/statement_client_spec.rb | 12 +++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/Gemfile b/Gemfile index 205723e7..59c0883d 100644 --- a/Gemfile +++ b/Gemfile @@ -6,3 +6,7 @@ group :development, :test do gem "faraday", ENV.fetch("FARADAY_VERSION", "~> 2") gem "tiny-presto", "~> 0.0.10" end + +group :test do + gem "faraday-net_http_persistent", "~> 2.0" +end \ No newline at end of file diff --git a/spec/statement_client_spec.rb b/spec/statement_client_spec.rb index fbb84d6f..8038efc2 100644 --- a/spec/statement_client_spec.rb +++ b/spec/statement_client_spec.rb @@ -1,5 +1,5 @@ require 'spec_helper' - +require 'faraday/net_http_persistent' describe Trino::Client::StatementClient do let :options do { @@ -354,6 +354,16 @@ end end + describe "faraday adapter" do + it "sets the adapter to the one specified in the options" do + f = Trino::Client.faraday_client({ + server: "localhost", + faraday_adapter: :net_http_persistent + }) + expect(f.adapter).to eq Faraday::Adapter::NetHttpPersistent + end + end + describe "ssl" do it "is disabled by default" do f = Trino::Client.faraday_client({ From 329eeaf50c90f945b4bb13050ed75adb94516af6 Mon Sep 17 00:00:00 2001 From: Sylvain Utard Date: Mon, 4 Aug 2025 15:35:33 +0200 Subject: [PATCH 4/5] Try install the libcurl4-openssl-dev first --- .github/workflows/ruby.yml | 4 ++++ Gemfile | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ruby.yml b/.github/workflows/ruby.yml index 7e6e776b..d1ce060d 100644 --- a/.github/workflows/ruby.yml +++ b/.github/workflows/ruby.yml @@ -28,6 +28,10 @@ jobs: timeout-minutes: 5 # Typically ends within 1-2 min steps: - uses: actions/checkout@v4 + - name: Install system dependencies + run: | + sudo apt-get update + sudo apt-get install -y libcurl4-openssl-dev - name: Set up Ruby uses: ruby/setup-ruby@v1 with: diff --git a/Gemfile b/Gemfile index 59c0883d..1ed971b6 100644 --- a/Gemfile +++ b/Gemfile @@ -8,5 +8,5 @@ group :development, :test do end group :test do - gem "faraday-net_http_persistent", "~> 2.0" -end \ No newline at end of file + gem "faraday-net_http_persistent" +end From c80fd0132eb1f4a290c6e2eca5e715282a87eeef Mon Sep 17 00:00:00 2001 From: Sylvain Utard Date: Tue, 5 Aug 2025 14:24:21 +0200 Subject: [PATCH 5/5] fix coding style --- spec/client_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 5185c5fa..ebab3282 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' describe Trino::Client::Client do - let(:client) { Trino::Client.new({ server: 'localhost:8080' }) } + let(:client) { Trino::Client.new({server: 'localhost:8080'}) } describe 'rehashes' do let(:columns) do