diff --git a/pom.xml b/pom.xml
index 970ad6d4..9db120cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,6 +110,21 @@
titan-hbase
${titan.version}
+
+ com.thinkaurelius.titan
+ titan-accumulo-core
+ ${titan.version}
+
+
+ org.apache.accumulo
+ accumulo-core
+ 1.4.3
+
+
+ org.apache.thrift
+ libthrift
+ 0.6.1
+
com.thinkaurelius.titan
titan-es
diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java
new file mode 100644
index 00000000..fa760e86
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java
@@ -0,0 +1,75 @@
+package com.thinkaurelius.faunus.formats.titan.accumulo;
+
+import com.google.common.base.Preconditions;
+import com.thinkaurelius.faunus.FaunusVertex;
+import com.thinkaurelius.faunus.formats.titan.FaunusTitanGraph;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.Entry;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StaticBufferEntry;
+import com.thinkaurelius.titan.diskstorage.util.StaticByteBuffer;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+/**
+ * @author Etienne Deprit
+ */
+public class FaunusTitanAccumuloGraph extends FaunusTitanGraph {
+
+ public FaunusTitanAccumuloGraph(final String configFile) throws ConfigurationException {
+ this(new PropertiesConfiguration(configFile));
+ }
+
+ public FaunusTitanAccumuloGraph(final Configuration configuration) {
+ super(configuration);
+ }
+
+ public FaunusVertex readFaunusVertex(byte[] key, final Iterator> columnIterator) {
+ return super.readFaunusVertex(ByteBuffer.wrap(key), new AccumuloMapIterable(columnIterator));
+ }
+
+ private static class AccumuloMapIterable implements Iterable {
+
+ private final Iterator> columnIterator;
+
+ public AccumuloMapIterable(final Iterator> columnIterator) {
+ Preconditions.checkNotNull(columnIterator);
+ this.columnIterator = columnIterator;
+ }
+
+ @Override
+ public Iterator iterator() {
+ return new AccumuloMapIterator(columnIterator);
+ }
+ }
+
+ private static class AccumuloMapIterator implements Iterator {
+
+ private final Iterator> iterator;
+
+ public AccumuloMapIterator(final Iterator> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Entry next() {
+ final Map.Entry entry = iterator.next();
+ return new StaticBufferEntry(new StaticByteBuffer(entry.getKey().getColumnQualifier().getBytes()),
+ new StaticByteBuffer(entry.getValue().get()));
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java
new file mode 100644
index 00000000..c9dd1261
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java
@@ -0,0 +1,119 @@
+package com.thinkaurelius.faunus.formats.titan.accumulo;
+
+import com.thinkaurelius.faunus.FaunusVertex;
+import com.thinkaurelius.faunus.formats.VertexQueryFilter;
+import com.thinkaurelius.faunus.formats.titan.GraphFactory;
+import com.thinkaurelius.faunus.formats.titan.TitanInputFormat;
+import static com.thinkaurelius.faunus.formats.titan.TitanInputFormat.FAUNUS_GRAPH_INPUT_TITAN_STORAGE_HOSTNAME;
+import static com.thinkaurelius.faunus.formats.titan.TitanInputFormat.FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT;
+import com.thinkaurelius.faunus.mapreduce.FaunusCompiler;
+import com.thinkaurelius.titan.diskstorage.Backend;
+import com.thinkaurelius.titan.diskstorage.accumulo.AccumuloKeyColumnValueStore;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.SliceQuery;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.Text;
+
+/**
+ * @author Etienne Deprit
+ */
+public class TitanAccumuloInputFormat extends TitanInputFormat {
+
+ // Accumulo store manager configuration
+ public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME =
+ "faunus.graph.input.titan.storage.tablename";
+ public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_INSTANCE =
+ "faunus.graph.input.titan.storage.accumulo-config.instance";
+ public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_USERNAME =
+ "faunus.graph.input.titan.storage.accumulo-config.username";
+ public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD =
+ "faunus.graph.input.titan.storage.accumulo-config.password";
+ public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_SERVER_ITERATORS =
+ "faunus.graph.input.titan.storage.accumulo-config.server-side-iterators";
+ // Instance variables
+ private final AccumuloRowInputFormat accumuloRowInputFormat = new AccumuloRowInputFormat();
+ private Configuration configuration;
+ private FaunusTitanAccumuloGraph graph;
+ private VertexQueryFilter vertexQuery;
+ private boolean pathEnabled;
+
+ @Override
+ public List getSplits(final JobContext jobContext) throws IOException, InterruptedException {
+ return accumuloRowInputFormat.getSplits(jobContext);
+ }
+
+ @Override
+ public RecordReader createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ return new TitanAccumuloRecordReader(graph, vertexQuery, pathEnabled, accumuloRowInputFormat.createRecordReader(inputSplit, taskAttemptContext));
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ configuration = config;
+ graph = new FaunusTitanAccumuloGraph(GraphFactory.generateTitanConfiguration(config, FAUNUS_GRAPH_INPUT_TITAN));
+ vertexQuery = VertexQueryFilter.create(config);
+ pathEnabled = config.getBoolean(FaunusCompiler.PATH_ENABLED, false);
+
+ String instance = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_INSTANCE);
+ String zookeepers = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_HOSTNAME);
+
+ if (config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT, null) != null) {
+ zookeepers = StringUtils.join(zookeepers.split(","),
+ ":" + config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT) + ",");
+ }
+
+ try {
+ AccumuloRowInputFormat.setZooKeeperInstance(config, instance, zookeepers);
+ } catch (IllegalStateException ex) {
+ // zookeeper instance reset in map task
+ }
+
+ String username = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_USERNAME);
+ String password = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD);
+
+ String tableName = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME);
+ boolean serverSideIterators = config.getBoolean(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_SERVER_ITERATORS, false);
+
+ try {
+ AccumuloRowInputFormat.setInputInfo(config, username, password.getBytes(), tableName, new Authorizations());
+ } catch (IllegalStateException ex) {
+ // input info reset in map task
+ }
+
+ config.set("storage.read-only", "true");
+ config.set("autotype", "none");
+
+ Collection> columnFamilyColumnQualifierPairs =
+ Collections.singletonList(new Pair(new Text(Backend.EDGESTORE_NAME), null));
+
+ AccumuloRowInputFormat.fetchColumns(config, columnFamilyColumnQualifierPairs);
+
+ SliceQuery sliceQuery = TitanInputFormat.inputSlice(vertexQuery, graph);
+ IteratorSetting is = AccumuloKeyColumnValueStore.getColumnSliceIterator(sliceQuery);
+
+ if (is != null) {
+ AccumuloRowInputFormat.addIterator(config, is);
+ if (!serverSideIterators) {
+ AccumuloRowInputFormat.setLocalIterators(config, true);
+ }
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return configuration;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java
new file mode 100644
index 00000000..ac824395
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java
@@ -0,0 +1,10 @@
+package com.thinkaurelius.faunus.formats.titan.accumulo;
+
+import com.thinkaurelius.faunus.formats.titan.TitanOutputFormat;
+
+/**
+ * @author Etienne Deprit
+ */
+public class TitanAccumuloOutputFormat extends TitanOutputFormat {
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java
new file mode 100644
index 00000000..2356193a
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java
@@ -0,0 +1,78 @@
+package com.thinkaurelius.faunus.formats.titan.accumulo;
+
+import com.thinkaurelius.faunus.FaunusVertex;
+import com.thinkaurelius.faunus.formats.VertexQueryFilter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.io.Text;
+
+/**
+ * @author Etienne Deprit
+ */
+public class TitanAccumuloRecordReader extends RecordReader {
+
+ private RecordReader>> reader;
+ private FaunusTitanAccumuloGraph graph;
+ private VertexQueryFilter vertexQuery;
+ private boolean pathEnabled;
+ private FaunusVertex vertex;
+
+ public TitanAccumuloRecordReader(final FaunusTitanAccumuloGraph graph,
+ final VertexQueryFilter vertexQuery, final boolean pathEnabled,
+ final RecordReader>> reader) {
+ this.graph = graph;
+ this.vertexQuery = vertexQuery;
+ this.pathEnabled = pathEnabled;
+ this.reader = reader;
+ }
+
+ @Override
+ public void initialize(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ reader.initialize(inputSplit, taskAttemptContext);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ while (reader.nextKeyValue()) {
+ final FaunusVertex temp = graph.readFaunusVertex(reader.getCurrentKey().getBytes(), reader.getCurrentValue());
+ if (null != temp) {
+ if (this.pathEnabled) {
+ temp.enablePath(true);
+ }
+ vertex = temp;
+ vertexQuery.defaultFilter(this.vertex);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public FaunusVertex getCurrentValue() throws IOException, InterruptedException {
+ return vertex;
+ }
+
+ @Override
+ public void close() throws IOException {
+ graph.shutdown();
+ reader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return reader.getProgress();
+ }
+}
diff --git a/src/main/java/com/thinkaurelius/faunus/tinkerpop/gremlin/Imports.java b/src/main/java/com/thinkaurelius/faunus/tinkerpop/gremlin/Imports.java
index f78e593b..18fa67e5 100644
--- a/src/main/java/com/thinkaurelius/faunus/tinkerpop/gremlin/Imports.java
+++ b/src/main/java/com/thinkaurelius/faunus/tinkerpop/gremlin/Imports.java
@@ -46,6 +46,7 @@ public class Imports {
imports.add("com.thinkaurelius.faunus.formats.titan.*");
imports.add("com.thinkaurelius.faunus.formats.titan.hbase.*");
imports.add("com.thinkaurelius.faunus.formats.titan.cassandra.*");
+ imports.add("com.thinkaurelius.faunus.formats.titan.accumulo.*");
imports.add("com.thinkaurelius.faunus.hdfs.*");
imports.add("com.thinkaurelius.faunus.tinkerpop.gremlin.*");
imports.add("com.tinkerpop.gremlin.Tokens.T");
diff --git a/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormatTest.java b/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormatTest.java
new file mode 100644
index 00000000..e32c7d0a
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormatTest.java
@@ -0,0 +1,14 @@
+package com.thinkaurelius.faunus.formats.titan.accumulo;
+
+import com.thinkaurelius.faunus.tinkerpop.gremlin.Imports;
+import junit.framework.TestCase;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TitanAccumuloInputFormatTest extends TestCase {
+
+ public void testInGremlinImports() {
+ assertTrue(Imports.getImports().contains(TitanAccumuloInputFormat.class.getPackage().getName() + ".*"));
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormatTest.java b/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormatTest.java
new file mode 100644
index 00000000..33f5074c
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormatTest.java
@@ -0,0 +1,15 @@
+package com.thinkaurelius.faunus.formats.titan.accumulo;
+
+import com.thinkaurelius.faunus.formats.titan.hbase.*;
+import com.thinkaurelius.faunus.tinkerpop.gremlin.Imports;
+import junit.framework.TestCase;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TitanAccumuloOutputFormatTest extends TestCase {
+
+ public void testInGremlinImports() {
+ assertTrue(Imports.getImports().contains(TitanAccumuloOutputFormat.class.getPackage().getName() + ".*"));
+ }
+}