Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions parquet/file/file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"crypto/rand"
"encoding/binary"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"os"
"path"
"path/filepath"
"testing"

"github.com/apache/arrow-go/v18/arrow"
Expand Down Expand Up @@ -940,3 +942,149 @@ func TestListColumns(t *testing.T) {
}
}
}

// TestEncryptFile writes a three-row, two-column table ("id" int32 and "name"
// byte array, both required) to tools/arrowgo_encrypted_uniform.parquet. The
// file is written uncompressed with footer-key encryption only; no per-column
// keys are configured. The output is the input of
// tools/read_encrypted_parquet.py.
func TestEncryptFile(t *testing.T) {
	dir := "../../tools"
	require.DirExists(t, dir)

	const (
		footerKeyID = "footer_key"
		footerKey   = "0123456789012345"
	)

	enc := parquet.NewFileEncryptionProperties(footerKey, parquet.WithFooterKeyID(footerKeyID))
	props := parquet.NewWriterProperties(
		parquet.WithCompression(compress.Codecs.Uncompressed),
		parquet.WithEncryptionProperties(enc),
	)

	filename := filepath.Join(dir, "arrowgo_encrypted_uniform.parquet")
	outFile, err := os.Create(filename)
	require.NoError(t, err)
	require.NotNil(t, outFile)
	// Safety net for early exits. The error is deliberately ignored here:
	// the handle may already have been closed through the writer.
	// NOTE(review): confirm that writer.Close also closes its sink.
	defer outFile.Close()

	// {"id": [1, 2, 3], "name": ["alice", "bob", "charlie"]}
	fields := schema.FieldList{
		schema.NewInt32Node("id", parquet.Repetitions.Required, -1),
		schema.NewByteArrayNode("name", parquet.Repetitions.Required, -1),
	}
	// Named root rather than schema so the schema package is not shadowed.
	root, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, -1)
	require.NoError(t, err)

	// Deferred calls run last-in-first-out: the row group is closed before
	// the writer. Close errors are asserted instead of dropped so a failed
	// flush cannot pass silently.
	writer := file.NewParquetWriter(outFile, root, file.WithWriterProps(props))
	defer func() { assert.NoError(t, writer.Close()) }()

	rgw := writer.AppendBufferedRowGroup()
	defer func() { assert.NoError(t, rgw.Close()) }()

	idWriter, err := rgw.Column(0)
	require.NoError(t, err)
	nameWriter, err := rgw.Column(1)
	require.NoError(t, err)

	ids := []int32{1, 2, 3}
	names := []parquet.ByteArray{
		[]byte("alice"),
		[]byte("bob"),
		[]byte("charlie"),
	}

	// Both columns are required, so no definition or repetition levels are
	// supplied.
	_, err = idWriter.(*file.Int32ColumnChunkWriter).WriteBatch(ids, nil, nil)
	require.NoError(t, err)
	_, err = nameWriter.(*file.ByteArrayColumnChunkWriter).WriteBatch(names, nil, nil)
	require.NoError(t, err)
}

// keyIDRetriever maps master key IDs to key material. Keys are looked up from
// JSON key metadata via GetKey.
type keyIDRetriever map[string]string

// PutKey registers key under keyID so that GetKey can later resolve it.
func (s keyIDRetriever) PutKey(keyID, key string) {
	s[keyID] = key
}

// GetKey decodes keyMetadata as a JSON object, reads its string field
// "masterKeyID" and returns the key registered under that ID with PutKey.
//
// It panics if keyMetadata is not valid JSON, if "masterKeyID" is absent or
// not a string, or if no key was registered for the ID.
func (s keyIDRetriever) GetKey(keyMetadata []byte) string {
	var mdMap map[string]any
	if err := json.Unmarshal(keyMetadata, &mdMap); err != nil {
		panic(fmt.Errorf("parquet: invalid key metadata: %w", err))
	}
	masterKeyID, ok := mdMap["masterKeyID"].(string)
	if !ok {
		panic(fmt.Errorf("parquet: masterKeyID missing from key metadata"))
	}
	k, ok := s[masterKeyID]
	if !ok {
		// Report the ID that was looked up, not the whole metadata blob.
		panic(fmt.Errorf("parquet: key missing for id %q", masterKeyID))
	}
	return k
}

