From 8bec9e2a965e31692557481983063c4ff5493a76 Mon Sep 17 00:00:00 2001 From: Etienne Deprit Date: Fri, 14 Jun 2013 13:59:12 -0400 Subject: [PATCH 01/11] Skeleton Accumulo input/output formats. --- pom.xml | 20 +++++ .../accumulo/FaunusTitanAccumuloGraph.java | 79 ++++++++++++++++ .../accumulo/TitanAccumuloInputFormat.java | 89 +++++++++++++++++++ .../accumulo/TitanAccumuloOutputFormat.java | 10 +++ .../accumulo/TitanAccumuloRecordReader.java | 72 +++++++++++++++ .../faunus/tinkerpop/gremlin/Imports.java | 1 + .../TitanAccumuloInputFormatTest.java | 14 +++ .../TitanAccumuloOutputFormatTest.java | 15 ++++ 8 files changed, 300 insertions(+) create mode 100644 src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java create mode 100644 src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java create mode 100644 src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java create mode 100644 src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java create mode 100644 src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormatTest.java create mode 100644 src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormatTest.java diff --git a/pom.xml b/pom.xml index e05f9dbe..eede30a7 100644 --- a/pom.xml +++ b/pom.xml @@ -91,6 +91,26 @@ titan-hbase ${titan.version} + + com.thinkaurelius.titan + titan-accumulo + ${titan.version} + + + org.apache.accumulo + accumulo-core + 1.4.3 + + + org.apache.thrift + libthrift + 0.6.1 + + + com.thinkaurelius.titan + titan-lucene + ${titan.version} + org.openrdf.sesame diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java new file mode 100644 index 00000000..f5c86085 --- /dev/null +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java @@ -0,0 +1,79 @@ +package com.thinkaurelius.faunus.formats.titan.accumulo; + +import com.thinkaurelius.faunus.formats.titan.hbase.*; +import com.google.common.base.Preconditions; +import com.thinkaurelius.faunus.FaunusVertex; +import com.thinkaurelius.faunus.formats.titan.FaunusTitanGraph; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.Entry; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StaticBufferEntry; +import com.thinkaurelius.titan.diskstorage.util.StaticByteBuffer; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; + +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; + +/** + * (c) Matthias Broecheler (me@matthiasb.com) + */ + +public class FaunusTitanAccumuloGraph extends FaunusTitanGraph { + + public FaunusTitanAccumuloGraph(final String configFile) throws ConfigurationException { + this(new PropertiesConfiguration(configFile)); + } + + public FaunusTitanAccumuloGraph(final Configuration configuration) { + super(configuration); + } + + public FaunusVertex readFaunusVertex(byte[] key, final NavigableMap> rowMap) { + return super.readFaunusVertex(ByteBuffer.wrap(key), new HBaseMapIterable(rowMap)); + } + + private static class HBaseMapIterable implements Iterable { + + private final NavigableMap> columnValues; + + public HBaseMapIterable(final NavigableMap> columnValues) { + Preconditions.checkNotNull(columnValues); + this.columnValues = columnValues; + } + + @Override + public Iterator iterator() { + return new HBaseMapIterator(columnValues.entrySet().iterator()); + } + + } + + private static class HBaseMapIterator implements Iterator { + + private final Iterator>> iterator; + + public HBaseMapIterator(final Iterator>> iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Entry next() { + final Map.Entry> entry = iterator.next(); + return new StaticBufferEntry(new StaticByteBuffer(entry.getKey()), new StaticByteBuffer(entry.getValue().lastEntry().getValue())); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + +} + diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java new file mode 100644 index 00000000..c3dbb45a --- /dev/null +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java @@ -0,0 +1,89 @@ +package com.thinkaurelius.faunus.formats.titan.accumulo; + +import com.thinkaurelius.faunus.formats.titan.hbase.*; +import com.thinkaurelius.faunus.FaunusVertex; +import com.thinkaurelius.faunus.formats.VertexQueryFilter; +import com.thinkaurelius.faunus.formats.titan.GraphFactory; +import com.thinkaurelius.faunus.formats.titan.TitanInputFormat; +import com.thinkaurelius.faunus.mapreduce.FaunusCompiler; +import com.thinkaurelius.titan.diskstorage.Backend; +import com.thinkaurelius.titan.diskstorage.hbase.HBaseKeyColumnValueStore; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.mapreduce.TableRecordReader; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.List; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class TitanAccumuloInputFormat extends TitanInputFormat { + + public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME = "faunus.graph.input.titan.storage.tablename"; + static final byte[] EDGE_STORE_FAMILY = Bytes.toBytes(Backend.EDGESTORE_NAME); + + private final TableInputFormat tableInputFormat = new TableInputFormat(); + private FaunusTitanHBaseGraph graph; + private VertexQueryFilter vertexQuery; + private boolean pathEnabled; + + @Override + public List getSplits(final JobContext jobContext) throws IOException, InterruptedException { + return this.tableInputFormat.getSplits(jobContext); + } + + @Override + public RecordReader createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + return new TitanHBaseRecordReader(this.graph, this.vertexQuery, this.pathEnabled, (TableRecordReader) this.tableInputFormat.createRecordReader(inputSplit, taskAttemptContext)); + } + + @Override + public void setConf(final Configuration config) { + this.graph = new FaunusTitanHBaseGraph(GraphFactory.generateTitanConfiguration(config, FAUNUS_GRAPH_INPUT_TITAN)); + this.vertexQuery = VertexQueryFilter.create(config); + this.pathEnabled = config.getBoolean(FaunusCompiler.PATH_ENABLED, false); + + //config.set(TableInputFormat.SCAN_COLUMN_FAMILY, Backend.EDGESTORE_NAME); + config.set(TableInputFormat.INPUT_TABLE, config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME)); + config.set(HConstants.ZOOKEEPER_QUORUM, config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_HOSTNAME)); + if (config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT, null) != null) + config.set(HConstants.ZOOKEEPER_CLIENT_PORT, config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT)); + config.set("storage.read-only", "true"); + config.set("autotype", "none"); + Scan scanner = new Scan(); + scanner.addFamily(Backend.EDGESTORE_NAME.getBytes()); + scanner.setFilter(getColumnFilter(this.vertexQuery)); + //TODO (minor): should we set other options in http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html for optimization? + Method converter; + try { + converter = TableMapReduceUtil.class.getDeclaredMethod("convertScanToString", Scan.class); + converter.setAccessible(true); + config.set(TableInputFormat.SCAN, (String) converter.invoke(null, scanner)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + this.tableInputFormat.setConf(config); + } + + private Filter getColumnFilter(VertexQueryFilter inputFilter) { + return HBaseKeyColumnValueStore.getFilter(TitanInputFormat.inputSlice(inputFilter, graph)); + } + + @Override + public Configuration getConf() { + return tableInputFormat.getConf(); + } +} diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java new file mode 100644 index 00000000..6f520f39 --- /dev/null +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java @@ -0,0 +1,10 @@ +package com.thinkaurelius.faunus.formats.titan.accumulo; + +import com.thinkaurelius.faunus.formats.titan.TitanOutputFormat; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class TitanAccumuloOutputFormat extends TitanOutputFormat { + +} \ No newline at end of file diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java new file mode 100644 index 00000000..6c61bd73 --- /dev/null +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java @@ -0,0 +1,72 @@ +package com.thinkaurelius.faunus.formats.titan.accumulo; + +import com.thinkaurelius.faunus.formats.titan.hbase.*; +import com.thinkaurelius.faunus.FaunusVertex; +import com.thinkaurelius.faunus.formats.VertexQueryFilter; +import org.apache.hadoop.hbase.mapreduce.TableRecordReader; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class TitanAccumuloRecordReader extends RecordReader { + + private TableRecordReader reader; + private FaunusTitanHBaseGraph graph; + private VertexQueryFilter vertexQuery; + private boolean pathEnabled; + + private FaunusVertex vertex; + + public TitanAccumuloRecordReader(final FaunusTitanHBaseGraph graph, final VertexQueryFilter vertexQuery, final boolean pathEnabled, final TableRecordReader reader) { + this.graph = graph; + this.vertexQuery = vertexQuery; + this.pathEnabled = pathEnabled; + this.reader = reader; + } + + @Override + public void initialize(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + this.reader.initialize(inputSplit, taskAttemptContext); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + while (this.reader.nextKeyValue()) { + final FaunusVertex temp = this.graph.readFaunusVertex(this.reader.getCurrentKey().copyBytes(), this.reader.getCurrentValue().getMap().get(TitanAccumuloInputFormat.EDGE_STORE_FAMILY)); + if (null != temp) { + if (this.pathEnabled) temp.enablePath(true); + this.vertex = temp; + this.vertexQuery.defaultFilter(this.vertex); + return true; + } + } + return false; + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public FaunusVertex getCurrentValue() throws IOException, InterruptedException { + return this.vertex; + } + + @Override + public void close() throws IOException { + this.graph.shutdown(); + this.reader.close(); + } + + @Override + public float getProgress() { + return this.reader.getProgress(); + } +} diff --git a/src/main/java/com/thinkaurelius/faunus/tinkerpop/gremlin/Imports.java b/src/main/java/com/thinkaurelius/faunus/tinkerpop/gremlin/Imports.java index bf0e284d..4cc21ceb 100644 --- a/src/main/java/com/thinkaurelius/faunus/tinkerpop/gremlin/Imports.java +++ b/src/main/java/com/thinkaurelius/faunus/tinkerpop/gremlin/Imports.java @@ -38,6 +38,7 @@ public class Imports { imports.add("com.thinkaurelius.faunus.formats.titan.*"); imports.add("com.thinkaurelius.faunus.formats.titan.hbase.*"); imports.add("com.thinkaurelius.faunus.formats.titan.cassandra.*"); + imports.add("com.thinkaurelius.faunus.formats.titan.accumulo.*"); imports.add("com.thinkaurelius.faunus.hdfs.*"); imports.add("com.thinkaurelius.faunus.tinkerpop.gremlin.*"); imports.add("com.tinkerpop.gremlin.Tokens.T"); diff --git a/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormatTest.java b/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormatTest.java new file mode 100644 index 00000000..e32c7d0a --- /dev/null +++ b/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormatTest.java @@ -0,0 +1,14 @@ +package com.thinkaurelius.faunus.formats.titan.accumulo; + +import com.thinkaurelius.faunus.tinkerpop.gremlin.Imports; +import junit.framework.TestCase; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class TitanAccumuloInputFormatTest extends TestCase { + + public void testInGremlinImports() { + assertTrue(Imports.getImports().contains(TitanAccumuloInputFormat.class.getPackage().getName() + ".*")); + } +} diff --git a/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormatTest.java b/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormatTest.java new file mode 100644 index 00000000..33f5074c --- /dev/null +++ b/src/test/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormatTest.java @@ -0,0 +1,15 @@ +package com.thinkaurelius.faunus.formats.titan.accumulo; + +import com.thinkaurelius.faunus.formats.titan.hbase.*; +import com.thinkaurelius.faunus.tinkerpop.gremlin.Imports; +import junit.framework.TestCase; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class TitanAccumuloOutputFormatTest extends TestCase { + + public void testInGremlinImports() { + assertTrue(Imports.getImports().contains(TitanAccumuloOutputFormat.class.getPackage().getName() + ".*")); + } +} From 973c4add4583bf049d4ffc90eb0787bf663dd9b3 Mon Sep 17 00:00:00 2001 From: Etienne Deprit Date: Fri, 14 Jun 2013 15:08:08 -0400 Subject: [PATCH 02/11] Restore erroneously modified HBase formats. --- .../accumulo/FaunusTitanAccumuloGraph.java | 17 ++++++----------- .../accumulo/TitanAccumuloInputFormat.java | 7 +++---- .../accumulo/TitanAccumuloRecordReader.java | 10 +++++----- 3 files changed, 14 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java index f5c86085..1a5220cf 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java @@ -1,6 +1,5 @@ package com.thinkaurelius.faunus.formats.titan.accumulo; -import com.thinkaurelius.faunus.formats.titan.hbase.*; import com.google.common.base.Preconditions; import com.thinkaurelius.faunus.FaunusVertex; import com.thinkaurelius.faunus.formats.titan.FaunusTitanGraph; @@ -19,7 +18,6 @@ /** * (c) Matthias Broecheler (me@matthiasb.com) */ - public class FaunusTitanAccumuloGraph extends FaunusTitanGraph { public FaunusTitanAccumuloGraph(final String configFile) throws ConfigurationException { @@ -31,30 +29,29 @@ public FaunusTitanAccumuloGraph(final Configuration configuration) { } public FaunusVertex readFaunusVertex(byte[] key, final NavigableMap> rowMap) { - return super.readFaunusVertex(ByteBuffer.wrap(key), new HBaseMapIterable(rowMap)); + return super.readFaunusVertex(ByteBuffer.wrap(key), new AccumuloMapIterable(rowMap)); } - private static class HBaseMapIterable implements Iterable { + private static class AccumuloMapIterable implements Iterable { private final NavigableMap> columnValues; - public HBaseMapIterable(final NavigableMap> columnValues) { + public AccumuloMapIterable(final NavigableMap> columnValues) { Preconditions.checkNotNull(columnValues); this.columnValues = columnValues; } @Override public Iterator iterator() { - return new HBaseMapIterator(columnValues.entrySet().iterator()); + return new AccumuloMapIterator(columnValues.entrySet().iterator()); } - } - private static class HBaseMapIterator implements Iterator { + private static class AccumuloMapIterator implements Iterator { private final Iterator>> iterator; - public HBaseMapIterator(final Iterator>> iterator) { + public AccumuloMapIterator(final Iterator>> iterator) { this.iterator = iterator; } @@ -74,6 +71,4 @@ public void remove() { throw new UnsupportedOperationException(); } } - } - diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java index c3dbb45a..e222d271 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java @@ -1,6 +1,5 @@ package com.thinkaurelius.faunus.formats.titan.accumulo; -import com.thinkaurelius.faunus.formats.titan.hbase.*; import com.thinkaurelius.faunus.FaunusVertex; import com.thinkaurelius.faunus.formats.VertexQueryFilter; import com.thinkaurelius.faunus.formats.titan.GraphFactory; @@ -35,7 +34,7 @@ public class TitanAccumuloInputFormat extends TitanInputFormat { static final byte[] EDGE_STORE_FAMILY = Bytes.toBytes(Backend.EDGESTORE_NAME); private final TableInputFormat tableInputFormat = new TableInputFormat(); - private FaunusTitanHBaseGraph graph; + private FaunusTitanAccumuloGraph graph; private VertexQueryFilter vertexQuery; private boolean pathEnabled; @@ -46,12 +45,12 @@ public List getSplits(final JobContext jobContext) throws IOExceptio @Override public RecordReader createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - return new TitanHBaseRecordReader(this.graph, this.vertexQuery, this.pathEnabled, (TableRecordReader) this.tableInputFormat.createRecordReader(inputSplit, taskAttemptContext)); + return new TitanAccumuloRecordReader(this.graph, this.vertexQuery, this.pathEnabled, (TableRecordReader) this.tableInputFormat.createRecordReader(inputSplit, taskAttemptContext)); } @Override public void setConf(final Configuration config) { - this.graph = new FaunusTitanHBaseGraph(GraphFactory.generateTitanConfiguration(config, FAUNUS_GRAPH_INPUT_TITAN)); + this.graph = new FaunusTitanAccumuloGraph(GraphFactory.generateTitanConfiguration(config, FAUNUS_GRAPH_INPUT_TITAN)); this.vertexQuery = VertexQueryFilter.create(config); this.pathEnabled = config.getBoolean(FaunusCompiler.PATH_ENABLED, false); diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java index 6c61bd73..e26512a9 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java @@ -1,6 +1,5 @@ package com.thinkaurelius.faunus.formats.titan.accumulo; -import com.thinkaurelius.faunus.formats.titan.hbase.*; import com.thinkaurelius.faunus.FaunusVertex; import com.thinkaurelius.faunus.formats.VertexQueryFilter; import org.apache.hadoop.hbase.mapreduce.TableRecordReader; @@ -17,13 +16,12 @@ public class TitanAccumuloRecordReader extends RecordReader { private TableRecordReader reader; - private FaunusTitanHBaseGraph graph; + private FaunusTitanAccumuloGraph graph; private VertexQueryFilter vertexQuery; private boolean pathEnabled; - private FaunusVertex vertex; - public TitanAccumuloRecordReader(final FaunusTitanHBaseGraph graph, final VertexQueryFilter vertexQuery, final boolean pathEnabled, final TableRecordReader reader) { + public TitanAccumuloRecordReader(final FaunusTitanAccumuloGraph graph, final VertexQueryFilter vertexQuery, final boolean pathEnabled, final TableRecordReader reader) { this.graph = graph; this.vertexQuery = vertexQuery; this.pathEnabled = pathEnabled; @@ -40,7 +38,9 @@ public boolean nextKeyValue() throws IOException, InterruptedException { while (this.reader.nextKeyValue()) { final FaunusVertex temp = this.graph.readFaunusVertex(this.reader.getCurrentKey().copyBytes(), this.reader.getCurrentValue().getMap().get(TitanAccumuloInputFormat.EDGE_STORE_FAMILY)); if (null != temp) { - if (this.pathEnabled) temp.enablePath(true); + if (this.pathEnabled) { + temp.enablePath(true); + } this.vertex = temp; this.vertexQuery.defaultFilter(this.vertex); return true; From 9638b1f3bdf2039d1e66ccec1d3560a8676ac895 Mon Sep 17 00:00:00 2001 From: Etienne Deprit Date: Fri, 14 Jun 2013 18:29:05 -0400 Subject: [PATCH 03/11] TitanAccumuloInputFormat not compiling. --- .../accumulo/TitanAccumuloInputFormat.java | 71 +++++++++++++------ .../accumulo/TitanAccumuloRecordReader.java | 17 +++-- 2 files changed, 60 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java index e222d271..343ca90a 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java @@ -4,16 +4,15 @@ import com.thinkaurelius.faunus.formats.VertexQueryFilter; import com.thinkaurelius.faunus.formats.titan.GraphFactory; import com.thinkaurelius.faunus.formats.titan.TitanInputFormat; +import static com.thinkaurelius.faunus.formats.titan.TitanInputFormat.FAUNUS_GRAPH_INPUT_TITAN_STORAGE_HOSTNAME; +import static com.thinkaurelius.faunus.formats.titan.TitanInputFormat.FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT; import com.thinkaurelius.faunus.mapreduce.FaunusCompiler; import com.thinkaurelius.titan.diskstorage.Backend; import com.thinkaurelius.titan.diskstorage.hbase.HBaseKeyColumnValueStore; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -import org.apache.hadoop.hbase.mapreduce.TableRecordReader; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; @@ -24,48 +23,80 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.List; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.io.Text; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ public class TitanAccumuloInputFormat extends TitanInputFormat { - public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME = "faunus.graph.input.titan.storage.tablename"; + public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME = + "faunus.graph.input.titan.storage.tablename"; + public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_INSTANCE = + "faunus.graph.input.titan.storage.accumulo-config.instance"; + public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_USERNAME = + "faunus.graph.input.titan.storage.accumulo-config.username"; + public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD = + "faunus.graph.input.titan.storage.accumulo-config.password"; static final byte[] EDGE_STORE_FAMILY = Bytes.toBytes(Backend.EDGESTORE_NAME); - - private final TableInputFormat tableInputFormat = new TableInputFormat(); + private final AccumuloInputFormat accumuloInputFormat = new AccumuloInputFormat(); + private Configuration configuration; private FaunusTitanAccumuloGraph graph; private VertexQueryFilter vertexQuery; private boolean pathEnabled; @Override public List getSplits(final JobContext jobContext) throws IOException, InterruptedException { - return this.tableInputFormat.getSplits(jobContext); + return this.accumuloInputFormat.getSplits(jobContext); } @Override public RecordReader createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - return new TitanAccumuloRecordReader(this.graph, this.vertexQuery, this.pathEnabled, (TableRecordReader) this.tableInputFormat.createRecordReader(inputSplit, taskAttemptContext)); + return new TitanAccumuloRecordReader(graph, vertexQuery, pathEnabled, accumuloInputFormat.createRecordReader(inputSplit, taskAttemptContext)); } @Override public void setConf(final Configuration config) { - this.graph = new FaunusTitanAccumuloGraph(GraphFactory.generateTitanConfiguration(config, FAUNUS_GRAPH_INPUT_TITAN)); - this.vertexQuery = VertexQueryFilter.create(config); - this.pathEnabled = config.getBoolean(FaunusCompiler.PATH_ENABLED, false); + configuration = config; + graph = new FaunusTitanAccumuloGraph(GraphFactory.generateTitanConfiguration(config, FAUNUS_GRAPH_INPUT_TITAN)); + vertexQuery = VertexQueryFilter.create(config); + pathEnabled = config.getBoolean(FaunusCompiler.PATH_ENABLED, false); //config.set(TableInputFormat.SCAN_COLUMN_FAMILY, Backend.EDGESTORE_NAME); - config.set(TableInputFormat.INPUT_TABLE, config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME)); - config.set(HConstants.ZOOKEEPER_QUORUM, config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_HOSTNAME)); - if (config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT, null) != null) - config.set(HConstants.ZOOKEEPER_CLIENT_PORT, config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT)); + String tableName = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME); + config.set(TableInputFormat.INPUT_TABLE, tableName); + + String instance = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_INSTANCE); + + String zookeepers = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_HOSTNAME); + if (config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT, null) != null) { + zookeepers = StringUtils.join(zookeepers.split(","), + ":" + config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT)); + + } + + AccumuloInputFormat.setZooKeeperInstance(config, instance, zookeepers); + + String username = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_USERNAME); + String password = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD); + + AccumuloInputFormat.setInputInfo(config, username, password.getBytes(), tableName, new Authorizations()); + config.set("storage.read-only", "true"); config.set("autotype", "none"); - Scan scanner = new Scan(); - scanner.addFamily(Backend.EDGESTORE_NAME.getBytes()); + + Scanner scanner = new Scanner(); + + scanner.fetchColumnFamily(new Text(Backend.EDGESTORE_NAME.getBytes())); scanner.setFilter(getColumnFilter(this.vertexQuery)); //TODO (minor): should we set other options in http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html for optimization? Method converter; + + try { converter = TableMapReduceUtil.class.getDeclaredMethod("convertScanToString", Scan.class); converter.setAccessible(true); @@ -73,8 +104,6 @@ public void setConf(final Configuration config) { } catch (Exception e) { throw new RuntimeException(e); } - - this.tableInputFormat.setConf(config); } private Filter getColumnFilter(VertexQueryFilter inputFilter) { @@ -83,6 +112,6 @@ private Filter getColumnFilter(VertexQueryFilter inputFilter) { @Override public Configuration getConf() { - return tableInputFormat.getConf(); + return configuration; } -} +} \ No newline at end of file diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java index e26512a9..b1f0bffa 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java @@ -2,26 +2,29 @@ import com.thinkaurelius.faunus.FaunusVertex; import com.thinkaurelius.faunus.formats.VertexQueryFilter; -import org.apache.hadoop.hbase.mapreduce.TableRecordReader; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ public class TitanAccumuloRecordReader extends RecordReader { - private TableRecordReader reader; + private RecordReader reader; private FaunusTitanAccumuloGraph graph; private VertexQueryFilter vertexQuery; private boolean pathEnabled; private FaunusVertex vertex; - public TitanAccumuloRecordReader(final FaunusTitanAccumuloGraph graph, final VertexQueryFilter vertexQuery, final boolean pathEnabled, final TableRecordReader reader) { + public TitanAccumuloRecordReader(final FaunusTitanAccumuloGraph graph, + final VertexQueryFilter vertexQuery, final boolean pathEnabled, + final RecordReader reader) { this.graph = graph; this.vertexQuery = vertexQuery; this.pathEnabled = pathEnabled; @@ -30,13 +33,13 @@ public TitanAccumuloRecordReader(final FaunusTitanAccumuloGraph graph, final Ver @Override public void initialize(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - this.reader.initialize(inputSplit, taskAttemptContext); + reader.initialize(inputSplit, taskAttemptContext); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { - while (this.reader.nextKeyValue()) { - final FaunusVertex temp = this.graph.readFaunusVertex(this.reader.getCurrentKey().copyBytes(), this.reader.getCurrentValue().getMap().get(TitanAccumuloInputFormat.EDGE_STORE_FAMILY)); + while (reader.nextKeyValue()) { + final FaunusVertex temp = graph.readFaunusVertex(reader.getCurrentKey(), reader.getCurrentValue().getMap().get(TitanAccumuloInputFormat.EDGE_STORE_FAMILY)); if (null != temp) { if (this.pathEnabled) { temp.enablePath(true); @@ -66,7 +69,7 @@ public void close() throws IOException { } @Override - public float getProgress() { + public float getProgress() throws IOException, InterruptedException { return this.reader.getProgress(); } } From 641b7321dd23962bf55b21190b9f2d79b33e96da Mon Sep 17 00:00:00 2001 From: Etienne Deprit Date: Tue, 18 Jun 2013 10:01:04 -0400 Subject: [PATCH 04/11] Added column family range iterator to accumulo input format. --- .../accumulo/FaunusTitanAccumuloGraph.java | 28 ++-- .../accumulo/TitanAccumuloInputFormat.java | 80 +++++---- .../accumulo/TitanAccumuloRecordReader.java | 13 +- .../iterators/user/ColumnRangeFilter.java | 139 ++++++++++++++++ .../iterators/DefaultIteratorEnvironment.java | 69 ++++++++ .../iterators/user/ColumnRangeFilterTest.java | 157 ++++++++++++++++++ 6 files changed, 434 insertions(+), 52 deletions(-) create mode 100644 src/main/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilter.java create mode 100644 src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java create mode 100644 src/test/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilterTest.java diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java index 1a5220cf..05264ab4 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java @@ -9,11 +9,12 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; - import java.nio.ByteBuffer; import java.util.Iterator; import java.util.Map; -import java.util.NavigableMap; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.PeekingIterator; /** * (c) Matthias Broecheler (me@matthiasb.com) @@ -28,30 +29,30 @@ public FaunusTitanAccumuloGraph(final Configuration configuration) { super(configuration); } - public FaunusVertex readFaunusVertex(byte[] key, final NavigableMap> rowMap) { - return super.readFaunusVertex(ByteBuffer.wrap(key), new AccumuloMapIterable(rowMap)); + public FaunusVertex readFaunusVertex(byte[] key, final PeekingIterator> columnIterator) { + return super.readFaunusVertex(ByteBuffer.wrap(key), new AccumuloMapIterable(columnIterator)); } private static class AccumuloMapIterable implements Iterable { - private final NavigableMap> columnValues; + private final PeekingIterator> columnIterator; - public AccumuloMapIterable(final NavigableMap> columnValues) { - Preconditions.checkNotNull(columnValues); - this.columnValues = columnValues; + public AccumuloMapIterable(final PeekingIterator> columnIterator) { + Preconditions.checkNotNull(columnIterator); + this.columnIterator = columnIterator; } @Override public Iterator iterator() { - return new AccumuloMapIterator(columnValues.entrySet().iterator()); + return new AccumuloMapIterator(columnIterator); } } private static class AccumuloMapIterator implements Iterator { - private final Iterator>> iterator; + private final Iterator> iterator; - public AccumuloMapIterator(final Iterator>> iterator) { + public AccumuloMapIterator(final Iterator> iterator) { this.iterator = iterator; } @@ -62,8 +63,9 @@ public boolean hasNext() { @Override public Entry next() { - final Map.Entry> entry = iterator.next(); - return new StaticBufferEntry(new StaticByteBuffer(entry.getKey()), new StaticByteBuffer(entry.getValue().lastEntry().getValue())); + final Map.Entry entry = iterator.next(); + return new StaticBufferEntry(new StaticByteBuffer(entry.getKey().getColumnQualifier().getBytes()), + new StaticByteBuffer(entry.getValue().get())); } @Override diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java index 343ca90a..c51b4bdb 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java @@ -8,32 +8,33 @@ import static com.thinkaurelius.faunus.formats.titan.TitanInputFormat.FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT; import com.thinkaurelius.faunus.mapreduce.FaunusCompiler; import com.thinkaurelius.titan.diskstorage.Backend; -import com.thinkaurelius.titan.diskstorage.hbase.HBaseKeyColumnValueStore; +import com.thinkaurelius.titan.diskstorage.StaticBuffer; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.SliceQuery; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.mapreduce.TableInputFormat; -import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; - import java.io.IOException; -import java.lang.reflect.Method; +import java.util.Collection; +import java.util.Collections; import java.util.List; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat; +import org.apache.accumulo.core.iterators.user.ColumnRangeFilter; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Pair; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ public class TitanAccumuloInputFormat extends TitanInputFormat { + protected final Logger logger = Logger.getLogger(FaunusCompiler.class); public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME = "faunus.graph.input.titan.storage.tablename"; public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_INSTANCE = @@ -42,8 +43,7 @@ public class TitanAccumuloInputFormat extends TitanInputFormat { "faunus.graph.input.titan.storage.accumulo-config.username"; public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD = "faunus.graph.input.titan.storage.accumulo-config.password"; - static final byte[] EDGE_STORE_FAMILY = Bytes.toBytes(Backend.EDGESTORE_NAME); - private final AccumuloInputFormat accumuloInputFormat = new AccumuloInputFormat(); + private final AccumuloRowInputFormat accumuloRowInputFormat = new AccumuloRowInputFormat(); private Configuration configuration; private FaunusTitanAccumuloGraph graph; private VertexQueryFilter vertexQuery; @@ -51,12 +51,12 @@ public class TitanAccumuloInputFormat extends TitanInputFormat { @Override public List getSplits(final JobContext jobContext) throws IOException, InterruptedException { - return this.accumuloInputFormat.getSplits(jobContext); + return accumuloRowInputFormat.getSplits(jobContext); } @Override public RecordReader createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - return new TitanAccumuloRecordReader(graph, vertexQuery, pathEnabled, accumuloInputFormat.createRecordReader(inputSplit, taskAttemptContext)); + return new TitanAccumuloRecordReader(graph, vertexQuery, pathEnabled, accumuloRowInputFormat.createRecordReader(inputSplit, taskAttemptContext)); } @Override @@ -66,48 +66,60 @@ public void setConf(final Configuration config) { vertexQuery = VertexQueryFilter.create(config); pathEnabled = config.getBoolean(FaunusCompiler.PATH_ENABLED, false); - //config.set(TableInputFormat.SCAN_COLUMN_FAMILY, Backend.EDGESTORE_NAME); - String tableName = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME); - config.set(TableInputFormat.INPUT_TABLE, tableName); - String instance = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_INSTANCE); String zookeepers = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_HOSTNAME); + if (config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT, null) != null) { zookeepers = StringUtils.join(zookeepers.split(","), ":" + config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT)); - } - AccumuloInputFormat.setZooKeeperInstance(config, instance, zookeepers); + try { + AccumuloRowInputFormat.setZooKeeperInstance(config, instance, zookeepers); + } catch (IllegalStateException ex) { + logger.info(ex.getMessage()); + } String username = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_USERNAME); String password = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD); - AccumuloInputFormat.setInputInfo(config, username, password.getBytes(), tableName, new Authorizations()); + String tableName = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME); + + try { + AccumuloRowInputFormat.setInputInfo(config, username, password.getBytes(), tableName, new Authorizations()); + } catch (IllegalStateException ex) { + logger.info(ex.getMessage()); + } config.set("storage.read-only", "true"); config.set("autotype", "none"); - Scanner scanner = new Scanner(); - - scanner.fetchColumnFamily(new Text(Backend.EDGESTORE_NAME.getBytes())); - scanner.setFilter(getColumnFilter(this.vertexQuery)); - //TODO (minor): should we set other options in http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html for optimization? - Method converter; + Collection> columnFamilyColumnQualifierPairs = + Collections.singletonList(new Pair(new Text(Backend.EDGESTORE_NAME), null)); + AccumuloRowInputFormat.fetchColumns(config, columnFamilyColumnQualifierPairs); - try { - converter = TableMapReduceUtil.class.getDeclaredMethod("convertScanToString", Scan.class); - converter.setAccessible(true); - config.set(TableInputFormat.SCAN, (String) converter.invoke(null, scanner)); - } catch (Exception e) { - throw new RuntimeException(e); + IteratorSetting is = getColumnFilter(vertexQuery); + if (is != null) { + AccumuloRowInputFormat.addIterator(config, getColumnFilter(vertexQuery)); + AccumuloRowInputFormat.setLocalIterators(config, true); } } - private Filter getColumnFilter(VertexQueryFilter inputFilter) { - return HBaseKeyColumnValueStore.getFilter(TitanInputFormat.inputSlice(inputFilter, graph)); + private IteratorSetting getColumnFilter(VertexQueryFilter inputFilter) { + IteratorSetting is = null; + + SliceQuery sliceQuery = TitanInputFormat.inputSlice(vertexQuery, graph); + byte[] minColumn = sliceQuery.getSliceStart().as(StaticBuffer.ARRAY_FACTORY); + byte[] maxColumn = sliceQuery.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY); + + if (minColumn.length > 0 || maxColumn.length > 0) { + is = new IteratorSetting(20, "colrange", ColumnRangeFilter.class); + ColumnRangeFilter.setRange(is, minColumn, true, maxColumn, false); + } + + return is; } @Override diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java index b1f0bffa..3027d0bd 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java @@ -8,15 +8,18 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; +import java.util.Map; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.PeekingIterator; +import org.apache.hadoop.io.Text; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ public class TitanAccumuloRecordReader extends RecordReader { - private RecordReader reader; + private RecordReader>> reader; private FaunusTitanAccumuloGraph graph; private VertexQueryFilter vertexQuery; private boolean pathEnabled; @@ -24,7 +27,7 @@ public class TitanAccumuloRecordReader extends RecordReader reader) { + final RecordReader>> reader) { this.graph = graph; this.vertexQuery = vertexQuery; this.pathEnabled = pathEnabled; @@ -39,13 +42,13 @@ public void initialize(final InputSplit inputSplit, final TaskAttemptContext tas @Override public boolean nextKeyValue() throws IOException, InterruptedException { while (reader.nextKeyValue()) { - final FaunusVertex temp = graph.readFaunusVertex(reader.getCurrentKey(), reader.getCurrentValue().getMap().get(TitanAccumuloInputFormat.EDGE_STORE_FAMILY)); + final FaunusVertex temp = graph.readFaunusVertex(reader.getCurrentKey().getBytes(), reader.getCurrentValue()); if (null != temp) { if (this.pathEnabled) { temp.enablePath(true); } - this.vertex = temp; - this.vertexQuery.defaultFilter(this.vertex); + vertex = temp; + vertexQuery.defaultFilter(this.vertex); return true; } } diff --git a/src/main/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilter.java b/src/main/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilter.java new file mode 100644 index 00000000..b2f2aaae --- /dev/null +++ b/src/main/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilter.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.iterators.user; + + +import java.io.IOException; +import java.util.Map; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filter; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; + +/** + * A Filter that matches entries based on Java regular expressions. + */ +public class ColumnRangeFilter extends Filter { + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + ColumnRangeFilter result = (ColumnRangeFilter) super.deepCopy(env); + result.minColumn = minColumn; + result.minColInclusive = minColInclusive; + result.maxColumn = maxColumn; + result.maxColInclusive = maxColInclusive; + return result; + } + public static final String MIN_COLUMN = "minColumn"; + public static final String MIN_COL_INCLUSIVE = "minColInclusive"; + public static final String MAX_COLUMN = "maxColumn"; + public static final String MAX_COL_INCLUSIVE = "maxColInclusive"; + private ByteSequence minColumn; + private boolean minColInclusive; + private ByteSequence maxColumn; + private boolean maxColInclusive; + + @Override + public boolean accept(Key key, Value value) { + int cmpMin = -1; + + if (minColumn != null) { + cmpMin = minColumn.compareTo(key.getColumnQualifierData()); + } + + if (cmpMin > 0 || (!minColInclusive && cmpMin == 0)) { + return false; + } + + if (maxColumn == null) { + return true; + } + + int cmpMax = maxColumn.compareTo(key.getColumnQualifierData()); + + if (cmpMax < 0 || (!maxColInclusive && cmpMax == 0)) { + return false; + } + + return true; + } + + @Override + public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { + super.init(source, options, env); + if (options.containsKey(MIN_COLUMN)) { + minColumn = new ArrayByteSequence(options.get(MIN_COLUMN)); + } else { + minColumn = null; + } + + if (options.containsKey(MIN_COL_INCLUSIVE)) { + minColInclusive = Boolean.parseBoolean(options.get(MIN_COL_INCLUSIVE)); + } else { + minColInclusive = true; + } + + if (options.containsKey(MAX_COLUMN)) { + maxColumn = new ArrayByteSequence(options.get(MAX_COLUMN)); + } else { + maxColumn = null; + } + + if (options.containsKey(MAX_COL_INCLUSIVE)) { + maxColInclusive = Boolean.parseBoolean(options.get(MAX_COL_INCLUSIVE)); + } else { + maxColInclusive = false; + } + } + + @Override + public IteratorOptions describeOptions() { + IteratorOptions io = super.describeOptions(); + io.setName("colrange"); + io.setDescription("The ColumnRangeFilter/Iterator allows you to filter for a range of column qualifiers"); + io.addNamedOption(MIN_COLUMN, "mininum column qualifier"); + io.addNamedOption(MIN_COL_INCLUSIVE, "minimum column inclusive"); + io.addNamedOption(MAX_COLUMN, "maximum column qualifier"); + io.addNamedOption(MAX_COL_INCLUSIVE, "maximum column inclusive"); + return io; + } + + public static void setRange(IteratorSetting is, String minColumn, boolean minColInclusive, + String maxColumn, boolean maxColInclusive) { + if (minColumn != null && minColumn.length() > 0) { + is.addOption(ColumnRangeFilter.MIN_COLUMN, minColumn); + } + if (!minColInclusive) { + is.addOption(ColumnRangeFilter.MIN_COL_INCLUSIVE, "false"); + } + if (maxColumn != null && maxColumn.length() > 0) { + is.addOption(ColumnRangeFilter.MAX_COLUMN, maxColumn); + } + if (maxColInclusive) { + is.addOption(ColumnRangeFilter.MAX_COL_INCLUSIVE, "true"); + } + } + + public static void setRange(IteratorSetting is, byte[] minColumn, boolean minColInclusive, + byte[] maxColumn, boolean maxColInclusive) { + setRange(is, new String(minColumn), minColInclusive, new String(maxColumn), maxColInclusive); + } +} diff --git a/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java b/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java new file mode 100644 index 00000000..ed5ce0b9 --- /dev/null +++ b/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.iterators; + +import java.io.IOException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.map.MyMapFile; +import org.apache.accumulo.core.file.map.MyMapFile.Reader; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + +@SuppressWarnings("deprecation") +public class DefaultIteratorEnvironment implements IteratorEnvironment { + + AccumuloConfiguration conf; + + public DefaultIteratorEnvironment(AccumuloConfiguration conf) { + this.conf = conf; + } + + public DefaultIteratorEnvironment() { + this.conf = AccumuloConfiguration.getDefaultConfiguration(); + } + + @Override + public Reader reserveMapFileReader(String mapFileName) throws IOException { + Configuration conf = CachedConfiguration.getInstance(); + FileSystem fs = FileSystem.get(conf); + return new MyMapFile.Reader(fs, mapFileName, conf); + } + + @Override + public AccumuloConfiguration getConfig() { + return conf; + } + + @Override + public IteratorScope getIteratorScope() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFullMajorCompaction() { + throw new UnsupportedOperationException(); + } + + @Override + public void registerSideChannel(SortedKeyValueIterator iter) { + throw new UnsupportedOperationException(); + } +} diff --git a/src/test/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilterTest.java b/src/test/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilterTest.java new file mode 100644 index 00000000..6783f450 --- /dev/null +++ b/src/test/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilterTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.iterators.user; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.TreeMap; + +import junit.framework.TestCase; +import static junit.framework.TestCase.assertFalse; +import static junit.framework.TestCase.assertTrue; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.DefaultIteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedMapIterator; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class ColumnRangeFilterTest extends TestCase { + + private static final Collection EMPTY_COL_FAMS = new ArrayList(); + + private Key nkv(TreeMap tm, String row, String cf, String cq, String val) { + Key k = nk(row, cf, cq); + tm.put(k, new Value(val.getBytes())); + return k; + } + + private Key nk(String row, String cf, String cq) { + return new Key(new Text(row), new Text(cf), new Text(cq)); + } + + @Test + public void test1() throws IOException { + TreeMap tm = new TreeMap(); + + Key k1 = nkv(tm, "row1", "cf1", "a", "x"); + Key k2 = nkv(tm, "row1", "cf1", "b", "y"); + Key k3 = nkv(tm, "row1", "cf2", "c", "z"); + + ColumnRangeFilter cri = new ColumnRangeFilter(); + cri.describeOptions(); + + IteratorSetting is = new IteratorSetting(1, ColumnRangeFilter.class); + ColumnRangeFilter.setRange(is, (String) null, false, (String) null, false); + assertTrue(cri.validateOptions(is.getOptions())); + cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); + cri.deepCopy(new DefaultIteratorEnvironment()); + cri.seek(new Range(), EMPTY_COL_FAMS, false); + + assertTrue(cri.hasTop()); + assertTrue(cri.getTopKey().equals(k1)); + cri.next(); + cri.next(); + assertTrue(cri.hasTop()); + + // ----------------------------------------------------- + is.clearOptions(); + ColumnRangeFilter.setRange(is, "c", false, null, false); + assertTrue(cri.validateOptions(is.getOptions())); + cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); + cri.seek(new Range(), EMPTY_COL_FAMS, false); + + assertFalse(cri.hasTop()); + + // ----------------------------------------------------- + is.clearOptions(); + ColumnRangeFilter.setRange(is, "b", false, null, false); + assertTrue(cri.validateOptions(is.getOptions())); + cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); + cri.seek(new Range(), EMPTY_COL_FAMS, false); + + assertTrue(cri.hasTop()); + assertTrue(cri.getTopKey().equals(k3)); + cri.next(); + assertFalse(cri.hasTop()); + + // ----------------------------------------------------- + is.clearOptions(); + ColumnRangeFilter.setRange(is, "b", true, null, false); + assertTrue(cri.validateOptions(is.getOptions())); + cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); + cri.seek(new Range(), EMPTY_COL_FAMS, false); + + assertTrue(cri.hasTop()); + assertTrue(cri.getTopKey().equals(k2)); + cri.next(); + assertTrue(cri.hasTop()); + + // ----------------------------------------------------- + is.clearOptions(); + ColumnRangeFilter.setRange(is, null, false, "b", false); + assertTrue(cri.validateOptions(is.getOptions())); + cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); + cri.seek(new Range(), EMPTY_COL_FAMS, false); + + assertTrue(cri.hasTop()); + assertTrue(cri.getTopKey().equals(k1)); + cri.next(); + assertFalse(cri.hasTop()); + + // ----------------------------------------------------- + is.clearOptions(); + ColumnRangeFilter.setRange(is, null, false, "b", true); + assertTrue(cri.validateOptions(is.getOptions())); + cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); + cri.seek(new Range(), EMPTY_COL_FAMS, false); + + assertTrue(cri.hasTop()); + assertTrue(cri.getTopKey().equals(k1)); + cri.next(); + assertTrue(cri.hasTop()); + + // ----------------------------------------------------- + is.clearOptions(); + ColumnRangeFilter.setRange(is, "b", true, "c", false); + assertTrue(cri.validateOptions(is.getOptions())); + cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); + cri.seek(new Range(), EMPTY_COL_FAMS, false); + + assertTrue(cri.hasTop()); + assertTrue(cri.getTopKey().equals(k2)); + cri.next(); + assertFalse(cri.hasTop()); + + // ----------------------------------------------------- + is.clearOptions(); + ColumnRangeFilter.setRange(is, "b", true, "c", true); + assertTrue(cri.validateOptions(is.getOptions())); + cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); + cri.seek(new Range(), EMPTY_COL_FAMS, false); + + assertTrue(cri.hasTop()); + assertTrue(cri.getTopKey().equals(k2)); + cri.next(); + assertTrue(cri.hasTop()); + } +} \ No newline at end of file From ab4610dfb7067f54bd3dbdf2d9b254bc123e1801 Mon Sep 17 00:00:00 2001 From: Etienne Deprit Date: Tue, 18 Jun 2013 14:11:56 -0400 Subject: [PATCH 05/11] Remove read-only storage configuration. --- .../formats/titan/accumulo/TitanAccumuloInputFormat.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java index c51b4bdb..6f5160cc 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java @@ -92,8 +92,7 @@ public void setConf(final Configuration config) { logger.info(ex.getMessage()); } - config.set("storage.read-only", "true"); - config.set("autotype", "none"); + // config.set("autotype", "none"); Collection> columnFamilyColumnQualifierPairs = Collections.singletonList(new Pair(new Text(Backend.EDGESTORE_NAME), null)); From e73de9b90a96e4a9096ec9b8b87e1da6336cdd2d Mon Sep 17 00:00:00 2001 From: Etienne Deprit Date: Sun, 15 Sep 2013 13:52:31 -0400 Subject: [PATCH 06/11] Remove redundant getColumnFilter call in TitanAccumuloInputFormat. --- .../faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java index 6f5160cc..c1b1ef04 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java @@ -101,7 +101,7 @@ public void setConf(final Configuration config) { IteratorSetting is = getColumnFilter(vertexQuery); if (is != null) { - AccumuloRowInputFormat.addIterator(config, getColumnFilter(vertexQuery)); + AccumuloRowInputFormat.addIterator(config, is); AccumuloRowInputFormat.setLocalIterators(config, true); } } From f718655fe5680e8b09e6a8a046a9fb7034d5a0ca Mon Sep 17 00:00:00 2001 From: Etienne Deprit Date: Thu, 26 Sep 2013 11:52:58 -0400 Subject: [PATCH 07/11] First version 0.4.0 support for Titan on Accumulo. --- pom.xml | 2 +- .../accumulo/TitanAccumuloInputFormat.java | 2 +- .../iterators/user/ColumnRangeFilter.java | 139 ---------------- .../iterators/DefaultIteratorEnvironment.java | 69 -------- .../iterators/user/ColumnRangeFilterTest.java | 157 ------------------ 5 files changed, 2 insertions(+), 367 deletions(-) delete mode 100644 src/main/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilter.java delete mode 100644 src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java delete mode 100644 src/test/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilterTest.java diff --git a/pom.xml b/pom.xml index bb9a6267..f3d55749 100644 --- a/pom.xml +++ b/pom.xml @@ -97,7 +97,7 @@ com.thinkaurelius.titan - titan-accumulo + titan-accumulo-core ${titan.version} diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java index c1b1ef04..519f64e0 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java @@ -9,6 +9,7 @@ import com.thinkaurelius.faunus.mapreduce.FaunusCompiler; import com.thinkaurelius.titan.diskstorage.Backend; import com.thinkaurelius.titan.diskstorage.StaticBuffer; +import com.thinkaurelius.titan.diskstorage.accumulo.iterators.ColumnRangeFilter; import com.thinkaurelius.titan.diskstorage.keycolumnvalue.SliceQuery; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; @@ -22,7 +23,6 @@ import java.util.List; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat; -import org.apache.accumulo.core.iterators.user.ColumnRangeFilter; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Pair; import org.apache.commons.lang.StringUtils; diff --git a/src/main/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilter.java b/src/main/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilter.java deleted file mode 100644 index b2f2aaae..00000000 --- a/src/main/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilter.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.iterators.user; - - -import java.io.IOException; -import java.util.Map; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.ArrayByteSequence; -import org.apache.accumulo.core.data.ByteSequence; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.Filter; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; - -/** - * A Filter that matches entries based on Java regular expressions. - */ -public class ColumnRangeFilter extends Filter { - - @Override - public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { - ColumnRangeFilter result = (ColumnRangeFilter) super.deepCopy(env); - result.minColumn = minColumn; - result.minColInclusive = minColInclusive; - result.maxColumn = maxColumn; - result.maxColInclusive = maxColInclusive; - return result; - } - public static final String MIN_COLUMN = "minColumn"; - public static final String MIN_COL_INCLUSIVE = "minColInclusive"; - public static final String MAX_COLUMN = "maxColumn"; - public static final String MAX_COL_INCLUSIVE = "maxColInclusive"; - private ByteSequence minColumn; - private boolean minColInclusive; - private ByteSequence maxColumn; - private boolean maxColInclusive; - - @Override - public boolean accept(Key key, Value value) { - int cmpMin = -1; - - if (minColumn != null) { - cmpMin = minColumn.compareTo(key.getColumnQualifierData()); - } - - if (cmpMin > 0 || (!minColInclusive && cmpMin == 0)) { - return false; - } - - if (maxColumn == null) { - return true; - } - - int cmpMax = maxColumn.compareTo(key.getColumnQualifierData()); - - if (cmpMax < 0 || (!maxColInclusive && cmpMax == 0)) { - return false; - } - - return true; - } - - @Override - public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { - super.init(source, options, env); - if (options.containsKey(MIN_COLUMN)) { - minColumn = new ArrayByteSequence(options.get(MIN_COLUMN)); - } else { - minColumn = null; - } - - if (options.containsKey(MIN_COL_INCLUSIVE)) { - minColInclusive = Boolean.parseBoolean(options.get(MIN_COL_INCLUSIVE)); - } else { - minColInclusive = true; - } - - if (options.containsKey(MAX_COLUMN)) { - maxColumn = new ArrayByteSequence(options.get(MAX_COLUMN)); - } else { - maxColumn = null; - } - - if (options.containsKey(MAX_COL_INCLUSIVE)) { - maxColInclusive = Boolean.parseBoolean(options.get(MAX_COL_INCLUSIVE)); - } else { - maxColInclusive = false; - } - } - - @Override - public IteratorOptions describeOptions() { - IteratorOptions io = super.describeOptions(); - io.setName("colrange"); - io.setDescription("The ColumnRangeFilter/Iterator allows you to filter for a range of column qualifiers"); - io.addNamedOption(MIN_COLUMN, "mininum column qualifier"); - io.addNamedOption(MIN_COL_INCLUSIVE, "minimum column inclusive"); - io.addNamedOption(MAX_COLUMN, "maximum column qualifier"); - io.addNamedOption(MAX_COL_INCLUSIVE, "maximum column inclusive"); - return io; - } - - public static void setRange(IteratorSetting is, String minColumn, boolean minColInclusive, - String maxColumn, boolean maxColInclusive) { - if (minColumn != null && minColumn.length() > 0) { - is.addOption(ColumnRangeFilter.MIN_COLUMN, minColumn); - } - if (!minColInclusive) { - is.addOption(ColumnRangeFilter.MIN_COL_INCLUSIVE, "false"); - } - if (maxColumn != null && maxColumn.length() > 0) { - is.addOption(ColumnRangeFilter.MAX_COLUMN, maxColumn); - } - if (maxColInclusive) { - is.addOption(ColumnRangeFilter.MAX_COL_INCLUSIVE, "true"); - } - } - - public static void setRange(IteratorSetting is, byte[] minColumn, boolean minColInclusive, - byte[] maxColumn, boolean maxColInclusive) { - setRange(is, new String(minColumn), minColInclusive, new String(maxColumn), maxColInclusive); - } -} diff --git a/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java b/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java deleted file mode 100644 index ed5ce0b9..00000000 --- a/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.iterators; - -import java.io.IOException; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.map.MyMapFile; -import org.apache.accumulo.core.file.map.MyMapFile.Reader; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; - -@SuppressWarnings("deprecation") -public class DefaultIteratorEnvironment implements IteratorEnvironment { - - AccumuloConfiguration conf; - - public DefaultIteratorEnvironment(AccumuloConfiguration conf) { - this.conf = conf; - } - - public DefaultIteratorEnvironment() { - this.conf = AccumuloConfiguration.getDefaultConfiguration(); - } - - @Override - public Reader reserveMapFileReader(String mapFileName) throws IOException { - Configuration conf = CachedConfiguration.getInstance(); - FileSystem fs = FileSystem.get(conf); - return new MyMapFile.Reader(fs, mapFileName, conf); - } - - @Override - public AccumuloConfiguration getConfig() { - return conf; - } - - @Override - public IteratorScope getIteratorScope() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isFullMajorCompaction() { - throw new UnsupportedOperationException(); - } - - @Override - public void registerSideChannel(SortedKeyValueIterator iter) { - throw new UnsupportedOperationException(); - } -} diff --git a/src/test/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilterTest.java b/src/test/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilterTest.java deleted file mode 100644 index 6783f450..00000000 --- a/src/test/java/org/apache/accumulo/core/iterators/user/ColumnRangeFilterTest.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.iterators.user; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.TreeMap; - -import junit.framework.TestCase; -import static junit.framework.TestCase.assertFalse; -import static junit.framework.TestCase.assertTrue; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.ByteSequence; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.DefaultIteratorEnvironment; -import org.apache.accumulo.core.iterators.SortedMapIterator; -import org.apache.hadoop.io.Text; -import org.junit.Test; - -public class ColumnRangeFilterTest extends TestCase { - - private static final Collection EMPTY_COL_FAMS = new ArrayList(); - - private Key nkv(TreeMap tm, String row, String cf, String cq, String val) { - Key k = nk(row, cf, cq); - tm.put(k, new Value(val.getBytes())); - return k; - } - - private Key nk(String row, String cf, String cq) { - return new Key(new Text(row), new Text(cf), new Text(cq)); - } - - @Test - public void test1() throws IOException { - TreeMap tm = new TreeMap(); - - Key k1 = nkv(tm, "row1", "cf1", "a", "x"); - Key k2 = nkv(tm, "row1", "cf1", "b", "y"); - Key k3 = nkv(tm, "row1", "cf2", "c", "z"); - - ColumnRangeFilter cri = new ColumnRangeFilter(); - cri.describeOptions(); - - IteratorSetting is = new IteratorSetting(1, ColumnRangeFilter.class); - ColumnRangeFilter.setRange(is, (String) null, false, (String) null, false); - assertTrue(cri.validateOptions(is.getOptions())); - cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); - cri.deepCopy(new DefaultIteratorEnvironment()); - cri.seek(new Range(), EMPTY_COL_FAMS, false); - - assertTrue(cri.hasTop()); - assertTrue(cri.getTopKey().equals(k1)); - cri.next(); - cri.next(); - assertTrue(cri.hasTop()); - - // ----------------------------------------------------- - is.clearOptions(); - ColumnRangeFilter.setRange(is, "c", false, null, false); - assertTrue(cri.validateOptions(is.getOptions())); - cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); - cri.seek(new Range(), EMPTY_COL_FAMS, false); - - assertFalse(cri.hasTop()); - - // ----------------------------------------------------- - is.clearOptions(); - ColumnRangeFilter.setRange(is, "b", false, null, false); - assertTrue(cri.validateOptions(is.getOptions())); - cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); - cri.seek(new Range(), EMPTY_COL_FAMS, false); - - assertTrue(cri.hasTop()); - assertTrue(cri.getTopKey().equals(k3)); - cri.next(); - assertFalse(cri.hasTop()); - - // ----------------------------------------------------- - is.clearOptions(); - ColumnRangeFilter.setRange(is, "b", true, null, false); - assertTrue(cri.validateOptions(is.getOptions())); - cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); - cri.seek(new Range(), EMPTY_COL_FAMS, false); - - assertTrue(cri.hasTop()); - assertTrue(cri.getTopKey().equals(k2)); - cri.next(); - assertTrue(cri.hasTop()); - - // ----------------------------------------------------- - is.clearOptions(); - ColumnRangeFilter.setRange(is, null, false, "b", false); - assertTrue(cri.validateOptions(is.getOptions())); - cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); - cri.seek(new Range(), EMPTY_COL_FAMS, false); - - assertTrue(cri.hasTop()); - assertTrue(cri.getTopKey().equals(k1)); - cri.next(); - assertFalse(cri.hasTop()); - - // ----------------------------------------------------- - is.clearOptions(); - ColumnRangeFilter.setRange(is, null, false, "b", true); - assertTrue(cri.validateOptions(is.getOptions())); - cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); - cri.seek(new Range(), EMPTY_COL_FAMS, false); - - assertTrue(cri.hasTop()); - assertTrue(cri.getTopKey().equals(k1)); - cri.next(); - assertTrue(cri.hasTop()); - - // ----------------------------------------------------- - is.clearOptions(); - ColumnRangeFilter.setRange(is, "b", true, "c", false); - assertTrue(cri.validateOptions(is.getOptions())); - cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); - cri.seek(new Range(), EMPTY_COL_FAMS, false); - - assertTrue(cri.hasTop()); - assertTrue(cri.getTopKey().equals(k2)); - cri.next(); - assertFalse(cri.hasTop()); - - // ----------------------------------------------------- - is.clearOptions(); - ColumnRangeFilter.setRange(is, "b", true, "c", true); - assertTrue(cri.validateOptions(is.getOptions())); - cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment()); - cri.seek(new Range(), EMPTY_COL_FAMS, false); - - assertTrue(cri.hasTop()); - assertTrue(cri.getTopKey().equals(k2)); - cri.next(); - assertTrue(cri.hasTop()); - } -} \ No newline at end of file From 1ee9d9b48a0094386ec13a66f85db42f32a510c2 Mon Sep 17 00:00:00 2001 From: Etienne Deprit Date: Thu, 26 Sep 2013 16:26:00 -0400 Subject: [PATCH 08/11] Moved ColumnRangeFilter iterator to titan-accumulo-iterators, general code cleanup. --- .../accumulo/FaunusTitanAccumuloGraph.java | 15 +++-- .../accumulo/TitanAccumuloInputFormat.java | 55 +++++++------------ .../accumulo/TitanAccumuloOutputFormat.java | 2 +- .../accumulo/TitanAccumuloRecordReader.java | 10 ++-- 4 files changed, 33 insertions(+), 49 deletions(-) diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java index 05264ab4..fa760e86 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/FaunusTitanAccumuloGraph.java @@ -6,18 +6,17 @@ import com.thinkaurelius.titan.diskstorage.keycolumnvalue.Entry; import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StaticBufferEntry; import com.thinkaurelius.titan.diskstorage.util.StaticByteBuffer; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.Map; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.util.PeekingIterator; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; /** - * (c) Matthias Broecheler (me@matthiasb.com) + * @author Etienne Deprit */ public class FaunusTitanAccumuloGraph extends FaunusTitanGraph { @@ -29,15 +28,15 @@ public FaunusTitanAccumuloGraph(final Configuration configuration) { super(configuration); } - public FaunusVertex readFaunusVertex(byte[] key, final PeekingIterator> columnIterator) { + public FaunusVertex readFaunusVertex(byte[] key, final Iterator> columnIterator) { return super.readFaunusVertex(ByteBuffer.wrap(key), new AccumuloMapIterable(columnIterator)); } private static class AccumuloMapIterable implements Iterable { - private final PeekingIterator> columnIterator; + private final Iterator> columnIterator; - public AccumuloMapIterable(final PeekingIterator> columnIterator) { + public AccumuloMapIterable(final Iterator> columnIterator) { Preconditions.checkNotNull(columnIterator); this.columnIterator = columnIterator; } diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java index 519f64e0..6d56e315 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java @@ -8,8 +8,7 @@ import static com.thinkaurelius.faunus.formats.titan.TitanInputFormat.FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT; import com.thinkaurelius.faunus.mapreduce.FaunusCompiler; import com.thinkaurelius.titan.diskstorage.Backend; -import com.thinkaurelius.titan.diskstorage.StaticBuffer; -import com.thinkaurelius.titan.diskstorage.accumulo.iterators.ColumnRangeFilter; +import com.thinkaurelius.titan.diskstorage.accumulo.AccumuloKeyColumnValueStore; import com.thinkaurelius.titan.diskstorage.keycolumnvalue.SliceQuery; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; @@ -27,14 +26,13 @@ import org.apache.accumulo.core.util.Pair; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; /** - * @author Marko A. Rodriguez (http://markorodriguez.com) + * @author Etienne Deprit */ public class TitanAccumuloInputFormat extends TitanInputFormat { - protected final Logger logger = Logger.getLogger(FaunusCompiler.class); + // Accumulo store manager configuration public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME = "faunus.graph.input.titan.storage.tablename"; public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_INSTANCE = @@ -43,6 +41,9 @@ public class TitanAccumuloInputFormat extends TitanInputFormat { "faunus.graph.input.titan.storage.accumulo-config.username"; public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD = "faunus.graph.input.titan.storage.accumulo-config.password"; + public static final String FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_SERVER_ITERATORS = + "faunus.graph.input.titan.storage.accumulo-config.server-side-iterators"; + // Instance variables private final AccumuloRowInputFormat accumuloRowInputFormat = new AccumuloRowInputFormat(); private Configuration configuration; private FaunusTitanAccumuloGraph graph; @@ -67,58 +68,42 @@ public void setConf(final Configuration config) { pathEnabled = config.getBoolean(FaunusCompiler.PATH_ENABLED, false); String instance = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_INSTANCE); - String zookeepers = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_HOSTNAME); if (config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT, null) != null) { zookeepers = StringUtils.join(zookeepers.split(","), - ":" + config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT)); + ":" + config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT) + ","); } - try { - AccumuloRowInputFormat.setZooKeeperInstance(config, instance, zookeepers); - } catch (IllegalStateException ex) { - logger.info(ex.getMessage()); - } + // Throws IllegalStateException if instance info already set + AccumuloRowInputFormat.setZooKeeperInstance(config, instance, zookeepers); String username = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_USERNAME); String password = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD); String tableName = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME); + boolean serverSideIterators = config.getBoolean(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_SERVER_ITERATORS, false); - try { - AccumuloRowInputFormat.setInputInfo(config, username, password.getBytes(), tableName, new Authorizations()); - } catch (IllegalStateException ex) { - logger.info(ex.getMessage()); - } + // Throws IllegalStateException if input info already set + AccumuloRowInputFormat.setInputInfo(config, username, password.getBytes(), tableName, new Authorizations()); - // config.set("autotype", "none"); + config.set("storage.read-only", "true"); + config.set("autotype", "none"); Collection> columnFamilyColumnQualifierPairs = Collections.singletonList(new Pair(new Text(Backend.EDGESTORE_NAME), null)); AccumuloRowInputFormat.fetchColumns(config, columnFamilyColumnQualifierPairs); - IteratorSetting is = getColumnFilter(vertexQuery); - if (is != null) { - AccumuloRowInputFormat.addIterator(config, is); - AccumuloRowInputFormat.setLocalIterators(config, true); - } - } - - private IteratorSetting getColumnFilter(VertexQueryFilter inputFilter) { - IteratorSetting is = null; - SliceQuery sliceQuery = TitanInputFormat.inputSlice(vertexQuery, graph); - byte[] minColumn = sliceQuery.getSliceStart().as(StaticBuffer.ARRAY_FACTORY); - byte[] maxColumn = sliceQuery.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY); + IteratorSetting is = AccumuloKeyColumnValueStore.getColumnSliceIterator(sliceQuery); - if (minColumn.length > 0 || maxColumn.length > 0) { - is = new IteratorSetting(20, "colrange", ColumnRangeFilter.class); - ColumnRangeFilter.setRange(is, minColumn, true, maxColumn, false); + if (is != null) { + AccumuloRowInputFormat.addIterator(config, is); + if (!serverSideIterators) { + AccumuloRowInputFormat.setLocalIterators(config, true); + } } - - return is; } @Override diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java index 6f520f39..ac824395 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloOutputFormat.java @@ -3,7 +3,7 @@ import com.thinkaurelius.faunus.formats.titan.TitanOutputFormat; /** - * @author Marko A. Rodriguez (http://markorodriguez.com) + * @author Etienne Deprit */ public class TitanAccumuloOutputFormat extends TitanOutputFormat { diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java index 3027d0bd..2356193a 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloRecordReader.java @@ -15,7 +15,7 @@ import org.apache.hadoop.io.Text; /** - * @author Marko A. Rodriguez (http://markorodriguez.com) + * @author Etienne Deprit */ public class TitanAccumuloRecordReader extends RecordReader { @@ -62,17 +62,17 @@ public NullWritable getCurrentKey() throws IOException, InterruptedException { @Override public FaunusVertex getCurrentValue() throws IOException, InterruptedException { - return this.vertex; + return vertex; } @Override public void close() throws IOException { - this.graph.shutdown(); - this.reader.close(); + graph.shutdown(); + reader.close(); } @Override public float getProgress() throws IOException, InterruptedException { - return this.reader.getProgress(); + return reader.getProgress(); } } From f9e9738c50183159e3e28941278fb1fcaf9e6843 Mon Sep 17 00:00:00 2001 From: Etienne Deprit Date: Fri, 27 Sep 2013 10:48:05 -0400 Subject: [PATCH 09/11] Catch exception in TitanAccumuloInputFormat caused by map tasks resetting instance and input information. --- .../titan/accumulo/TitanAccumuloInputFormat.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java index 6d56e315..c9dd1261 100644 --- a/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java +++ b/src/main/java/com/thinkaurelius/faunus/formats/titan/accumulo/TitanAccumuloInputFormat.java @@ -75,8 +75,11 @@ public void setConf(final Configuration config) { ":" + config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_PORT) + ","); } - // Throws IllegalStateException if instance info already set - AccumuloRowInputFormat.setZooKeeperInstance(config, instance, zookeepers); + try { + AccumuloRowInputFormat.setZooKeeperInstance(config, instance, zookeepers); + } catch (IllegalStateException ex) { + // zookeeper instance reset in map task + } String username = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_USERNAME); String password = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_PASSWORD); @@ -84,8 +87,11 @@ public void setConf(final Configuration config) { String tableName = config.get(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_TABLENAME); boolean serverSideIterators = config.getBoolean(FAUNUS_GRAPH_INPUT_TITAN_STORAGE_ACCUMULO_SERVER_ITERATORS, false); - // Throws IllegalStateException if input info already set - AccumuloRowInputFormat.setInputInfo(config, username, password.getBytes(), tableName, new Authorizations()); + try { + AccumuloRowInputFormat.setInputInfo(config, username, password.getBytes(), tableName, new Authorizations()); + } catch (IllegalStateException ex) { + // input info reset in map task + } config.set("storage.read-only", "true"); config.set("autotype", "none"); From b6af84155802b38106f540809de9f4b4c6dbc7ed Mon Sep 17 00:00:00 2001 From: Etienne Deprit Date: Fri, 3 Jan 2014 14:13:57 -0500 Subject: [PATCH 10/11] Replace Titan Lucene dependency with Elastic Search. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 04e86133..e911ed96 100644 --- a/pom.xml +++ b/pom.xml @@ -127,7 +127,7 @@ com.thinkaurelius.titan - titan-lucene + titan-es ${titan.version} From 4295456947fd1dfbfefa45b8552dfeb76d05c5f6 Mon Sep 17 00:00:00 2001 From: Etienne Deprit Date: Fri, 3 Jan 2014 14:19:56 -0500 Subject: [PATCH 11/11] Replace Titan Lucene dependency with Elastic Search. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index baa4076d..a3776004 100644 --- a/pom.xml +++ b/pom.xml @@ -127,7 +127,7 @@ com.thinkaurelius.titan - titan-lucene + titan-es ${titan.version}