Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
2 changes: 2 additions & 0 deletions java/rabbitmq-ot/.gradle/buildOutputCleanup/cache.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#Sun Apr 05 12:42:49 CEST 2020
gradle.version=5.6.4
Binary file not shown.
Empty file.
94 changes: 94 additions & 0 deletions java/rabbitmq-ot/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
## Tracing RabbitMQ Producer / Consumer code example


A detailed step-by-step tutorial on how to implement tracing with a RabbitMQ Consumer and Producer using RabbitMQ Java client and Datadog the OpenTracing API.

### _Preliminary tasks and first time steps_

**Install RabbitMQ on mac OSX (tested on High Sierra)**

````
COMP10619:RabbitMQ pejman.tabassomi$ brew install rabbitmq
````

**Start RabbitMQ**

````
COMP10619:RabbitMQ pejman.tabassomi$ brew services start rabbitmq
````

Another option is to use the official docker image and run it locally.
Below an examlple of docker-compose file.


##### _docker-compose.yml_
````
rabbitmq:
image: rabbitmq:management
ports:
- "5672:5672"
- "15672:15672"
````

**Spin up the container**

````
COMP10619:RabbitMQ pejman.tabassomi$ docker-compose up -d
````

### _Administration tasks_

**Start RabbitMQ**

````
COMP10619:RabbitMQ pejman.tabassomi$ brew services start rabbitmq
````

**Stop RabbitMQ**

````
COMP10619:RabbitMQ pejman.tabassomi$ brew services stop rabbitmq
````

**RabbitMQ UI**

````
http://localhost:15672
````

### _Spin up the Datadog Agent (Provide your API key to the belown command_


````
docker run -d --name datadog_agent -v /var/run/docker.sock:/var/run/docker.sock:ro -v /proc/:/host/proc/:ro -v /sys/fs/cgroup/:/host/sys/fs/cgroup:ro -p 127.0.0.1:8126:8126/tcp -e DD_API_KEY=<Your API key> -e DD_APM_ENABLED=true -e DD_APM_IGNORE_RESOURCES="GET /api/random" datadog/agent:latest
````

### _Clone the repository and build the application_

The project is structured as a main gradle project with two submodules (one for the consumer, the other for the producer)

````
COMP10619:RabbitMQ pejman.tabassomi$ git clone https://github.com/ptabasso2/prodconsrbmqdt.git
COMP10619:RabbitMQ pejman.tabassomi$ cd prodconsrbmqdt
COMP10619:prodconsrbmqdt pejman.tabassomi$ ./gradlew shadowjar
````


### _Start the app_

Open two terminal windows, one for the producer. The other for the consumer.

#### Consumer

````
COMP10619:RabbitMQ pejman.tabassomi$ java -jar consumer/build/libs/consumer-0.1.0.jar
````
and then

#### Producer

````
COMP10619:RabbitMQ pejman.tabassomi$ java -jar producer/build/libs/producer-0.1.0.jar
````


7 changes: 7 additions & 0 deletions java/rabbitmq-ot/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* This file was generated by the Gradle 'init' task.
*
* This is a general purpose Gradle build.
* Learn how to create Gradle builds at https://guides.gradle.org/creating-new-gradle-builds
*/
tasks.getByPath(':consumer:run').shouldRunAfter(':producer:run')
40 changes: 40 additions & 0 deletions java/rabbitmq-ot/consumer/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
plugins {
id 'java'
id 'application'
id "com.github.johnrengelman.shadow" version "5.2.0"
}

version '0.1.0'

shadowJar {
archiveFileName = "consumer-${version}.jar"
}

sourceCompatibility = 1.8

repositories {
mavenCentral()
}

dependencies {
compile 'com.rabbitmq:amqp-client:5.8.0'
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.10'
compile group: 'com.datadoghq', name: 'dd-trace-ot', version: '0.47.0'
//compile group: 'io.opentracing.contrib', name: 'opentracing-rabbitmq-client', version: '0.1.9'
compile 'com.squareup.okhttp3:okhttp:4.5.0'

}

ext {
javaMainClass = "com.datadog.pej.Main"
}

application {
mainClassName = javaMainClass
}

