Skip to content

Commit 585302a

Browse files
authored
perf: Optimize file writing and writing (#4512)
1 parent d393e28 commit 585302a

File tree

1 file changed

+69
-23
lines changed

1 file changed

+69
-23
lines changed

apps/knowledge/models/knowledge.py

Lines changed: 69 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -342,39 +342,85 @@ class Meta:
342342
db_table = "file"
343343

344344
def save(self, bytea=None, force_insert=False, force_update=False, using=None, update_fields=None):
345+
if bytea is None:
346+
raise ValueError("bytea参数不能为空")
347+
345348
sha256_hash = get_sha256_hash(bytea)
346-
# 创建压缩文件
347-
zip_buffer = io.BytesIO()
348-
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
349-
# 设置压缩级别为最高(9)
349+
350+
existing_file = QuerySet(File).filter(sha256_hash=sha256_hash).first()
351+
if existing_file:
352+
self.loid = existing_file.loid
353+
return super().save()
354+
355+
compressed_data = self._compress_data(bytea)
356+
357+
self.loid = self._create_large_object()
358+
359+
self._write_compressed_data(compressed_data)
360+
361+
# 调用父类保存
362+
return super().save()
363+
364+
def _compress_data(self, data, compression_level=9):
365+
"""压缩数据到内存"""
366+
buffer = io.BytesIO()
367+
with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
350368
zipinfo = zipfile.ZipInfo(self.file_name)
351369
zipinfo.compress_type = zipfile.ZIP_DEFLATED
352-
zip_file.writestr(zipinfo, bytea, compresslevel=9)
353-
# 获取压缩后的数据
354-
compressed_data = zip_buffer.getvalue()
355-
f = QuerySet(File).filter(sha256_hash=sha256_hash).first()
356-
if f is not None:
357-
self.loid = f.loid
358-
else:
359-
result = select_one("SELECT lo_from_bytea(%s, %s::bytea) as loid", [0, bytea])
360-
self.loid = result['loid']
361-
self.file_size = len(compressed_data)
362-
self.sha256_hash = sha256_hash
363-
# 可以在元数据中记录原始大小
364-
if 'original_size' not in self.meta:
365-
self.meta['original_size'] = len(bytea)
366-
super().save()
370+
zip_file.writestr(zipinfo, data, compresslevel=compression_level)
371+
372+
return buffer.getvalue()
373+
374+
def _create_large_object(self):
375+
result = select_one("SELECT lo_creat(-1)::int8 as lo_id;", [])
376+
return result['lo_id']
377+
378+
def _write_compressed_data(self, data, block_size=64 * 1024):
379+
buffer = io.BytesIO(data)
380+
offset = 0
381+
382+
while True:
383+
chunk = buffer.read(block_size)
384+
if not chunk:
385+
break
386+
387+
offset += len(chunk)
388+
select_one(
389+
"SELECT lo_put(%s::oid, %s::bigint, %s::bytea)::VARCHAR;",
390+
[self.loid, offset - len(chunk), chunk]
391+
)
367392

368393
def get_bytes(self):
369-
result = select_one(f'SELECT lo_get({self.loid}) as "data"', [])
370-
compressed_data = result['data']
394+
buffer = io.BytesIO()
395+
for chunk in self.get_bytes_stream():
396+
buffer.write(chunk)
371397
try:
372398
# 解压数据
373-
with zipfile.ZipFile(io.BytesIO(compressed_data)) as zip_file:
399+
with zipfile.ZipFile(buffer) as zip_file:
374400
return zip_file.read(self.file_name)
375401
except Exception as e:
376402
# 如果数据不是zip格式,直接返回原始数据
377-
return compressed_data
403+
return buffer.getvalue()
404+
405+
def get_bytes_stream(self, start=0, end=None, chunk_size=64 * 1024):
406+
def _read_with_offset():
407+
offset = start
408+
while True:
409+
result = select_one(
410+
"SELECT lo_get(%s::oid, %s, %s) as chunk",
411+
[self.loid, offset, end - offset if end and (end - offset) < chunk_size else chunk_size]
412+
)
413+
chunk = result['chunk'] if result else None
414+
if not chunk:
415+
break
416+
yield chunk
417+
offset += len(chunk)
418+
if len(chunk) < chunk_size:
419+
break
420+
if end and offset > end:
421+
break
422+
423+
return _read_with_offset()
378424

379425

380426
@receiver(pre_delete, sender=File)

0 commit comments

Comments
 (0)