From 815e7060ae3c2c4e6af6923f6ff1f086c9bbc990 Mon Sep 17 00:00:00 2001 From: Tomohiro Hosaka Date: Fri, 1 May 2020 15:23:31 +0900 Subject: [PATCH] to be able to flush while streaming --- lib/Plack/Middleware/Deflater.pm | 9 +++- t/streaming_flush.t | 75 ++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 t/streaming_flush.t diff --git a/lib/Plack/Middleware/Deflater.pm b/lib/Plack/Middleware/Deflater.pm index 4cb0b03..42f555c 100644 --- a/lib/Plack/Middleware/Deflater.pm +++ b/lib/Plack/Middleware/Deflater.pm @@ -88,7 +88,7 @@ sub call { if ( $res->[2] && ref($res->[2]) && ref($res->[2]) eq 'ARRAY' ) { my $buf = ''; foreach (@{$res->[2]}) { - $buf .= $encoder->print($_) if defined $_; + $buf .= $encoder->print($_, $env) if defined $_; } $buf .= $encoder->close(); $res->[2] = [$buf]; @@ -97,7 +97,7 @@ sub call { # delayed or stream return sub { - $encoder->print(shift); + $encoder->print(shift, $env); }; } }); @@ -135,6 +135,7 @@ sub print : method { my $self = shift; return if $self->{closed}; my $chunk = shift; + my $env = shift; if ( ! defined $chunk ) { my ($buf,$status) = $self->{encoder}->flush(); die "deflate failed: $status" if ( $status != Z_OK ); @@ -148,6 +149,10 @@ sub print : method { my ($buf,$status) = $self->{encoder}->deflate($chunk); die "deflate failed: $status" if ( $status != Z_OK ); + if ( !length $buf and my $flush_type = $env->{'psgix.deflater_flush_type'} ) { + ($buf,$status) = $self->{encoder}->flush($flush_type); + die "deflate failed: $status" if ( $status != Z_OK ); + } $self->{length} += length $chunk; $self->{crc} = crc32($chunk,$self->{crc}); if ( length $buf ) { diff --git a/t/streaming_flush.t b/t/streaming_flush.t new file mode 100644 index 0000000..357b160 --- /dev/null +++ b/t/streaming_flush.t @@ -0,0 +1,75 @@ +use strict; +use warnings; +use FindBin; +use Test::More tests => 4; +use HTTP::Request::Common; +use Plack::Test; +use Plack::Builder; +use Test::Requires { + 'AnyEvent' => 5.34, + 'Plack::Test::AnyEvent' => 0.03 +}; + +my $app = builder { + enable 'Chunked'; + enable 'Deflater'; + + # Non streaming + # sub { [200, [ 'Content-Type' => 'text/plain' ], [ "Hello World" ]] } + + # streaming + sub { + my $env = shift; + return sub { + my $r = shift; + my $w = $r->([ '200', [ 'Content-Type' => 'text/plain' ]]); + my $timer; + my $i = 0; + my @message = qw/Hello World/; + $timer = AnyEvent->timer( + after => 1, + interval => 1, + cb => sub { + use Compress::Zlib (); + local $env->{'psgix.deflater_flush_type'} = Compress::Zlib::Z_SYNC_FLUSH(); + $w->write($message[$i]. "x" x 1024 . "\n"); + $i++; + if ( $i == 2 ) { + $w->close; + undef $timer; + } + } + ); + }; + }; +}; + +local $Plack::Test::Impl = 'AnyEvent'; + +test_psgi + app => $app, + client => sub { + my $cb = shift; + + my $req = HTTP::Request->new( GET => "http://localhost/" ); + $req->accept_decodable; + my $res = $cb->($req); + + # The first is by Plack::Middleware::Chunked. + # The second is by Plack::Test::AnyEvent. (Is it reasonable?) + # is $res->header('Transfer-Encoding'), 'chunked'; # chunked, chunked + like $res->header('Transfer-Encoding'), qr/chunked/; + + my @chunk; + $res->on_content_received(sub { + my ($content) = @_; + push @chunk, [ $content, time ]; + }); + $res->recv; + is $res->content_encoding, 'gzip'; + is @chunk, 2; + ok abs $chunk[0][1] - $chunk[1][1] >= 1; + }; + + +done_testing;