@@ -322,4 +322,81 @@ public void testClose() throws Exception {
322322
323323 props .remove (Config .FLUENT_SENDER_CLASS );
324324 }
325+
326+ @ Test
327+ public void testInMultiThreading () throws Exception {
328+ final int N = 15 ;
329+ final int LOOP = 15000 ;
330+ final String tag = "foodb.bartbl" ;
331+ final ArrayList <Long > counters = new ArrayList <Long >(N );
332+ for (int i = 0 ; i < N ; i ++)
333+ counters .add (0L );
334+
335+ // start mock fluentd
336+ final int port = MockFluentd .randomPort ();
337+ final String host = "localhost" ;
338+ MockFluentd fluentd = new MockFluentd (port , new MockFluentd .MockProcess () {
339+ public void process (MessagePack msgpack , Socket socket ) throws IOException {
340+ BufferedInputStream in = new BufferedInputStream (socket .getInputStream ());
341+ try {
342+ while (true ) {
343+ Unpacker unpacker = msgpack .createUnpacker (in );
344+ Event e = unpacker .read (Event .class );
345+ if (e .tag .equals (tag )) {
346+ for (Map .Entry <String , Object > entry : e .data .entrySet ()) {
347+ Integer index = Integer .valueOf (entry .getKey ());
348+ Long i = counters .get (index );
349+ counters .set (index , i + (Long )entry .getValue ());
350+ }
351+ }
352+ }
353+ } catch (EOFException e ) {
354+ }
355+ }
356+ });
357+
358+ FixedThreadManager threadManager = new FixedThreadManager (1 );
359+ threadManager .submit (fluentd );
360+ Thread .sleep (1000 );
361+
362+ final FluentLogger logger = FluentLogger .getLogger (null , host , port );
363+ ExecutorService executorService = Executors .newFixedThreadPool (N );
364+ for (int i = 0 ; i < N ; i ++) {
365+ final int ii = i ;
366+ executorService .execute (new Runnable () {
367+ @ Override
368+ public void run () {
369+ Map <String , Object > event = new HashMap <String , Object >();
370+ for (int j = 0 ; j <= ii ; j ++) {
371+ event .put (String .valueOf (j ), j );
372+ }
373+ for (int j = 0 ; j < LOOP ; j ++) {
374+ logger .log (tag , event );
375+
376+ if (j % 500 == ii )
377+ logger .flush ();
378+ }
379+ logger .flush ();
380+ }
381+ });
382+ }
383+ Thread .sleep (1000 );
384+ executorService .shutdown ();
385+ executorService .awaitTermination (300 , TimeUnit .SECONDS );
386+
387+ logger .close ();
388+
389+ Thread .sleep (2000 );
390+
391+ // close mock fluentd
392+ fluentd .close ();
393+
394+ // wait for unpacking event data on fluentd
395+ threadManager .join ();
396+
397+ // check data
398+ for (int i = 0 ; i < N ; i ++) {
399+ assertEquals ((i * LOOP * (N - i )), (long )counters .get (i ));
400+ }
401+ }
325402}
0 commit comments