@@ -33,6 +33,7 @@ import (
3333 "github.com/docker/docker/client"
3434 "github.com/golang/glog"
3535 "github.com/spf13/pflag"
36+ "golang.org/x/sync/errgroup"
3637 "golang.org/x/tools/go/packages"
3738
3839 "github.com/dgraph-io/dgraph/v25/testutil"
9394 "unit = true unit tests only (no Docker, no integration tag). " +
9495 "integration = everything except ldbc, load, and systest-heavy (with Docker). " +
9596 "systest = systest-baseline + systest-heavy." )
96- tmp = pflag .String ("tmp" , "" , "Temporary directory used to download data." )
97+ tmp = pflag .String ("tmp" , "" , "Temporary directory used to download data." )
98+ keepData = pflag .Bool ("keep-data" , false ,
99+ "If true, do not remove the data directory after tests complete. " +
100+ "Useful in CI where data is cached between runs." )
101+ dataRef = pflag .String ("data-ref" , "" ,
102+ "Git ref (branch, tag, or SHA) for dgraph-benchmarks data. " +
103+ "Overrides DGRAPH_TEST_DATA_REF env var and benchmark-data-version file." )
97104 downloadResources = pflag .BoolP ("download" , "d" , true ,
98105 "Flag to specify whether to download resources or not" )
99106 race = pflag .Bool ("race" , false , "Set true to build with race" )
@@ -1121,18 +1128,17 @@ func isHeavyPackage(pkg string) bool {
11211128 return false
11221129}
11231130
// datafilePaths maps each local data file name to its path inside the
// dgraph-benchmarks repository. Only repository-relative paths are stored
// here; the download URL is built at run time from the selected git ref
// (see testutil.BenchmarkDataRef), so no branch is hard-coded.
var datafilePaths = map[string]string{
	"1million-noindex.schema": "data/1million-noindex.schema",
	"1million.schema":         "data/1million.schema",
	"1million.rdf.gz":         "data/1million.rdf.gz",
	"21million.schema":        "data/21million.schema",
	"21million.rdf.gz":        "data/21million.rdf.gz",
}
11311140
1132- var baseUrl = "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/ldbc/sf0.3/ldbc_rdf_0.3/"
1133- var suffix = "?raw=true"
1134-
1135- var rdfFileNames = [... ]string {
1141+ var ldbcRdfFileNames = [... ]string {
11361142 "Deltas.rdf" ,
11371143 "comment_0.rdf" ,
11381144 "containerOf_0.rdf" ,
@@ -1156,83 +1162,81 @@ var rdfFileNames = [...]string{
11561162 "studyAt_0.rdf" ,
11571163 "tag_0.rdf" ,
11581164 "tagclass_0.rdf" ,
1159- "workAt_0.rdf" }
1160-
1161- var ldbcDataFiles = map [string ]string {
1162- "ldbcTypes.schema" : "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/ldbc/sf0.3/ldbcTypes.schema" ,
1165+ "workAt_0.rdf" ,
11631166}
11641167
// ldbcFilePaths maps LDBC file names that do not follow the per-RDF-file
// naming scheme to their paths inside the dgraph-benchmarks repository.
// The individual RDF files are listed separately in ldbcRdfFileNames.
var ldbcFilePaths = map[string]string{
	"ldbcTypes.schema": "ldbc/sf0.3/ldbcTypes.schema",
}
11841172
1185- func downloadDataFiles () {
1173+ func downloadDataFiles () error {
11861174 if ! * downloadResources {
11871175 fmt .Print ("Skipping downloading of resources\n " )
1188- return
1176+ return nil
11891177 }
1190- for fname , link := range datafiles {
1178+ ref := testutil .BenchmarkDataRef (* dataRef )
1179+ fmt .Printf ("Using benchmark data ref: %s\n " , ref )
1180+ for fname , repoPath := range datafilePaths {
11911181 fpath := filepath .Join (* tmp , fname )
1192- if fi , err := os . Stat (fpath ); err == nil && fi . Size () > 0 {
1182+ if testutil . FileExistsAndValid (fpath ) {
11931183 fmt .Printf ("Skipping %s (already exists)\n " , fname )
11941184 continue
11951185 }
1196- if err := wgetWithRetry (fname , link , * tmp ); err != nil {
1197- panic (fmt .Sprintf ("error downloading %s: %v" , fname , err ))
1186+ var err error
1187+ if strings .HasSuffix (fname , ".rdf.gz" ) {
1188+ err = testutil .DownloadLFSFile (fname , ref , repoPath , * tmp )
1189+ } else {
1190+ err = testutil .DownloadFile (fname , testutil .BenchmarkRawURL (ref , repoPath ), * tmp )
1191+ }
1192+ if err != nil {
1193+ return fmt .Errorf ("error downloading %s: %v" , fname , err )
11981194 }
11991195 }
1196+ return nil
12001197}
12011198
1202- func downloadLDBCFiles (dir string ) {
1199+ func downloadLDBCFiles (dir string ) error {
12031200 if ! * downloadResources {
12041201 fmt .Print ("Skipping downloading of resources\n " )
1205- return
1202+ return nil
12061203 }
12071204
1208- for _ , name := range rdfFileNames {
1209- ldbcDataFiles [name ] = baseUrl + name + suffix
1205+ ref := testutil .BenchmarkDataRef (* dataRef )
1206+ fmt .Printf ("Using benchmark data ref: %s\n " , ref )
1207+
1208+ // All LDBC files (schema + RDF) are LFS-tracked.
1209+ allFiles := make (map [string ]string )
1210+ for fname , repoPath := range ldbcFilePaths {
1211+ allFiles [fname ] = repoPath
1212+ }
1213+ for _ , name := range ldbcRdfFileNames {
1214+ allFiles [name ] = "ldbc/sf0.3/ldbc_rdf_0.3/" + name
12101215 }
12111216
12121217 start := time .Now ()
1213- sem := make ( chan struct {}, 5 )
1214- var wg sync. WaitGroup
1215- for fname , link := range ldbcDataFiles {
1218+ g , _ := errgroup . WithContext ( context . Background () )
1219+ g . SetLimit ( 5 )
1220+ for fname , repoPath := range allFiles {
12161221 fpath := filepath .Join (dir , fname )
1217- if fi , err := os . Stat (fpath ); err == nil && fi . Size () > 0 {
1222+ if testutil . FileExistsAndValid (fpath ) {
12181223 fmt .Printf ("Skipping %s (already exists)\n " , fname )
12191224 continue
12201225 }
1221- wg .Add (1 )
1222- go func (fname , link string ) {
1223- defer wg .Done ()
1224- sem <- struct {}{}
1225- defer func () { <- sem }()
1226-
1226+ g .Go (func () error {
12271227 dlStart := time .Now ()
1228- if err := wgetWithRetry (fname , link , dir ); err != nil {
1229- panic ( fmt .Sprintf ("error downloading %s: %v" , fname , err ) )
1228+ if err := testutil . DownloadLFSFile (fname , ref , repoPath , dir ); err != nil {
1229+ return fmt .Errorf ("error downloading %s: %v" , fname , err )
12301230 }
1231- fmt .Printf ("Downloaded %s to %s in %s \n " , fname , dir , time .Since (dlStart ))
1232- }(fname , link )
1231+ fmt .Printf ("Downloaded %s to %s in %s\n " , fname , dir , time .Since (dlStart ))
1232+ return nil
1233+ })
12331234 }
1234- wg .Wait ()
1235- fmt .Printf ("Downloaded %d files in %s \n " , len (ldbcDataFiles ), time .Since (start ))
1235+ if err := g .Wait (); err != nil {
1236+ return err
1237+ }
1238+ fmt .Printf ("Downloaded %d files in %s\n " , len (allFiles ), time .Since (start ))
1239+ return nil
12361240}
12371241
12381242func createTestCoverageFile (path string ) error {
@@ -1419,15 +1423,21 @@ func run() error {
14191423 x .Check (os .MkdirAll (* tmp , 0755 ))
14201424 }
14211425 if testSuiteContainsAny ("load" , "all" ) {
1422- downloadDataFiles ()
1426+ if err := downloadDataFiles (); err != nil {
1427+ fmt .Printf ("Failed to download data files: %v\n " , err )
1428+ return
1429+ }
14231430 }
14241431 if testSuiteContainsAny ("ldbc" , "all" ) {
14251432 // LDBC files go into a subdirectory because the LDBC test bulk-loads
14261433 // the entire directory (-f <dir>). Mixing load data (1million, 21million)
14271434 // with LDBC data causes schema mismatches.
14281435 ldbcDir := filepath .Join (* tmp , "ldbc" )
14291436 x .Check (os .MkdirAll (ldbcDir , 0755 ))
1430- downloadLDBCFiles (ldbcDir )
1437+ if err := downloadLDBCFiles (ldbcDir ); err != nil {
1438+ fmt .Printf ("Failed to download LDBC files: %v\n " , err )
1439+ return
1440+ }
14311441 }
14321442 for i , task := range valid {
14331443 select {
@@ -1478,6 +1488,9 @@ func main() {
14781488 procId = rand .Intn (1000 )
14791489
14801490 err := run ()
1491+ if ! * keepData && * tmp != "" {
1492+ _ = os .RemoveAll (* tmp )
1493+ }
14811494 if err != nil {
14821495 os .Exit (1 )
14831496 }
0 commit comments