Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ruby.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ jobs:
timeout-minutes: 5 # Typically ends within 1-2 min
steps:
- uses: actions/checkout@v4
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y libcurl4-openssl-dev
- name: Set up Ruby
uses: ruby/setup-ruby@v1
with:
Expand Down
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ group :development, :test do
gem "faraday", ENV.fetch("FARADAY_VERSION", "~> 2")
gem "tiny-presto", "~> 0.0.10"
end

group :test do
gem "faraday-net_http_persistent"
end
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ $ bundle exec rake modelgen:latest
* **http_timeout** sets timeout in seconds to read data from a server.
* **gzip** enables gzip compression.
* **follow_redirect** enables HTTP redirection support.
* **faraday_adapter** sets the Faraday adapter to use. Default is `Faraday.default_adapter`.
* **model_version** set the Trino version to which a job is submitted. Supported versions are 351, 316, 303, 0.205, 0.178, 0.173, 0.153 and 0.149. Default is 351.

See [RDoc](http://www.rubydoc.info/gems/presto-client/) for the full documentation.
Expand Down
9 changes: 5 additions & 4 deletions lib/trino/client/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ module Trino::Client
class Client
def initialize(options)
@options = options
@faraday = Trino::Client.faraday_client(options)
end

def query(query, &block)
q = Query.start(query, @options)
q = Query.start(query, @faraday, @options)
if block
begin
yield q
Expand All @@ -37,15 +38,15 @@ def query(query, &block)
end

def resume_query(next_uri)
return Query.resume(next_uri, @options)
return Query.resume(next_uri, @faraday, @options)
end

def kill(query_id)
return Query.kill(query_id, @options)
return Query.kill(query_id, @faraday, @options)
end

def run(query)
q = Query.start(query, @options)
q = Query.start(query, @faraday, @options)
begin
columns = q.columns
if columns.empty?
Expand Down
44 changes: 19 additions & 25 deletions lib/trino/client/faraday_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,26 +77,17 @@ def self.faraday_client(options)
faraday_options[:ssl] = ssl if ssl

faraday = Faraday.new(faraday_options) do |faraday|
if options[:user] && options[:password]
# https://lostisland.github.io/faraday/middleware/authentication
if FARADAY1_USED
faraday.request(:basic_auth, options[:user], options[:password])
else
faraday.request :authorization, :basic, options[:user], options[:password]
end
end
if options[:follow_redirect]
faraday.response :follow_redirects
end
if options[:gzip]
faraday.request :gzip
end
faraday.response :logger if options[:http_debug]
faraday.adapter Faraday.default_adapter
faraday.adapter(options[:faraday_adapter] || Faraday.default_adapter)
end

faraday.headers.merge!(HEADERS)
faraday.headers.merge!(optional_headers(options))

return faraday
end
Expand Down Expand Up @@ -129,71 +120,75 @@ def self.faraday_ssl_options(options)
return ssl
end

def self.optional_headers(options)
usePrestoHeader = false
if options[:model_version] && options[:model_version] < 351
usePrestoHeader = true
def self.build_query_headers(options)
use_presto_headers = false
if options[:model_version] && options[:model_version].to_i < 351
use_presto_headers = true
end

headers = {}

if options[:user] && options[:password]
headers['Authorization'] = "Basic #{Base64.strict_encode64("#{options[:user]}:#{options[:password]}")}"
end
if v = options[:user]
if usePrestoHeader
if use_presto_headers
headers[PrestoHeaders::PRESTO_USER] = v
else
headers[TrinoHeaders::TRINO_USER] = v
end
end
if v = options[:source]
if usePrestoHeader
if use_presto_headers
headers[PrestoHeaders::PRESTO_SOURCE] = v
else
headers[TrinoHeaders::TRINO_SOURCE] = v
end
end
if v = options[:catalog]
if usePrestoHeader
if use_presto_headers
headers[PrestoHeaders::PRESTO_CATALOG] = v
else
headers[TrinoHeaders::TRINO_CATALOG] = v
end
end
if v = options[:schema]
if usePrestoHeader
if use_presto_headers
headers[PrestoHeaders::PRESTO_SCHEMA] = v
else
headers[TrinoHeaders::TRINO_SCHEMA] = v
end
end
if v = options[:time_zone]
if usePrestoHeader
if use_presto_headers
headers[PrestoHeaders::PRESTO_TIME_ZONE] = v
else
headers[TrinoHeaders::TRINO_TIME_ZONE] = v
end
end
if v = options[:language]
if usePrestoHeader
if use_presto_headers
headers[PrestoHeaders::PRESTO_LANGUAGE] = v
else
headers[TrinoHeaders::TRINO_LANGUAGE] = v
end
end
if v = options[:properties]
if usePrestoHeader
if use_presto_headers
headers[PrestoHeaders::PRESTO_SESSION] = encode_properties(v)
else
headers[TrinoHeaders::TRINO_SESSION] = encode_properties(v)
end
end
if v = options[:client_info]
if usePrestoHeader
if use_presto_headers
headers[PrestoHeaders::PRESTO_CLIENT_INFO] = encode_client_info(v)
else
headers[TrinoHeaders::TRINO_CLIENT_INFO] = encode_client_info(v)
end
end
if v = options[:client_tags]
if usePrestoHeader
if use_presto_headers
headers[PrestoHeaders::PRESTO_CLIENT_TAGS] = encode_client_tags(v)
else
headers[TrinoHeaders::TRINO_CLIENT_TAGS] = encode_client_tags(v)
Expand Down Expand Up @@ -245,6 +240,5 @@ def self.encode_client_tags(tags)
Array(tags).join(",")
end

private_class_method :faraday_ssl_options, :optional_headers, :encode_properties, :encode_client_info, :encode_client_tags

private_class_method :faraday_ssl_options, :encode_properties, :encode_client_info, :encode_client_tags
end
16 changes: 6 additions & 10 deletions lib/trino/client/query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,22 @@ module Trino::Client
require 'trino/client/statement_client'

class Query
def self.start(query, options)
new StatementClient.new(faraday_client(options), query, options)
def self.start(query, faraday, options)
new StatementClient.new(faraday, query, options)
end

def self.resume(next_uri, options)
new StatementClient.new(faraday_client(options), nil, options, next_uri)
def self.resume(next_uri, faraday, options)
new StatementClient.new(faraday, nil, options, next_uri)
end

def self.kill(query_id, options)
faraday = faraday_client(options)
def self.kill(query_id, faraday, options)
response = faraday.delete do |req|
req.headers.merge!(Trino::Client.build_query_headers(options))
req.url "/v1/query/#{query_id}"
end
return response.status / 100 == 2
end

def self.faraday_client(options)
Trino::Client.faraday_client(options)
end

def self.transform_row(column_value_parsers, row)
row_object = {}

Expand Down
10 changes: 8 additions & 2 deletions lib/trino/client/statement_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class StatementClient

def initialize(faraday, query, options, next_uri=nil)
@faraday = faraday
@headers = Trino::Client.build_query_headers(options)

@options = options
@query = query
Expand Down Expand Up @@ -68,6 +69,7 @@ def post_query_request!
uri = "/v1/statement"
response = @faraday.post do |req|
req.url uri
req.headers.merge!(@headers)

req.body = @query
init_request(req)
Expand Down Expand Up @@ -195,7 +197,9 @@ def faraday_get_with_retry(uri, &block)

loop do
begin
response = @faraday.get(uri)
response = @faraday.get(uri) do |req|
req.headers.merge!(@headers)
end
rescue Faraday::TimeoutError, Faraday::ConnectionFailed
# temporally error to retry
response = nil
Expand Down Expand Up @@ -258,6 +262,7 @@ def raise_timeout_error!
def cancel_leaf_stage
if uri = @results.partial_cancel_uri
@faraday.delete do |req|
req.headers.merge!(@headers)
req.url uri
end
end
Expand All @@ -271,10 +276,11 @@ def close
begin
if uri = @results.next_uri
@faraday.delete do |req|
req.headers.merge!(@headers)
req.url uri
end
end
rescue => e
rescue
end

nil
Expand Down
4 changes: 2 additions & 2 deletions spec/client_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require 'spec_helper'

describe Trino::Client::Client do
let(:client) { Trino::Client.new({}) }
let(:client) { Trino::Client.new({server: 'localhost:8080'}) }

describe 'rehashes' do
let(:columns) do
Expand Down Expand Up @@ -66,7 +66,7 @@
# For this test, we'll use scalar_parser to add 2 to every integer
query.scalar_parser = ->(data, type) { (type == 'integer') ? data + 2 : data }

columns, rows = client.run('fake query')
_columns, rows = client.run('fake query')
transformed_rows = query.transform_rows

expect(transformed_rows[0]).to eq({
Expand Down
30 changes: 20 additions & 10 deletions spec/statement_client_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
require 'spec_helper'

require 'faraday/net_http_persistent'
describe Trino::Client::StatementClient do
let :options do
{
Expand Down Expand Up @@ -346,24 +346,34 @@

it "forbids using basic auth when ssl is disabled" do
expect do
Query.__send__(:faraday_client, {
Trino::Client.faraday_client({
server: 'localhost',
password: 'abcd'
})
end.to raise_error(ArgumentError)
end
end

describe "faraday adapter" do
it "sets the adapter to the one specified in the options" do
f = Trino::Client.faraday_client({
server: "localhost",
faraday_adapter: :net_http_persistent
})
expect(f.adapter).to eq Faraday::Adapter::NetHttpPersistent
end
end

describe "ssl" do
it "is disabled by default" do
f = Query.__send__(:faraday_client, {
f = Trino::Client.faraday_client({
server: "localhost",
})
expect(f.url_prefix.to_s).to eq "http://localhost/"
end

it "is enabled with ssl: true" do
f = Query.__send__(:faraday_client, {
f = Trino::Client.faraday_client({
server: "localhost",
ssl: true,
})
Expand All @@ -372,7 +382,7 @@
end

it "is enabled with ssl: {verify: false}" do
f = Query.__send__(:faraday_client, {
f = Trino::Client.faraday_client({
server: "localhost",
ssl: {verify: false}
})
Expand All @@ -382,7 +392,7 @@

it "rejects invalid ssl: verify: object" do
expect do
f = Query.__send__(:faraday_client, {
Trino::Client.faraday_client({
server: "localhost",
ssl: {verify: "??"}
})
Expand All @@ -400,7 +410,7 @@
client_key: OpenSSL::PKey::DSA.new,
}

f = Query.__send__(:faraday_client, {
f = Trino::Client.faraday_client({
server: "localhost",
ssl: ssl,
})
Expand All @@ -416,7 +426,7 @@

it "rejects an invalid string" do
expect do
Query.__send__(:faraday_client, {
Trino::Client.faraday_client({
server: "localhost",
ssl: '??',
})
Expand All @@ -425,7 +435,7 @@

it "rejects an integer" do
expect do
Query.__send__(:faraday_client, {
Trino::Client.faraday_client({
server: "localhost",
ssl: 3,
})
Expand All @@ -452,7 +462,7 @@
let :nested_json do
nested_stats = {createTime: Time.now}
# JSON max nesting default value is 100
for i in 0..100 do
100.times do
nested_stats = {stats: nested_stats}
end
{
Expand Down
Loading