2626import com .datastax .driver .mapping .Mapper ;
2727import com .datastax .driver .mapping .MappingManager ;
2828import com .datastax .driver .mapping .Result ;
29-
3029import com .google .common .collect .TreeTraverser ;
31- import org .commonjava .storage .pathmapped .model .*;
32- import org .commonjava .storage .pathmapped .pathdb .datastax .model .*;
30+ import org .commonjava .storage .pathmapped .config .PathMappedStorageConfig ;
31+ import org .commonjava .storage .pathmapped .model .FileChecksum ;
32+ import org .commonjava .storage .pathmapped .model .Filesystem ;
33+ import org .commonjava .storage .pathmapped .model .PathMap ;
34+ import org .commonjava .storage .pathmapped .model .Reclaim ;
35+ import org .commonjava .storage .pathmapped .model .ReverseMap ;
36+ import org .commonjava .storage .pathmapped .pathdb .datastax .model .DtxFileChecksum ;
37+ import org .commonjava .storage .pathmapped .pathdb .datastax .model .DtxFilesystem ;
38+ import org .commonjava .storage .pathmapped .pathdb .datastax .model .DtxPathMap ;
39+ import org .commonjava .storage .pathmapped .pathdb .datastax .model .DtxProxySite ;
40+ import org .commonjava .storage .pathmapped .pathdb .datastax .model .DtxReclaim ;
41+ import org .commonjava .storage .pathmapped .pathdb .datastax .model .DtxReverseMap ;
3342import org .commonjava .storage .pathmapped .pathdb .datastax .util .AsyncJobExecutor ;
3443import org .commonjava .storage .pathmapped .pathdb .datastax .util .CassandraPathDBUtils ;
35- import org .commonjava .storage .pathmapped .config .PathMappedStorageConfig ;
3644import org .commonjava .storage .pathmapped .spi .PathDB ;
3745import org .commonjava .storage .pathmapped .spi .PathDBAdmin ;
3846import org .commonjava .storage .pathmapped .util .PathMapUtils ;
5563import java .util .function .Function ;
5664import java .util .stream .Collectors ;
5765
58- import static com .datastax .driver .core .ConsistencyLevel .* ;
66+ import static com .datastax .driver .core .ConsistencyLevel .QUORUM ;
5967import static java .util .Collections .emptySet ;
6068import static org .apache .commons .lang .StringUtils .isNotBlank ;
6169import static org .commonjava .storage .pathmapped .pathdb .datastax .util .CassandraPathDBUtils .getHoursInDay ;
62- import static org .commonjava .storage .pathmapped .spi .PathDB .FileType .*;
70+ import static org .commonjava .storage .pathmapped .spi .PathDB .FileType .all ;
71+ import static org .commonjava .storage .pathmapped .spi .PathDB .FileType .dir ;
72+ import static org .commonjava .storage .pathmapped .spi .PathDB .FileType .file ;
6373import static org .commonjava .storage .pathmapped .util .PathMapUtils .ROOT_DIR ;
6474
6575public class CassandraPathDB
@@ -83,6 +93,8 @@ public class CassandraPathDB
8393
8494 private Mapper <DtxFilesystem > filesystemMapper ;
8595
96+ protected Set <String > proxySitesCache = new HashSet <>();
97+
8698 private PathMappedStorageConfig config ;
8799
88100 private String keyspace ;
@@ -91,9 +103,12 @@ public class CassandraPathDB
91103
92104 private long reconnectDelay = 60000 ;
93105
94- private PreparedStatement preparedExistQuery , preparedListQuery , preparedListCheckEmpty , preparedContainingQuery , preparedExistFileQuery ,
95- preparedUpdateExpiration , preparedReverseMapIncrement , preparedReverseMapReduction ,
96- preparedFilesystemIncrement , preparedFilesystemReduction , preparedFilesystemList ;
106+ private Mapper <DtxProxySite > proxysiteMapper ;
107+
108+ private PreparedStatement preparedExistQuery , preparedListQuery , preparedListCheckEmpty , preparedContainingQuery ,
109+ preparedExistFileQuery , preparedUpdateExpiration , preparedReverseMapIncrement , preparedReverseMapReduction ,
110+ preparedFilesystemIncrement , preparedFilesystemReduction , preparedFilesystemList , preparedProxySiteQuery ,
111+ preparedProxySiteList , preparedProxySiteTruncate ;
97112
98113 @ Deprecated
99114 public CassandraPathDB ( PathMappedStorageConfig config , Session session , String keyspace )
@@ -158,6 +173,7 @@ private void prepare( Session session, String keyspace, int replicationFactor )
158173 session .execute ( CassandraPathDBUtils .getSchemaCreateTableReclaim ( keyspace ) );
159174 session .execute ( CassandraPathDBUtils .getSchemaCreateTableFileChecksum ( keyspace ) );
160175 session .execute ( CassandraPathDBUtils .getSchemaCreateTableFilesystem ( keyspace ) );
176+ session .execute ( CassandraPathDBUtils .getSchemaCreateTableProxySites ( keyspace ) );
161177
162178 MappingManager manager = new MappingManager ( session );
163179
@@ -166,6 +182,7 @@ private void prepare( Session session, String keyspace, int replicationFactor )
166182 reclaimMapper = manager .mapper ( DtxReclaim .class , keyspace );
167183 fileChecksumMapper = manager .mapper ( DtxFileChecksum .class , keyspace );
168184 filesystemMapper = manager .mapper ( DtxFilesystem .class , keyspace );
185+ proxysiteMapper = manager .mapper ( DtxProxySite .class , keyspace );
169186
170187 preparedExistFileQuery = session .prepare ( "SELECT count(*) FROM " + keyspace
171188 + ".pathmap WHERE filesystem=? and parentpath=? and filename=?;" );
@@ -185,22 +202,28 @@ private void prepare( Session session, String keyspace, int replicationFactor )
185202 "WHERE filesystem=? and parentpath=? and filename=?;" );
186203
187204 preparedContainingQuery = session .prepare ( "SELECT filesystem FROM " + keyspace
188- + ".pathmap WHERE filesystem IN ? and parentpath=? and filename=?;" );
205+ + ".pathmap WHERE filesystem IN ? and parentpath=? and filename=?;" );
189206
190207 preparedReverseMapIncrement =
191- session .prepare ( "UPDATE " + keyspace + ".reversemap SET paths = paths + ? WHERE fileid=?;" );
208+ session .prepare ( "UPDATE " + keyspace + ".reversemap SET paths = paths + ? WHERE fileid=?;" );
192209
193210 preparedReverseMapReduction =
194- session .prepare ( "UPDATE " + keyspace + ".reversemap SET paths = paths - ? WHERE fileid=?;" );
211+ session .prepare ( "UPDATE " + keyspace + ".reversemap SET paths = paths - ? WHERE fileid=?;" );
195212 preparedReverseMapReduction .setConsistencyLevel ( QUORUM );
196213
197- preparedFilesystemIncrement =
198- session . prepare ( "UPDATE " + keyspace + ".filesystem SET filecount=filecount+?, size=size+? WHERE filesystem=?;" );
214+ preparedFilesystemIncrement = session . prepare (
215+ "UPDATE " + keyspace + ".filesystem SET filecount=filecount+?, size=size+? WHERE filesystem=?;" );
199216
200- preparedFilesystemReduction =
201- session . prepare ( "UPDATE " + keyspace + ".filesystem SET filecount=filecount-?, size=size-? WHERE filesystem=?;" );
217+ preparedFilesystemReduction = session . prepare (
218+ "UPDATE " + keyspace + ".filesystem SET filecount=filecount-?, size=size-? WHERE filesystem=?;" );
202219
203- preparedFilesystemList = session .prepare ("SELECT * FROM " + keyspace + ".filesystem;" );
220+ preparedFilesystemList = session .prepare ( "SELECT * FROM " + keyspace + ".filesystem;" );
221+
222+ preparedProxySiteQuery = session .prepare ( "SELECT * FROM " + keyspace + ".proxysites WHERE site=?;" );
223+
224+ preparedProxySiteList = session .prepare ( "SELECT * FROM " + keyspace + ".proxysites;" );
225+
226+ preparedProxySiteTruncate = session .prepare ( "TRUNCATE " + keyspace + ".proxysites;" );
204227
205228 asyncJobExecutor = new AsyncJobExecutor ( config );
206229 }
@@ -997,4 +1020,48 @@ private ResultSet executeSession ( BoundStatement bind )
9971020 }
9981021 return trackingRecord ;
9991022 }
1023+
1024+ @ Override
1025+ public Set <String > getProxySitesCache ()
1026+ {
1027+ return proxySitesCache ;
1028+ }
1029+
1030+ @ Override
1031+ public boolean isProxySite ( String site )
1032+ {
1033+ BoundStatement bound = preparedProxySiteQuery .bind ( site );
1034+ ResultSet result = executeSession ( bound );
1035+ return notNull ( result );
1036+ }
1037+
1038+ @ Override
1039+ public List <String > getProxySiteList ()
1040+ {
1041+ ResultSet result = executeSession ( preparedProxySiteList .bind () );
1042+ return proxysiteMapper .map ( result ).all ().stream ().map ( DtxProxySite ::getSite ).collect ( Collectors .toList () );
1043+ }
1044+
1045+ @ Override
1046+ public void saveProxySite ( String site )
1047+ {
1048+ DtxProxySite proxySite = new DtxProxySite ( site );
1049+ logger .debug ( "ProxySite, {}" , site );
1050+ proxysiteMapper .save ( proxySite );
1051+ }
1052+
1053+ @ Override
1054+ public void deleteProxySite ( String site )
1055+ {
1056+ DtxProxySite proxySite = new DtxProxySite ( site );
1057+ logger .debug ( "Delete proxySite, {}" , site );
1058+ proxysiteMapper .delete ( proxySite );
1059+ }
1060+
1061+ @ Override
1062+ public void deleteAllProxySite ()
1063+ {
1064+ BoundStatement bound = preparedProxySiteTruncate .bind ();
1065+ executeSession ( bound );
1066+ }
10001067}
0 commit comments