// TestDecryptFile opens tools/pyarrow_encrypted_uniform.parquet (the output
// of tools/write_encrypted_parquet.py), resolving the footer key through a
// keyIDRetriever, and checks that every row group decrypts to
// id = [1, 2, 3] and name = ["alice", "bob", "charlie"].
func TestDecryptFile(t *testing.T) {
	dir := "../../tools"
	require.DirExists(t, dir)

	footerKeyID := "footer_key"
	footerKey := "0123456789012345"

	stringKr := make(keyIDRetriever)
	stringKr.PutKey(footerKeyID, footerKey)
	decryptProps := parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(stringKr))
	// Alternative: supply the footer key directly instead of a retriever.
	// decryptProps := parquet.NewFileDecryptionProperties(parquet.WithFooterKey(footerKey))

	props := parquet.NewReaderProperties(memory.DefaultAllocator)
	props.FileDecryptProps = decryptProps.Clone("")

	// filepath.Join (not path.Join) because this is an OS file path.
	fileReader, err := file.OpenParquetFile(filepath.Join(dir, "pyarrow_encrypted_uniform.parquet"), false, file.WithReadProps(props))
	// Alternative: read the file produced by TestEncryptFile.
	// fileReader, err := file.OpenParquetFile(filepath.Join(dir, "arrowgo_encrypted_uniform.parquet"), false, file.WithReadProps(props))
	require.NoError(t, err)
	defer fileReader.Close()

	fileMetadata := fileReader.MetaData()
	numRowGroups := len(fileMetadata.RowGroups)
	// Without this guard the loop below would be skipped and the test would
	// pass without decrypting any data.
	require.NotZero(t, numRowGroups, "expected at least one row group")
	numColumns := fileMetadata.Schema.NumColumns()
	assert.Equal(t, 2, numColumns)

	for r := range numRowGroups {
		rowGroupReader := fileReader.RowGroup(r)
		numRows := rowGroupReader.NumRows()

		// Column 0: "id" (int32).
		colReader, err := rowGroupReader.Column(0)
		require.NoError(t, err)
		int32ColReader, ok := colReader.(*file.Int32ColumnChunkReader)
		require.True(t, ok)

		values := make([]int32, numRows)
		defLvls := make([]int16, numRows)
		repLvls := make([]int16, numRows)

		total, read, err := int32ColReader.ReadBatch(numRows, values, defLvls, repLvls)
		require.NoError(t, err)
		require.Equal(t, numRows, int64(total))
		require.Equal(t, numRows, int64(read))
		expected := []int32{1, 2, 3}
		require.Equal(t, expected, values)

		// Column 1: "name" (byte array).
		colReader, err = rowGroupReader.Column(1)
		require.NoError(t, err)
		byteArrayColReader, ok := colReader.(*file.ByteArrayColumnChunkReader)
		require.True(t, ok)

		byteArrayValues := make([]parquet.ByteArray, numRows)
		defLvls = make([]int16, numRows)
		repLvls = make([]int16, numRows)

		total, read, err = byteArrayColReader.ReadBatch(numRows, byteArrayValues, defLvls, repLvls)
		require.NoError(t, err)
		require.Equal(t, numRows, int64(total))
		require.Equal(t, numRows, int64(read))
		expectedByteArray := []parquet.ByteArray{
			[]byte("alice"),
			[]byte("bob"),
			[]byte("charlie"),
		}
		require.Equal(t, expectedByteArray, byteArrayValues)
	}
}
Binary file added tools/arrowgo_encrypted_uniform.parquet
Binary file not shown.
Binary file added tools/pyarrow_encrypted_uniform.parquet
Binary file not shown.
43 changes: 43 additions & 0 deletions tools/read_encrypted_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import base64
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.parquet.encryption as pe