jar {
manifest {
attributes 'Main-Class': 'com.datadog.pej.Main'
}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
3 changes: 3 additions & 0 deletions java/rabbitmq-ot/consumer/build/tmp/shadowJar/MANIFEST.MF
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Manifest-Version: 1.0
Main-Class: com.datadog.pej.Main

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.datadog.pej;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* Represents a connection with a queue
* @author syntx
*
*/
public abstract class EndPoint{

protected Channel channel;
protected Connection connection;
protected String endPointName;

public EndPoint(String endpointName) throws IOException, TimeoutException {
this.endPointName = endpointName;

//Create a connection factory
ConnectionFactory factory = new ConnectionFactory();

//hostname of your rabbitmq server
factory.setHost("localhost");

//getting a connection
connection = factory.newConnection();

//creating a channel
channel = connection.createChannel();

//declaring a queue for this channel. If queue does not exist,
//it will be created on the server.
channel.queueDeclare(endpointName, false, false, false, null);
}


/**
* Close channel and connection. Not necessary as it happens implicitly any way.
* @throws IOException
*/
public void close() throws IOException, TimeoutException {
this.channel.close();
this.connection.close();
}
}
38 changes: 38 additions & 0 deletions java/rabbitmq-ot/consumer/src/main/java/com/datadog/pej/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.datadog.pej;

import datadog.opentracing.DDTracer;
import datadog.trace.api.DDTags;
import datadog.trace.api.GlobalTracer;
import io.opentracing.Scope;
import io.opentracing.ScopeManager;
import io.opentracing.Span;
import io.opentracing.Tracer;


public class Main {

public static void main(String[] args) throws Exception{


//Tracer tracer = new DDTracer("Consumer");
Tracer tracer = DDTracer.builder().build();
GlobalTracer.registerIfAbsent((datadog.trace.api.Tracer) tracer);

ScopeManager sm = tracer.scopeManager();
Tracer.SpanBuilder tb = tracer.buildSpan("receiving");

Span span = tb.start();

try(Scope scope = sm.activate(span)) {
span.setTag(DDTags.SERVICE_NAME, "Consumer");
span.setTag(DDTags.RESOURCE_NAME, "receive message");
span.setTag(DDTags.SPAN_TYPE, "web");

QueueConsumer consumer = new QueueConsumer("spring-boot", tracer);
Thread consumerThread = new Thread(consumer);
consumerThread.start();
span.finish();
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.datadog.pej;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.contrib.rabbitmq.TracingChannel;
import okhttp3.Call;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.commons.lang3.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;


/**
* The endpoint that consumes messages off of the queue. Happens to be runnable.
* @author syntx
*
*/
public class QueueConsumer extends EndPoint implements Runnable, Consumer{

Tracer tracer;

public QueueConsumer(String endPointName, Tracer tracer) throws IOException, TimeoutException {
super(endPointName);
this.tracer = tracer;
}

public void run() {
try {
//start consuming messages. Auto acknowledge messages.
TracingChannel tracingChannel = new TracingChannel(channel, tracer);
//channel.basicConsume(endPointName, true,this);
tracingChannel.basicConsume(endPointName, true,this);

} catch (IOException e) {
e.printStackTrace();
}
}

/**
* Called when consumer is registered.
*/
public void handleConsumeOk(String consumerTag) {
System.out.println("Consumer "+consumerTag +" registered");
}

/**
* Called when new message is available.
*/
/*public void handleDelivery(String consumerTag, Envelope env,
BasicProperties props, byte[] body) throws IOException {
Map map = (HashMap)SerializationUtils.deserialize(body);
System.out.println("Message Number "+ map.get("message number") + " received.");

}*/

public void handleDelivery(String consumerTag, Envelope env,
BasicProperties props, byte[] body) throws IOException {
Map map = (HashMap)SerializationUtils.deserialize(body);
System.out.println("Message Number "+ map.get("message number") + " received.");

Span resultingspan = tracer.activeSpan();


Tracer.SpanBuilder httpspan = tracer.buildSpan("okhttp google").asChildOf(resultingspan);
Span childspan = httpspan.start();

try(Scope scope = tracer.activateSpan(childspan)){
childspan.setTag(DDTags.RESOURCE_NAME, "GET /");
childspan.setTag(DDTags.SPAN_TYPE, "web");
childspan.setTag(DDTags.SERVICE_NAME, "Google");
httpRequest();
System.out.println("Hello google");
childspan.finish();
}


}


public void httpRequest() throws IOException {
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("https://www.google.fr")
.build();

Call call = client.newCall(request);
Response response = call.execute();
System.out.println("http response code: " + response.code());

}


public void handleCancel(String consumerTag) {}
public void handleCancelOk(String consumerTag) {}
public void handleRecoverOk(String consumerTag) {}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
}
Loading