Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 121 additions & 28 deletions src/jrd/replication/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ namespace
constexpr ULONG DEFAULT_GROUP_FLUSH_DELAY = 0;
constexpr ULONG DEFAULT_APPLY_IDLE_TIMEOUT = 10; // seconds
constexpr ULONG DEFAULT_APPLY_ERROR_TIMEOUT = 60; // seconds
constexpr bool DEFAULT_REPORT_ERRORS = false;

void parseLong(const string& input, ULONG& output)
{
Expand All @@ -79,36 +80,53 @@ namespace
output = false;
}

void configError(const string& type, const string& key, const string& value)
void configError(CheckStatusWrapper* status, const string& type, const string& key, const string& value)
{
raiseError("%s specifies %s: %s", key.c_str(), type.c_str(), value.c_str());
string msg;
if (!(status->getState() & IStatus::STATE_ERRORS))
{
msg.printf("Incorrect entry in %s", REPLICATION_CFGFILE);
(Arg::Gds(isc_random) << Arg::Str(msg)).appendTo(status);
}

msg.printf("%s specifies %s: %s", key.c_str(), type.c_str(), value.c_str());
(Arg::Gds(isc_random) << Arg::Str(msg)).appendTo(status);
}

void checkAccess(const PathName& path, const string& key)
bool checkAccess(CheckStatusWrapper* status, const PathName& path, const string& key)
{
if (path.hasData() && !PathUtils::canAccess(path, 6))
configError("missing or inaccessible directory", key, path.c_str());
{
configError(status, "missing or inaccessible directory", key, path.c_str());
return false;
}
return true;
}

void composeError(CheckStatusWrapper* status, const Exception& ex)
{
string prefix;
prefix.printf("Incorrect entry in %s", REPLICATION_CFGFILE);

Arg::StatusVector sv;
sv << Arg::Gds(isc_random) << Arg::Str(prefix);

if (!(status->getState() & IStatus::STATE_ERRORS))
{
string prefix;
prefix.printf("Incorrect entry in %s", REPLICATION_CFGFILE);
sv << Arg::Gds(isc_random) << Arg::Str(prefix);
}

sv << Arg::StatusVector(status);
sv << Arg::StatusVector(ex);

status->setErrors(sv.value());
}

void parseExternalValue(const string& key, const string& value, string& output)
bool parseExternalValue(CheckStatusWrapper* status, const string& key, const string& value, string& output)
{
const auto pos = key.rfind('_');
if (pos == string::npos)
{
output = value.c_str();
return;
return true;
}

string temp;
Expand All @@ -118,7 +136,10 @@ namespace
{
fb_utils::readenv(value.c_str(), temp);
if (temp.isEmpty())
configError("missing environment variable", key, value);
{
configError(status, "missing environment variable", key, value);
return false;
}
}
else if (key_source.equals(KEY_SUFFIX_FILE))
{
Expand All @@ -129,44 +150,65 @@ namespace

AutoPtr<FILE> file(os_utils::fopen(filename.c_str(), "rt"));
if (!file)
configError("missing or inaccessible file", key, filename.c_str());
{
configError(status, "missing or inaccessible file", key, filename.c_str());
return false;
}

if (temp.LoadFromFile(file))
temp.alltrim("\r");

if (temp.isEmpty())
configError("first empty line of file", key, filename.c_str());
{
configError(status, "first empty line of file", key, filename.c_str());
return false;
}
}

output = temp.c_str();
return true;
}

void parseSyncReplica(const ConfigFile::Parameters& params, SyncReplica& output)
bool parseSyncReplica(CheckStatusWrapper* status, const ConfigFile::Parameters& params, SyncReplica& output)
{
bool result = true;
for (const auto& el : params)
{
const string key(el.name.c_str());
const string value(el.value);

if (value.isEmpty())
continue;
{
configError(status, "empty value", output.database, key);
result = false;
}

if (key.find("username") == 0)
{
if (output.username.hasData())
configError("multiple values", output.database, "username");
parseExternalValue(key, value, output.username);
{
configError(status, "multiple values", output.database, "username");
result = false;
}
result &= parseExternalValue(status, key, value, output.username);
output.username.rtrim(" ");
}
else if (key.find("password") == 0)
{
if (output.password.hasData())
configError("multiple values", output.database, "password");
parseExternalValue(key, value, output.password);
{
configError(status, "multiple values", output.database, "password");
result = false;
}
result &= parseExternalValue(status, key, value, output.password);
}
else
configError("unknown parameter", output.database, key);
{
configError(status, "unknown key", output.database, key);
result = false;
}
}
return result;
}
}

