Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ scripts/demo/postgresql/
docker-compose.dev.yml
*.bak

# Test output files
test_output.txt

#PDF cover page-related files
header-icons/*
combined-pdfs/*
Expand Down
17 changes: 17 additions & 0 deletions modules/invenio-files-rest/invenio_files_rest/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,23 @@ class LocationModelView(ModelView):
'System Administrator')
_repoadmin_role = os.environ.get('INVENIO_ROLE_REPOSITORY',
'Repository Administrator')

def get_query(self):
"""Override get_query to filter locations based on user roles."""
query = super(LocationModelView, self).get_query()
user_role_names = {role.name for role in current_user.roles}
if not self._system_role in user_role_names:
# Non-system admins should not see default locations.
query = query.filter_by(default=False)
return query

def get_count_query(self):
"""Override get_count_query to apply the same role-based filters."""
query = super(LocationModelView, self).get_count_query()
user_role_names = {role.name for role in current_user.roles}
if not self._system_role in user_role_names:
query = query.filter_by(default=False)
return query
@expose('/')
def index_view(self):
"""Override index view to add custom logic.
Expand Down
137 changes: 137 additions & 0 deletions modules/invenio-files-rest/tests/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,140 @@ def test_can_delete(self, app, db, monkeypatch):
with patch('invenio_files_rest.admin.current_user', mock_user):
view = LocationModelView(Location, db.session)
assert not view.can_delete


def test_get_query(self, app, db, monkeypatch):
"""Test get_query filters locations based on user roles."""
monkeypatch.setenv('INVENIO_ROLE_SYSTEM', 'System Administrator')
monkeypatch.setenv('INVENIO_ROLE_REPOSITORY', 'Repository Administrator')

# Create test locations
default_loc = Location(name='default-loc', uri='/tmp/default', default=True)
non_default_loc = Location(name='non-default-loc', uri='/tmp/non-default', default=False)
db.session.add(default_loc)
db.session.add(non_default_loc)
db.session.commit()

try:
mock_user = MagicMock()

# Test Case: System Administrator sees all locations (including default)
mock_role_sysad = MagicMock()
mock_role_sysad.name = 'System Administrator'
mock_user.roles = [mock_role_sysad]
with patch('invenio_files_rest.admin.current_user', mock_user):
view = LocationModelView(Location, db.session)
query = view.get_query()
locations = query.all()
location_names = {loc.name for loc in locations}
assert 'default-loc' in location_names
assert 'non-default-loc' in location_names

# Test Case: Repository Administrator does not see default locations
mock_role_repoad = MagicMock()
mock_role_repoad.name = 'Repository Administrator'
mock_user.roles = [mock_role_repoad]
with patch('invenio_files_rest.admin.current_user', mock_user):
view = LocationModelView(Location, db.session)
query = view.get_query()
locations = query.all()
location_names = {loc.name for loc in locations}
assert 'default-loc' not in location_names
assert 'non-default-loc' in location_names

# Test Case: Community Administrator does not see default locations
mock_role_commad = MagicMock()
mock_role_commad.name = 'Community Administrator'
mock_user.roles = [mock_role_commad]
with patch('invenio_files_rest.admin.current_user', mock_user):
view = LocationModelView(Location, db.session)
query = view.get_query()
locations = query.all()
location_names = {loc.name for loc in locations}
assert 'default-loc' not in location_names
assert 'non-default-loc' in location_names

# Test Case: User without role does not see default locations
mock_user.roles = []
with patch('invenio_files_rest.admin.current_user', mock_user):
view = LocationModelView(Location, db.session)
query = view.get_query()
locations = query.all()
location_names = {loc.name for loc in locations}
assert 'default-loc' not in location_names
assert 'non-default-loc' in location_names
finally:
# Clean up test locations
db.session.delete(default_loc)
db.session.delete(non_default_loc)
db.session.commit()


def test_get_count_query(self, app, db, monkeypatch):
"""Test get_count_query filters locations based on user roles."""
monkeypatch.setenv('INVENIO_ROLE_SYSTEM', 'System Administrator')
monkeypatch.setenv('INVENIO_ROLE_REPOSITORY', 'Repository Administrator')

# Create test locations
default_loc = Location(name='default-loc-count', uri='/tmp/default-count', default=True)
non_default_loc1 = Location(name='non-default-loc-1', uri='/tmp/non-default-1', default=False)
non_default_loc2 = Location(name='non-default-loc-2', uri='/tmp/non-default-2', default=False)
db.session.add(default_loc)
db.session.add(non_default_loc1)
db.session.add(non_default_loc2)
db.session.commit()

try:
mock_user = MagicMock()

# Test Case: System Administrator sees count of all locations (including default)
mock_role_sysad = MagicMock()
mock_role_sysad.name = 'System Administrator'
mock_user.roles = [mock_role_sysad]
with patch('invenio_files_rest.admin.current_user', mock_user):
view = LocationModelView(Location, db.session)
query = view.get_count_query()
count = query.count()
# Should see all locations in the database
total_locations = db.session.query(Location).count()
assert count == total_locations

# Test Case: Repository Administrator sees count excluding default locations
mock_role_repoad = MagicMock()
mock_role_repoad.name = 'Repository Administrator'
mock_user.roles = [mock_role_repoad]
with patch('invenio_files_rest.admin.current_user', mock_user):
view = LocationModelView(Location, db.session)
query = view.get_count_query()
count = query.count()
# Should only see non-default locations
non_default_count = db.session.query(Location).filter_by(default=False).count()
assert count == non_default_count

# Test Case: Community Administrator sees count excluding default locations
mock_role_commad = MagicMock()
mock_role_commad.name = 'Community Administrator'
mock_user.roles = [mock_role_commad]
with patch('invenio_files_rest.admin.current_user', mock_user):
view = LocationModelView(Location, db.session)
query = view.get_count_query()
count = query.count()
# Should only see non-default locations
non_default_count = db.session.query(Location).filter_by(default=False).count()
assert count == non_default_count

# Test Case: User without role sees count excluding default locations
mock_user.roles = []
with patch('invenio_files_rest.admin.current_user', mock_user):
view = LocationModelView(Location, db.session)
query = view.get_count_query()
count = query.count()
# Should only see non-default locations
non_default_count = db.session.query(Location).filter_by(default=False).count()
assert count == non_default_count
finally:
# Clean up test locations
db.session.delete(default_loc)
db.session.delete(non_default_loc1)
db.session.delete(non_default_loc2)
db.session.commit()
3 changes: 3 additions & 0 deletions modules/weko-records-ui/weko_records_ui/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,3 +827,6 @@ class FILE_OPEN_STATUS(Enum):

WEKO_RECORDS_UI_GAKUNIN_RDM_URL = "https://rdm.nii.ac.jp"
"""URL of GakuNin RDM."""

WEKO_RECORDS_UI_USER_STORAGE_MODIFICATION_ENABLED = False
"""Enable user storage modification feature."""
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,15 @@ <h2 class="col-sm-12 col-md-12 col-left" style="margin-top: unset;">{{ filename.
{%- endif -%}

<input type="hidden" value="{{file.bucket}}" id="bucket_id_value"/>
{% if record | check_permission %}
{% if can_edit %}
<input type="file" id="fileInput" style="display: none;" />
<span style="white-space: nowrap;">
<button onclick="replaceFile()">{{_('Replace the file content')}}</button>
<button onclick="openBucketCopyModal()">{{_('Copy file to open bucket')}}</button>
</span>
{% if is_storage_editable %}
{% if record | check_permission %}
{% if can_edit %}
<input type="file" id="fileInput" style="display: none;" />
<span style="white-space: nowrap;">
<button onclick="replaceFile()">{{_('Replace the file content')}}</button>
<button onclick="openBucketCopyModal()">{{_('Copy file to open bucket')}}</button>
</span>
{% endif %}
{% endif %}
{% endif %}
</td>
Expand Down
2 changes: 2 additions & 0 deletions modules/weko-records-ui/weko_records_ui/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,7 @@ def _get_rights_title(result, rights_key_str, rights_values, current_lang, meta_
is_no_content_item_application = item_application_settings.get("item_application_enable", False) \
and int(item_type_id) in item_application_settings.get("application_item_types", [])


return render_template(
template,
pid=pid,
Expand Down Expand Up @@ -837,6 +838,7 @@ def _get_rights_title(result, rights_key_str, rights_values, current_lang, meta_
restricted_errorMsg = restricted_errorMsg,
with_files = with_files,
belonging_community=belonging_community,
is_storage_editable=current_app.config.get('WEKO_RECORDS_UI_USER_STORAGE_MODIFICATION_ENABLED'),
**ctx,
**kwargs
)
Expand Down
34 changes: 34 additions & 0 deletions modules/weko-user-profiles/tests/test_forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,40 @@ def test_validate_username(self,app,client,register_form,users,db):
res = client.post("/test_form/profile_form",data=data)
assert res.data == bytes("invalid","utf-8")

def test_init_storage_fields_removed_when_disabled(self, app):
"""Test that storage fields are removed when feature flag is disabled."""
# Set the config to disable storage modification
app.config.update(
WEKO_RECORDS_UI_USER_STORAGE_MODIFICATION_ENABLED=False
)

with app.app_context():
# Create the form
form = ProfileForm()

# Verify storage-related fields are removed
assert not hasattr(form, 'access_key')
assert not hasattr(form, 'secret_key')
assert not hasattr(form, 's3_endpoint_url')
assert not hasattr(form, 's3_region_name')

def test_init_storage_fields_present_when_enabled(self, app):
"""Test that storage fields are present when feature flag is enabled."""
# Set the config to enable storage modification
app.config.update(
WEKO_RECORDS_UI_USER_STORAGE_MODIFICATION_ENABLED=True
)

with app.app_context():
# Create the form
form = ProfileForm()

# Verify storage-related fields are present
assert hasattr(form, 'access_key')
assert hasattr(form, 'secret_key')
assert hasattr(form, 's3_endpoint_url')
assert hasattr(form, 's3_region_name')

# def custom_profile_form_factory(profile_cls):
# .tox/c1/bin/pytest --cov=weko_user_profiles tests/test_forms.py::test_custom_profile_form_factory -vv -s --cov-branch --cov-report=term --cov-report=html --basetemp=/code/modules/weko-user-profiles/.tox/c1/tmp
class DummyClass:
Expand Down
14 changes: 14 additions & 0 deletions modules/weko-user-profiles/weko_user_profiles/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,20 @@ class ProfileForm(FlaskForm):
choices=WEKO_USERPROFILES_INSTITUTE_POSITION_LIST
)

def __init__(self, *args, **kwargs):
super(ProfileForm, self).__init__(*args, **kwargs)
if not current_app.config.get("WEKO_RECORDS_UI_USER_STORAGE_MODIFICATION_ENABLED", False):
if hasattr(self, 'access_key'):
del self.access_key

if hasattr(self, 'secret_key'):
del self.secret_key

if hasattr(self, 's3_endpoint_url'):
del self.s3_endpoint_url

if hasattr(self, 's3_region_name'):
del self.s3_region_name

def validate_username(form, field):
"""Wrap username validator for WTForms."""
Expand Down
Loading