KEY_ID = "footer_key"

class MockKmsClient(pe.KmsClient):
    """KMS client stand-in that base64-encodes keys instead of encrypting them.

    The master key identifier is accepted but never consulted.
    """

    def __init__(self, kms_connection_configuration):
        # The connection configuration is accepted but not used.
        super().__init__()

    def wrap_key(self, key_bytes, master_key_identifier):
        """Return ``key_bytes`` encoded as base64."""
        wrapped = base64.b64encode(key_bytes)
        return wrapped

    def unwrap_key(self, wrapped_key, master_key_identifier):
        """Return the bytes obtained by base64-decoding ``wrapped_key``."""
        key_bytes = base64.b64decode(wrapped_key)
        return key_bytes

# Build decryption properties backed by the mock KMS client.
crypto_factory = pe.CryptoFactory(lambda cfg: MockKmsClient(cfg))
kms_connection_config = pe.KmsConnectionConfig()
decryption_config = pe.DecryptionConfiguration()
decryption_properties = crypto_factory.file_decryption_properties(
    kms_connection_config,
    decryption_config,
)

# File to inspect; swap the assignments to read the pyarrow-written file.
# input_file = "pyarrow_encrypted_uniform.parquet"
input_file = "arrowgo_encrypted_uniform.parquet"
print(f"\nReading {input_file}")

with pq.ParquetFile(input_file, decryption_properties=decryption_properties) as f:
    # Only the first row group's metadata is inspected.
    rg = f.metadata.row_group(0)

    # Print the statistics of every column in that row group.
    for col_idx in range(f.metadata.num_columns):
        col = rg.column(col_idx)
        if col.statistics is None:
            statistics = "None"
        else:
            statistics = str(col.statistics)
        print(f"Column '{col.path_in_schema}' statistics:\n {statistics}")

    # Read the whole file into a table and print it.
    table = f.read()
    print("\nDecrypted table:")
    print(table)

print(f"\nSuccessfully read and decrypted {input_file}")
38 changes: 38 additions & 0 deletions tools/write_encrypted_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import base64
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.parquet.encryption as pe

KEY_ID = "footer_key"

class MockKmsClient(pe.KmsClient):
    """KMS client stand-in that base64-encodes keys instead of encrypting them.

    The master key identifier is accepted but never consulted.
    """

    def __init__(self, kms_connection_configuration):
        # The connection configuration is accepted but not used.
        super().__init__()

    def wrap_key(self, key_bytes, master_key_identifier):
        """Return ``key_bytes`` encoded as base64."""
        wrapped = base64.b64encode(key_bytes)
        return wrapped

    def unwrap_key(self, wrapped_key, master_key_identifier):
        """Return the bytes obtained by base64-decoding ``wrapped_key``."""
        key_bytes = base64.b64decode(wrapped_key)
        return key_bytes

# ── Write the file ───────────────────────────────────────────────────────
# Three rows, two columns: an integer "id" and a string "name".
table = pa.table({"id": [1, 2, 3], "name": ["alice", "bob", "charlie"]})

# Encryption properties come from a crypto factory backed by the mock KMS.
crypto_factory = pe.CryptoFactory(lambda cfg: MockKmsClient(cfg))
kms_connection_config = pe.KmsConnectionConfig()

# KEY_ID is the footer key; uniform encryption with an encrypted footer.
encryption_config = pe.EncryptionConfiguration(
    KEY_ID,
    uniform_encryption=True,
    plaintext_footer=False,
)
enc_props = crypto_factory.file_encryption_properties(
    kms_connection_config, encryption_config
)

# Write the table uncompressed, in the current working directory.
output_file = "pyarrow_encrypted_uniform.parquet"
pq.write_table(
    table,
    output_file,
    compression="none",
    encryption_properties=enc_props,
)

print(f"Successfully written {output_file}")
print(f" Key ID: {KEY_ID}")
print(" → This file MUST be readable by any correct arrow-go implementation")
Loading