Expand Down Expand Up @@ -196,7 +238,7 @@ Config::Config()
schemaSearchPath(getPool()),
pluginName(getPool()),
logErrors(true),
reportErrors(false),
reportErrors(DEFAULT_REPORT_ERRORS),
disableOnError(true),
cascadeReplication(false)
{
Expand Down Expand Up @@ -238,6 +280,7 @@ Config* Config::get(const PathName& lookupName)
{
fb_assert(lookupName.hasData());

bool reportErrors = DEFAULT_REPORT_ERRORS;
try
{
const PathName filename =
Expand All @@ -249,7 +292,8 @@ Config* Config::get(const PathName& lookupName)

AutoPtr<Config> config(FB_NEW Config);

bool defaultFound = false, exactMatch = false;
FbLocalStatus localStatus;
bool defaultFound = false, exactMatch = false, replicaSkip = false;

for (const auto& section : cfgFile.getParameters())
{
Expand Down Expand Up @@ -287,15 +331,24 @@ Config* Config::get(const PathName& lookupName)
string value(el.value);

if (value.isEmpty())
{
configError(&localStatus, "empty value of key",
exactMatch ? lookupName.c_str() : section.name.c_str(),
key);
continue;
}

if (key == "sync_replica")
{
SyncReplica syncReplica(config->getPool());
if (el.sub)
{
syncReplica.database = value;
parseSyncReplica(el.sub->getParameters(), syncReplica);
if (!parseSyncReplica(&localStatus, el.sub->getParameters(), syncReplica))
{
replicaSkip = true;
continue;
}
}
else
splitConnectionString(value, syncReplica.database, syncReplica.username, syncReplica.password);
Expand Down Expand Up @@ -338,7 +391,12 @@ Config* Config::get(const PathName& lookupName)
{
config->journalDirectory = value.c_str();
PathUtils::ensureSeparator(config->journalDirectory);
checkAccess(config->journalDirectory, key);
if (!checkAccess(&localStatus, config->journalDirectory, key))
{
config->journalDirectory.erase();
replicaSkip = true;
continue;
}
}
else if (key == "journal_file_prefix")
{
Expand All @@ -352,7 +410,11 @@ Config* Config::get(const PathName& lookupName)
{
config->archiveDirectory = value.c_str();
PathUtils::ensureSeparator(config->archiveDirectory);
checkAccess(config->archiveDirectory, key);
if (!checkAccess(&localStatus, config->archiveDirectory, key))
{
config->archiveDirectory.erase();
continue;
}
}
else if (key == "journal_archive_command")
{
Expand All @@ -373,6 +435,7 @@ Config* Config::get(const PathName& lookupName)
else if (key == "report_errors")
{
parseBoolean(value, config->reportErrors);
reportErrors = config->reportErrors;
}
else if (key == "disable_on_error")
{
Expand All @@ -382,12 +445,28 @@ Config* Config::get(const PathName& lookupName)
{
parseBoolean(value, config->cascadeReplication);
}
else if ((key != "journal_source_directory") &&
(key != "source_guid") &&
(key != "verbose_logging") &&
(key != "apply_idle_timeout") &&
(key != "apply_error_timeout"))
{
configError(&localStatus, "unknown key",
exactMatch ? lookupName.c_str() : section.name.c_str(),
key);
}
}

if (exactMatch)
break;
}

if (localStatus->getState() & IStatus::STATE_ERRORS)
logPrimaryStatus(lookupName, &localStatus);

if (replicaSkip && !config->disableOnError)
raiseError("One or more replicas configured with errors");

// TODO: As soon as plugin name is moved into RDB$PUBLICATIONS,
// delay config parse until real replication start
if (config->pluginName.hasData())
Expand All @@ -410,13 +489,18 @@ Config* Config::get(const PathName& lookupName)

return config.release();
}

if (replicaSkip)
raiseError("All configured replicas skipped");
}
catch (const Exception& ex)
{
FbLocalStatus localStatus;
composeError(&localStatus, ex);

logPrimaryStatus(lookupName, &localStatus);
if (reportErrors)
localStatus.raise();
}

return nullptr;
Expand All @@ -428,6 +512,7 @@ Config* Config::get(const PathName& lookupName)
void Config::enumerate(ReplicaList& replicas)
{
PathName dbName;
FbLocalStatus localStatus;

try
{
Expand Down Expand Up @@ -480,12 +565,20 @@ void Config::enumerate(ReplicaList& replicas)
{
config->sourceDirectory = value.c_str();
PathUtils::ensureSeparator(config->sourceDirectory);
if (!checkAccess(&localStatus, config->sourceDirectory, key))
{
config->sourceDirectory.erase();
continue;
}
}
else if (key == "source_guid")
{
config->sourceGuid = Guid::fromString(value);
if (!config->sourceGuid)
configError("invalid (misformatted) value", key, value);
{
configError(&localStatus, "invalid (misformatted) value", key, value);
continue;
}
}
else if (key == "verbose_logging")
{
Expand Down Expand Up @@ -517,11 +610,11 @@ void Config::enumerate(ReplicaList& replicas)
}
catch (const Exception& ex)
{
FbLocalStatus localStatus;
composeError(&localStatus, ex);
}

if (localStatus->getState() & IStatus::STATE_ERRORS)
logReplicaStatus(dbName, &localStatus);
}
}

// This routine is used for split input connection string to parts
Expand Down
4 changes: 4 additions & 0 deletions src/jrd/replication/Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ Manager::Manager(const string& dbId,
if (localStatus->getState() & IStatus::STATE_ERRORS)
{
logPrimaryStatus(m_config->dbName, &localStatus);
if (m_config->reportErrors)
(Arg::Gds(isc_repl_error) << Arg::StatusVector(&localStatus)).raise();
continue;
}

Expand All @@ -192,6 +194,8 @@ Manager::Manager(const string& dbId,
{
logPrimaryStatus(m_config->dbName, &localStatus);
attachment->detach(&localStatus);
if (m_config->reportErrors)
(Arg::Gds(isc_repl_error) << Arg::StatusVector(&localStatus)).raise();
continue;
}

Expand Down
Loading