diff --git a/pom.xml b/pom.xml
index 970ad6d4..9db120cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,6 +110,21 @@
             <artifactId>titan-hbase</artifactId>
             <version>${titan.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.thinkaurelius.titan</groupId>
+            <artifactId>titan-accumulo-core</artifactId>
+            <version>${titan.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-core</artifactId>
+            <version>1.4.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>0.6.1</version>
+        </dependency>
         <dependency>
             <groupId>com.thinkaurelius.titan</groupId>
             <artifactId>titan-es</artifactId>
diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java
new file mode 100644
index 00000000..fa760e86
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java
@@ -0,0 +1,99 @@
+package com.thinkaurelius.faunus.formats.titan.accumulo;
+
+import com.google.common.base.Preconditions;
+import com.thinkaurelius.faunus.FaunusVertex;
+import com.thinkaurelius.faunus.formats.titan.FaunusTitanGraph;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.Entry;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StaticBufferEntry;
+import com.thinkaurelius.titan.diskstorage.util.StaticByteBuffer;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Titan graph reader that builds {@link FaunusVertex} instances from Accumulo rows.
+ *
+ * @author Etienne Deprit
+ */
+public class FaunusTitanAccumuloGraph extends FaunusTitanGraph {
+
+    /**
+     * Loads the Titan configuration from the given properties file.
+     */
+    public FaunusTitanAccumuloGraph(final String configFile) throws ConfigurationException {
+        this(new PropertiesConfiguration(configFile));
+    }
+
+    public FaunusTitanAccumuloGraph(final Configuration configuration) {
+        super(configuration);
+    }
+
+    /**
+     * Converts one Accumulo row into a Faunus vertex.
+     *
+     * @param key            row key bytes; wrapped, not copied
+     * @param columnIterator the row's key/value pairs; consumed by this call
+     * @return whatever the superclass reader produces for the row
+     */
+    public FaunusVertex readFaunusVertex(byte[] key, final Iterator<Map.Entry<Key, Value>> columnIterator) {
+        return super.readFaunusVertex(ByteBuffer.wrap(key), new AccumuloMapIterable(columnIterator));
+    }
+
+    /**
+     * Adapts an iterator of Accumulo key/value pairs to an {@code Iterable} of Titan entries.
+     * Single-use: every {@link #iterator()} call wraps the same underlying iterator.
+     */
+    private static class AccumuloMapIterable implements Iterable<Entry> {
+
+        private final Iterator<Map.Entry<Key, Value>> columnIterator;
+
+        public AccumuloMapIterable(final Iterator<Map.Entry<Key, Value>> columnIterator) {
+            Preconditions.checkNotNull(columnIterator);
+            this.columnIterator = columnIterator;
+        }
+
+        @Override
+        public Iterator<Entry> iterator() {
+            return new AccumuloMapIterator(columnIterator);
+        }
+    }
+
+    /**
+     * Maps each Accumulo key/value pair to a Titan entry (column qualifier -> value).
+     */
+    private static class AccumuloMapIterator implements Iterator<Entry> {
+
+        private final Iterator<Map.Entry<Key, Value>> iterator;
+
+        public AccumuloMapIterator(final Iterator<Map.Entry<Key, Value>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public Entry next() {
+            final Map.Entry<Key, Value> entry = iterator.next();
+            final Text qualifier = entry.getKey().getColumnQualifier();
+            // Text.getBytes() exposes the backing array, which is only valid up to getLength().
+            final byte[] column = Arrays.copyOf(qualifier.getBytes(), qualifier.getLength());
+            return new StaticBufferEntry(new StaticByteBuffer(column),
+                    new StaticByteBuffer(entry.getValue().get()));
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java
new file mode 100644
index 00000000..c9dd1261
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java
@@ -0,0 +1,135 @@
+package com.thinkaurelius.faunus.formats.titan.accumulo;
+
+import com.thinkaurelius.faunus.FaunusVertex;
+import com.thinkaurelius.faunus.formats.VertexQueryFilter;
+import com.thinkaurelius.faunus.formats.titan.GraphFactory;
+import com.thinkaurelius.faunus.formats.titan.TitanInputFormat;
+import static com.thinkaurelius.faunus.formats.titan.TitanInputFormat.FAUNUS_GRAPH_INPUT_TITAN_STORAGE_HOSTNAME;
+import static com.thinkaurelius.faunus.formats.titan.TitanInputFormat.FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT;
+import com.thinkaurelius.faunus.mapreduce.FaunusCompiler;
+import com.thinkaurelius.titan.diskstorage.Backend;
+import com.thinkaurelius.titan.diskstorage.accumulo.AccumuloKeyColumnValueStore;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.SliceQuery;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Hadoop input format that reads Titan vertices from Accumulo by delegating to
+ * {@link AccumuloRowInputFormat}.
+ *
+ * @author Etienne Deprit
+ */
+public class TitanAccumuloInputFormat extends TitanInputFormat {
+
+    // Accumulo store manager configuration
+    public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME =
+            "faunus.graph.input.titan.storage.tablename";
+    public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_INSTANCE =
+            "faunus.graph.input.titan.storage.accumulo-config.instance";
+    public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_USERNAME =
+            "faunus.graph.input.titan.storage.accumulo-config.username";
+    public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD =
+            "faunus.graph.input.titan.storage.accumulo-config.password";
+    public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_SERVER_ITERATORS =
+            "faunus.graph.input.titan.storage.accumulo-config.server-side-iterators";
+    // Instance variables
+    private final AccumuloRowInputFormat accumuloRowInputFormat = new AccumuloRowInputFormat();
+    private Configuration configuration;
+    private FaunusTitanAccumuloGraph graph;
+    private VertexQueryFilter vertexQuery;
+    private boolean pathEnabled;
+
+    @Override
+    public List<InputSplit> getSplits(final JobContext jobContext) throws IOException, InterruptedException {
+        return accumuloRowInputFormat.getSplits(jobContext);
+    }
+
+    @Override
+    public RecordReader<NullWritable, FaunusVertex> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+        return new TitanAccumuloRecordReader(graph, vertexQuery, pathEnabled, accumuloRowInputFormat.createRecordReader(inputSplit, taskAttemptContext));
+    }
+
+    /**
+     * Stores the configuration and copies the Faunus/Titan connection settings into the
+     * Accumulo input format's job configuration.
+     *
+     * @throws IllegalArgumentException if no Accumulo password is configured
+     */
+    @Override
+    public void setConf(final Configuration config) {
+        configuration = config;
+        graph = new FaunusTitanAccumuloGraph(GraphFactory.generateTitanConfiguration(config, FAUNUS_GRAPH_INPUT_TITAN));
+        vertexQuery = VertexQueryFilter.create(config);
+        pathEnabled = config.getBoolean(FaunusCompiler.PATH_ENABLED, false);
+
+        String instance = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_INSTANCE);
+        String zookeepers = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_HOSTNAME);
+
+        final String port = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT);
+        if (port != null && zookeepers != null) {
+            // Suffix every host with the port; a plain join leaves the last host without one.
+            final String suffix = ":" + port;
+            zookeepers = StringUtils.join(zookeepers.split(","), suffix + ",") + suffix;
+        }
+
+        try {
+            AccumuloRowInputFormat.setZooKeeperInstance(config, instance, zookeepers);
+        } catch (IllegalStateException ignored) {
+            // zookeeper instance reset in map task
+        }
+
+        String username = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_USERNAME);
+        String password = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD);
+        if (password == null) {
+            throw new IllegalArgumentException(
+                    "Missing configuration: " + FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD);
+        }
+
+        String tableName = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME);
+        boolean serverSideIterators = config.getBoolean(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_SERVER_ITERATORS, false);
+
+        try {
+            AccumuloRowInputFormat.setInputInfo(config, username, password.getBytes(), tableName, new Authorizations());
+        } catch (IllegalStateException ignored) {
+            // input info reset in map task
+        }
+
+        config.set("storage.read-only", "true");
+        config.set("autotype", "none");
+
+        Collection<Pair<Text, Text>> columnFamilyColumnQualifierPairs =
+                Collections.singletonList(new Pair<Text, Text>(new Text(Backend.EDGESTORE_NAME), null));
+
+        AccumuloRowInputFormat.fetchColumns(config, columnFamilyColumnQualifierPairs);
+
+        SliceQuery sliceQuery = TitanInputFormat.inputSlice(vertexQuery, graph);
+        IteratorSetting is = AccumuloKeyColumnValueStore.getColumnSliceIterator(sliceQuery);
+
+        if (is != null) {
+            // NOTE(review): setConf appears to run again in each map task; confirm the iterator is not registered twice.
+            AccumuloRowInputFormat.addIterator(config, is);
+            if (!serverSideIterators) {
+                AccumuloRowInputFormat.setLocalIterators(config, true);
+            }
+        }
+    }
+
+    @Override
+    public Configuration getConf() {
+        return configuration;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java
new file mode 100644
index 00000000..ac824395
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java
@@ -0,0 +1,10 @@
+package com.thinkaurelius.faunus.formats.titan.accumulo;
+
+import com.thinkaurelius.faunus.formats.titan.TitanOutputFormat;
+
+/**
+ * @author Etienne Deprit
+ */
+public class TitanAccumuloOutputFormat extends TitanOutputFormat {
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java
new file mode 100644
index 00000000..2356193a
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java
@@ -0,0 +1,93 @@
+package com.thinkaurelius.faunus.formats.titan.accumulo;
+
+import com.thinkaurelius.faunus.FaunusVertex;
+import com.thinkaurelius.faunus.formats.VertexQueryFilter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Record reader that turns each Accumulo row from the wrapped reader into a {@link FaunusVertex}.
+ *
+ * @author Etienne Deprit
+ */
+public class TitanAccumuloRecordReader extends RecordReader<NullWritable, FaunusVertex> {
+
+    private RecordReader<Text, PeekingIterator<Map.Entry<Key, Value>>> reader;
+    private FaunusTitanAccumuloGraph graph;
+    private VertexQueryFilter vertexQuery;
+    private boolean pathEnabled;
+    private FaunusVertex vertex;
+
+    public TitanAccumuloRecordReader(final FaunusTitanAccumuloGraph graph,
+            final VertexQueryFilter vertexQuery, final boolean pathEnabled,
+            final RecordReader<Text, PeekingIterator<Map.Entry<Key, Value>>> reader) {
+        this.graph = graph;
+        this.vertexQuery = vertexQuery;
+        this.pathEnabled = pathEnabled;
+        this.reader = reader;
+    }
+
+    @Override
+    public void initialize(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+        reader.initialize(inputSplit, taskAttemptContext);
+    }
+
+    /**
+     * Advances to the next row that yields a vertex, skipping rows for which the graph returns null.
+     */
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        while (reader.nextKeyValue()) {
+            final Text row = reader.getCurrentKey();
+            // Text.getBytes() exposes the backing array, which is only valid up to getLength().
+            final byte[] rowKey = Arrays.copyOf(row.getBytes(), row.getLength());
+            final FaunusVertex temp = graph.readFaunusVertex(rowKey, reader.getCurrentValue());
+            if (null != temp) {
+                if (this.pathEnabled) {
+                    temp.enablePath(true);
+                }
+                vertex = temp;
+                vertexQuery.defaultFilter(this.vertex);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+        return NullWritable.get();
+    }
+
+    @Override
+    public FaunusVertex getCurrentValue() throws IOException, InterruptedException {
+        return vertex;
+    }
+
+    /**
+     * Shuts down the graph and closes the wrapped reader; the reader is closed even if shutdown fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            graph.shutdown();
+        } finally {
+            reader.close();
+        }
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return reader.getProgress();
+    }
+}
diff --git a/src/main/java/com/thinkaurelius/faunus/tinkerpop/gremlin/Imports.java b/src/main/java/com/thinkaurelius/faunus/tinkerpop/gremlin/Imports.java
index f78e593b..18fa67e5 100644
--- a/src/main/java/com/thinkaurelius/faunus/tinkerpop/gremlin/Imports.java
+++ b/src/main/java/com/thinkaurelius/faunus/tinkerpop/gremlin/Imports.java
@@ -46,6 +46,7 @@ public class Imports {
         imports.add("com.thinkaurelius.faunus.formats.titan.*");
         imports.add("com.thinkaurelius.faunus.formats.titan.hbase.*");
         imports.add("com.thinkaurelius.faunus.formats.titan.cassandra.*");
+        imports.add("com.thinkaurelius.faunus.formats.titan.accumulo.*");
         imports.add("com.thinkaurelius.faunus.hdfs.*");
         imports.add("com.thinkaurelius.faunus.tinkerpop.gremlin.*");
         imports.add("com.tinkerpop.gremlin.Tokens.T");
diff --git a/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormatTest.java b/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormatTest.java
new file mode 100644
index 00000000..e32c7d0a
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormatTest.java
@@ -0,0 +1,14 @@
+package com.thinkaurelius.faunus.formats.titan.accumulo;
+
+import com.thinkaurelius.faunus.tinkerpop.gremlin.Imports;
+import junit.framework.TestCase;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TitanAccumuloInputFormatTest extends TestCase {
+
+    public void testInGremlinImports() {
+        assertTrue(Imports.getImports().contains(TitanAccumuloInputFormat.class.getPackage().getName() + ".*"));
+    }
+}
diff --git a/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormatTest.java b/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormatTest.java
new file mode 100644
index 00000000..33f5074c
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormatTest.java
@@ -0,0 +1,15 @@
+package com.thinkaurelius.faunus.formats.titan.accumulo;
+
+import com.thinkaurelius.faunus.formats.titan.hbase.*;
+import com.thinkaurelius.faunus.tinkerpop.gremlin.Imports;
+import junit.framework.TestCase;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TitanAccumuloOutputFormatTest extends TestCase {
+
+    public void testInGremlinImports() {
+        assertTrue(Imports.getImports().contains(TitanAccumuloOutputFormat.class.getPackage().getName() + ".*"));
+    }
+}