From 8ce429158a8770d61de29dd2ccc779c9c02a8d77 Mon Sep 17 00:00:00 2001 From: May Lee Date: Fri, 13 Feb 2026 14:19:51 -0500 Subject: [PATCH 1/2] fixes --- .../install_the_worker/advanced_worker_configurations.md | 2 +- .../observability_pipelines/configuration/set_up_pipelines.md | 1 - .../monitoring_and_troubleshooting/troubleshooting.md | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/content/en/observability_pipelines/configuration/install_the_worker/advanced_worker_configurations.md b/content/en/observability_pipelines/configuration/install_the_worker/advanced_worker_configurations.md index 30eee800320..3981f507aaf 100644 --- a/content/en/observability_pipelines/configuration/install_the_worker/advanced_worker_configurations.md +++ b/content/en/observability_pipelines/configuration/install_the_worker/advanced_worker_configurations.md @@ -44,7 +44,7 @@ The following is a list of bootstrap options, their related pipeline environment :     `api`:
        `enabled`: `true`
        `address`: `"127.0.0.1:8686" # optional` : Note: Setting `address` is optional. It is the network address to which the API should bind. If you're running the Worker in a Docker container, bind to `0.0.0.0`. Otherwise, the API is not exposed outside of the container. : **Description**: Enable the Observability Pipelines Worker API so you can see the Worker's processes with the `tap` or `top` command. See [Run, tap, or top the Worker][8] for more information. If you are using the Helm charts provided when you [set up a pipeline][7], then the API has already been enabled. Otherwise, make sure the environment variable `DD_OP_API_ENABLED` is set to `true` in `/etc/observability-pipelines-worker/bootstrap.yaml`. This sets up the API to listen on `localhost` and port `8686`, which is what the CLI for `tap` is expecting. -

See [Enable liveness and readiness probe](#enable-liveness-and-readiness-probe) on how to expose the `/health` endpoint. +

See [Enable liveness and readiness probe](#enable-the-health-check-endpoint-and-the-liveness-and-readiness-probes) on how to expose the `/health` endpoint. `api_key` : **Pipeline environment variable**: `DD_API_KEY` diff --git a/content/en/observability_pipelines/configuration/set_up_pipelines.md b/content/en/observability_pipelines/configuration/set_up_pipelines.md index dc3d8bad9fe..6f20af3ca7e 100644 --- a/content/en/observability_pipelines/configuration/set_up_pipelines.md +++ b/content/en/observability_pipelines/configuration/set_up_pipelines.md @@ -164,7 +164,6 @@ To delete a pipeline in the UI: - A pipeline must have at least one destination. If a processor group only has one destination, that destination cannot be deleted. - For log pipelines: - You can add a total of three destinations for a log pipeline. - - A specific destination can only be added once. For example, you cannot add multiple Splunk HEC destinations. ## Further Reading diff --git a/content/en/observability_pipelines/monitoring_and_troubleshooting/troubleshooting.md b/content/en/observability_pipelines/monitoring_and_troubleshooting/troubleshooting.md index 7758f7ea899..f1c7389a81e 100644 --- a/content/en/observability_pipelines/monitoring_and_troubleshooting/troubleshooting.md +++ b/content/en/observability_pipelines/monitoring_and_troubleshooting/troubleshooting.md @@ -183,7 +183,7 @@ The quota processor is synchronized across all Workers in a Datadog organization [12]: https://app.datadoghq.com/logs [13]: /observability_pipelines/configuration/install_the_worker/worker_commands/ [14]: https://docs.redhat.com/en/documentation/red_hat_enterprise_linux/7/html/security_guide/sec-port_forwarding#sec-Adding_a_Port_to_Redirect -[15]: /observability_pipelines/configuration/install_the_worker/advanced_worker_configurations/#enable-liveness-and-readiness-probe +[15]: /observability_pipelines/configuration/install_the_worker/advanced_worker_configurations/#enable-the-health-check-endpoint-and-the-liveness-and-readiness-probes [16]: /observability_pipelines/sources/#tls-certificates [17]: https://app.datadoghq.com/organization-settings/remote-config/setup [18]: /observability_pipelines/guide/environment_variables/ From 9d01ec25e58a6d34bbf25479e99492b8aba929f5 Mon Sep 17 00:00:00 2001 From: May Lee Date: Fri, 13 Feb 2026 17:01:54 -0500 Subject: [PATCH 2/2] Move processor shortcode content inline for batch 2 Move shortcode content into processor docs and restructure into Overview and Setup sections for: - parse_json - parse_xml - quota - reduce - remap_ocsf --- .../processors/parse_json.md | 50 ++++++- .../processors/parse_xml.md | 122 +++++++++++++++++- .../processors/quota.md | 79 +++++++++++- .../processors/reduce.md | 35 ++++- .../processors/remap_ocsf.md | 65 +++++++++- 5 files changed, 344 insertions(+), 7 deletions(-) diff --git a/content/en/observability_pipelines/processors/parse_json.md b/content/en/observability_pipelines/processors/parse_json.md index 905ef2bdd15..1a5d492583a 100644 --- a/content/en/observability_pipelines/processors/parse_json.md +++ b/content/en/observability_pipelines/processors/parse_json.md @@ -15,7 +15,55 @@ further_reading: {{< product-availability >}} -{{% observability_pipelines/processors/parse_json %}} +## Overview + +This processor parses the specified JSON field into objects. 
For example, if you have a `message` field that contains stringified JSON:

```json
{
  "foo": "bar",
  "team": "my-team",
  "message": "{\"level\":\"info\",\"timestamp\":\"2024-01-15T10:30:00Z\",\"service\":\"user-service\",\"user_id\":\"12345\",\"action\":\"login\",\"success\":true,\"ip_address\":\"192.168.1.100\"}",
  "app_id": "streaming-services",
  "ddtags": [
    "kube_service:my-service",
    "k8_deployment:your-host"
  ]
}
```

Use the Parse JSON processor to parse the `message` field so that all of its attributes are available in a nested object.

{{< img src="observability_pipelines/processors/parse-json-example.png" alt="The parse json processor with message as the field to parse on" style="width:60%;" >}}

The output contains the `message` field with the parsed JSON:

```json
{
  "foo": "bar",
  "team": "my-team",
  "message": {
    "action": "login",
    "ip_address": "192.168.1.100",
    "level": "info",
    "service": "user-service",
    "success": true,
    "timestamp": "2024-01-15T10:30:00Z",
    "user_id": "12345"
  },
  "app_id": "streaming-services",
  "ddtags": [
    "kube_service:my-service",
    "k8_deployment:your-host"
  ]
}
```

## Setup

To set up this processor:
1. Define a **filter query**. Only logs that match the specified [filter query](#filter-query-syntax) are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
2. Enter the name of the field you want to parse JSON on. See the sketch after these steps for an illustration.<br>**Note**: The parsed JSON overwrites what was originally contained in the field.
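The following Python sketch illustrates the parse-and-overwrite behavior described in the steps above. It is only an illustration of the transformation, not the Worker's implementation; the event dictionary and the `parse_json_field` helper are hypothetical.

```python
import json

def parse_json_field(event: dict, field: str) -> dict:
    """Parse a stringified JSON field in place, mirroring the processor's behavior."""
    raw = event.get(field)
    if isinstance(raw, str):
        try:
            # The parsed object overwrites the original string value.
            event[field] = json.loads(raw)
        except json.JSONDecodeError:
            # Not valid JSON: leave the original field untouched.
            pass
    return event

event = {
    "team": "my-team",
    "message": '{"level": "info", "service": "user-service", "action": "login"}',
}
parse_json_field(event, "message")
print(event["message"]["service"])  # user-service
```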
{{% observability_pipelines/processors/filter_syntax %}}

diff --git a/content/en/observability_pipelines/processors/parse_xml.md b/content/en/observability_pipelines/processors/parse_xml.md
index 1d5c768d555..3fc9f1c0ae9 100644
--- a/content/en/observability_pipelines/processors/parse_xml.md
+++ b/content/en/observability_pipelines/processors/parse_xml.md
@@ -16,7 +16,127 @@ products:

{{< product-availability >}}

-{{% observability_pipelines/processors/parse_xml %}}

## Overview

This processor parses Extensible Markup Language (XML) so the data can be processed and sent to different destinations. XML is a log format used to store and transport structured data. It is organized in a tree-like structure to represent nested information, and uses tags and attributes to define the data. For example, this is XML data that uses only tags (`<recipe>`, `<type>`, and `<name>`) and no attributes:

```xml
<recipe>
  <type>pasta</type>
  <name>Carbonara</name>
</recipe>
```

This is an XML example where the tag `recipe` has the attribute `type`:

```xml
<recipe type="pasta">
  <name>Carbonara</name>
</recipe>
```

The following image shows a Windows Event 4625 log in XML next to the same log parsed and output in JSON. Parsing the XML log reduced the size of the log event by approximately 30%.

{{< img src="observability_pipelines/processors/xml-side-by-side.png" alt="The XML log and the resulting parsed log in JSON" style="width:80%;" >}}

## Setup

To set up this processor:

1. Define a filter query. Only logs that match the specified filter query are processed. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
1. Enter the path to the log field on which you want to parse XML. Use the path notation `<OUTER_FIELD>.<INNER_FIELD>` to match subfields. See the [Path notation example](#path-notation-example-parse-xml) below.
1. Optionally, in the `Enter text key` field, input the key name to use for the text node when XML attributes are appended. See the [text key example](#text-key-example). If the field is left empty, `value` is used as the key name.
1. Optionally, select `Always use text key` if you want to store text inside an object using the text key even when no attributes exist.
1. Optionally, toggle `Include XML attributes` on if you want to include XML attributes. You can then choose the attribute prefix you want to use. See the [attribute prefix example](#attribute-prefix-example). If the field is left empty, the original attribute key is used.
1. Optionally, select whether you want to convert data types into numbers, Booleans, or nulls.
   - If **Numbers** is selected, numbers are parsed as integers and floats.
   - If **Booleans** is selected, `true` and `false` are parsed as Booleans.
   - If **Nulls** is selected, the string `null` is parsed as null.

##### Path notation example {#path-notation-example-parse-xml}

For the following message structure:

```json
{
  "outer_key": {
    "inner_key": "inner_value",
    "a": {
      "double_inner_key": "double_inner_value",
      "b": "b value"
    },
    "c": "c value"
  },
  "d": "d value"
}
```

- Use `outer_key.inner_key` to see the key with the value `inner_value`.
- Use `outer_key.a.double_inner_key` to see the key with the value `double_inner_value`.
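To make the path notation concrete, here is a small Python sketch of how a dotted path such as `outer_key.a.double_inner_key` can be resolved against a nested event. The `resolve_path` helper is illustrative only and is not part of the Worker.

```python
def resolve_path(event: dict, path: str):
    """Walk a nested dictionary using dotted path notation; return None if the path is missing."""
    current = event
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current

event = {
    "outer_key": {
        "inner_key": "inner_value",
        "a": {"double_inner_key": "double_inner_value", "b": "b value"},
        "c": "c value",
    },
    "d": "d value",
}
print(resolve_path(event, "outer_key.inner_key"))           # inner_value
print(resolve_path(event, "outer_key.a.double_inner_key"))  # double_inner_value
```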
##### Always use text key example

If **Always use text key** is selected, the text key is left as the default (`value`), and you have the following XML:

```xml
<recipe type="pasta">
  Carbonara
</recipe>
```

The XML is converted to:

```json
{
  "recipe": {
    "type": "pasta",
    "value": "Carbonara"
  }
}
```

##### Text key example

If the text key is `text` and you have the following XML:

```xml
<recipe type="pasta">
  Carbonara
</recipe>
```

The XML is converted to:

```json
{
  "recipe": {
    "type": "pasta",
    "text": "Carbonara"
  }
}
```

##### Attribute prefix example

If you enable **Include XML attributes**, the prefix is added to each XML attribute key. For example, if the attribute prefix is `@` and you have the following XML:

```xml
<recipe type="pasta">Carbonara</recipe>
```

Then it is converted to the following JSON:

```json
{
  "recipe": {
    "@type": "pasta",
    "value": "Carbonara"
  }
}
```

{{% observability_pipelines/processors/filter_syntax %}}

diff --git a/content/en/observability_pipelines/processors/quota.md b/content/en/observability_pipelines/processors/quota.md
index e66a85438b0..e4d26c7385a 100644
--- a/content/en/observability_pipelines/processors/quota.md
+++ b/content/en/observability_pipelines/processors/quota.md
@@ -9,6 +9,83 @@ products:

{{< product-availability >}}

-{{% observability_pipelines/processors/quota %}}

## Overview

The quota processor measures the logging traffic for logs that match the filter you specify. When the configured daily quota is met inside the 24-hour rolling window, the processor can either keep or drop additional logs, or send them to a storage bucket. For example, you can configure this processor to drop new logs, or to trigger an alert without dropping logs, after the processor has received 10 million events from a certain service in the last 24 hours.

You can also use field-based partitioning, for example by `service`, `env`, or `status`. Each unique field value uses a separate quota bucket with its own daily quota limit. See the [Partition example](#partition-example) for more information.

**Note**: The pipeline uses the name of the quota to identify the same quota across multiple Remote Configuration deployments of the Worker.

### Limits

- Each pipeline can have up to 1000 buckets. If you need to increase the bucket limit, [contact support][5].
- The quota processor is synchronized across all Workers in a Datadog organization. For this synchronization, there is a default rate limit of 50 Workers per organization. When there are more than 50 Workers in an organization:
  - The processor continues to run, but does not sync correctly with the other Workers, which can result in logs being sent after the quota limit has been reached.
  - The Worker prints `Failed to sync quota state` errors.
  - [Contact support][5] if you want to increase the default number of Workers per organization.
- The quota processor synchronizes counts across Workers a few times per minute. The limit set on the processor can therefore be overshot, depending on the number of Workers and the log throughput. Datadog recommends setting a limit that is at least one order of magnitude higher than the volume of logs the processor is expected to receive per minute. You can use a throttle processor with the quota processor to control these short bursts by limiting the number of logs allowed per minute.

## Setup

To set up the quota processor:
1. Enter a name for the quota processor.
1. Define a **filter query**.
Only logs that match the specified [filter query](#filter-query-syntax) are counted towards the daily limit.
   - Logs that match the quota filter and are within the daily quota are sent to the next step in the pipeline.
   - Logs that do not match the quota filter are sent to the next step in the pipeline.
1. In the **Unit for quota** dropdown menu, select whether you want to measure the quota by the number of `Events` or by the `Volume` in bytes.
1. Set the daily quota limit and select the unit of magnitude for your desired quota.
1. Optional: Click **Add Field** if you want to set a quota on a specific service or region field.
   1. Enter the field name you want to partition by. See the [Partition example](#partition-example) for more information.
   1. Select **Ignore when missing** if you want the quota applied only to events that match the partition. See the [Ignore when missing example](#example-for-the-ignore-when-missing-option) for more information.
   1. Optional: Click **Overrides** if you want to set different quotas for the partitioned field.
      - Click **Download as CSV** for an example of how to structure the CSV.
      - Drag and drop your overrides CSV to upload it. You can also click **Browse** to select the file. See the [Overrides example](#overrides-example) for more information.
   1. Click **Add Field** if you want to add another partition.
1. In the **When quota is met** dropdown menu, select whether you want to **drop events**, **keep events**, or **send events to overflow destination** when the quota has been met.
   1. If you select **send events to overflow destination**, an overflow destination is added with the following cloud storage options: **Amazon S3**, **Azure Blob**, and **Google Cloud**.
   1. Select the cloud storage you want to send overflow logs to. See the setup instructions for your cloud storage: [Amazon S3][2], [Azure Blob Storage][3], or [Google Cloud Storage][4].

#### Examples

##### Partition example

Use **Partition by** if you want to set a quota on a specific service or region. For example, if you want to set a quota for 10 events per day and group the events by the `service` field, enter `service` into the **Partition by** field.

##### Example for the "ignore when missing" option

Select **Ignore when missing** if you want the quota applied only to events that match the partition. For example, if the Worker receives the following set of events:

```
{"service":"a", "source":"foo", "message": "..."}
{"service":"b", "source":"bar", "message": "..."}
{"service":"b", "message": "..."}
{"source":"redis", "message": "..."}
{"message": "..."}
```

And **Ignore when missing** is selected, then the Worker:
- creates a set for logs with `service:a` and `source:foo`
- creates a set for logs with `service:b` and `source:bar`
- ignores the last three events

The quota is applied to the two sets of logs and not to the last three events.

If **Ignore when missing** is not selected, the quota is applied to all five events.

##### Overrides example

If you are partitioning by `service` and have two services, `a` and `b`, you can use overrides to apply different quotas for them.
For example, if you want `service:a` to have a quota limit of 5,000 bytes and `service:b` to have a limit of 50 events, the override rules look like this: + +| Service | Type | Limit | +| ------- | ------ | ----- | +| `a` | Bytes | 5,000 | +| `b` | Events | 50 | + +[1]: /monitors/types/metric/?tab=threshold +[2]: /observability_pipelines/destinations/amazon_s3/ +[3]: /observability_pipelines/destinations/azure_storage/ +[4]: /observability_pipelines/destinations/google_cloud_storage/ +[5]: /help/ {{% observability_pipelines/processors/filter_syntax %}} \ No newline at end of file diff --git a/content/en/observability_pipelines/processors/reduce.md b/content/en/observability_pipelines/processors/reduce.md index 62366b4c7ca..6e400dac824 100644 --- a/content/en/observability_pipelines/processors/reduce.md +++ b/content/en/observability_pipelines/processors/reduce.md @@ -9,6 +9,39 @@ products: {{< product-availability >}} -{{% observability_pipelines/processors/reduce %}} +## Overview + +The reduce processor groups multiple log events into a single log, based on the fields specified and the merge strategies selected. Logs are grouped at 10-second intervals. After the interval has elapsed for the group, the reduced log for that group is sent to the next step in the pipeline. + +## Setup + +To set up the reduce processor: +1. Define a **filter query**. Only logs that match the specified [filter query](#filter-query-syntax) are processed. Reduced logs and logs that do not match the filter query are sent to the next step in the pipeline. +2. In the **Group By** section, enter the field you want to group the logs by. +3. Click **Add Group by Field** to add additional fields. +4. In the **Merge Strategy** section: + - In **On Field**, enter the name of the field you want to merge the logs on. + - Select the merge strategy in the **Apply** dropdown menu. This is the strategy used to combine events. See the following [Merge strategies](#merge-strategies) section for descriptions of the available strategies. + - Click **Add Merge Strategy** to add additional strategies. + +##### Merge strategies + +These are the available merge strategies for combining log events. + + +| Name | Description | +| -------------- | ------------------------------------------------------------------------------------------------------------------ | +| Array | Appends each value to an array. | +| Concat | Concatenates each string value, delimited with a space. | +| Concat newline | Concatenates each string value, delimited with a newline. | +| Concat raw | Concatenates each string value, without a delimiter. | +| Discard | Discards all values except the first value that was received. | +| Flat unique | Creates a flattened array of all unique values that were received. | +| Longest array | Keeps the longest array that was received. | +| Max | Keeps the maximum numeric value that was received. | +| Min | Keeps the minimum numeric value that was received. | +| Retain | Discards all values except the last value that was received. Works as a way to coalesce by not retaining \`null\`. | +| Shortest array | Keeps the shortest array that was received. | +| Sum | Sums all numeric values that were received. 
| {{% observability_pipelines/processors/filter_syntax %}} \ No newline at end of file diff --git a/content/en/observability_pipelines/processors/remap_ocsf.md b/content/en/observability_pipelines/processors/remap_ocsf.md index b8696e9abbf..436fc54cfc1 100644 --- a/content/en/observability_pipelines/processors/remap_ocsf.md +++ b/content/en/observability_pipelines/processors/remap_ocsf.md @@ -9,17 +9,76 @@ products: {{< product-availability >}} -{{% observability_pipelines/processors/remap_ocsf %}} +## Overview + +Use this processor to remap logs to Open Cybersecurity Schema Framework (OCSF) events. OCSF schema event classes are set for a specific log source and type. You can add multiple mappings to one processor. **Note**: Datadog recommends that the OCSF processor be the last processor in your pipeline, so that remapping is done after the logs have been processed by all the other processors. + +## Setup + +To set up this processor: + +Click **Manage mappings**. This opens a modal: + +- If you have already added mappings, click on a mapping in the list to edit or delete it. You can use the search bar to find a mapping by its name. Click **Add Mapping** if you want to add another mapping. Select **Library Mapping** or **Custom Mapping** and click **Continue**. +- If you have not added any mappings yet, select **Library Mapping** or **Custom Mapping**. Click **Continue**. {{% collapse-content title="Library mapping" level="h5" expanded=false id="library_mapping" %}} -{{% observability_pipelines/processors/remap_ocsf_library_mapping %}} +#### Add a mapping + +1. Select the log type in the dropdown menu. +1. Define a [filter query](#filter-query-syntax). Only logs that match the specified filter query are remapped. All logs, regardless of whether they do or do not match the filter query, are sent to the next step in the pipeline. +1. Review the sample source log and the resulting OCSF output. +1. Click **Save Mapping**. + +#### Library mappings + +These are the library mappings available: + +| Log Source | Log Type | OCSF Category | Supported OCSF versions| +|------------------------|-----------------------------------------------|-------------------------------| -----------------------| +| AWS CloudTrail | Type: Management
EventName: ChangePassword | Account Change (3001) | 1.3.0
1.1.0 | +| Google Cloud Audit | SetIamPolicy | Account Change (3001) | 1.3.0
1.1.0 | +| Google Cloud Audit | CreateSink | Account Change (3001) | 1.3.0
1.1.0 |
+| Google Cloud Audit | UpdateSink | Account Change (3001) | 1.3.0
1.1.0 | +| Google Cloud Audit | CreateBucket | Account Change (3001) | 1.3.0
1.1.0 | +| GitHub | Create User | Account Change (3001) | 1.1.0 | +| Google Workspace Admin | addPrivilege | User Account Management (3005)| 1.1.0 | +| Okta | User session start | Authentication (3002) | 1.1.0 | +| Microsoft 365 Defender | Incident | Incident Finding (2005) | 1.3.0
1.1.0 |
+| Palo Alto Networks | Traffic | Network Activity (4001) | 1.1.0 |

{{% /collapse-content %}}

{{% collapse-content title="Custom mapping" level="h5" expanded=false id="custom_mapping" %}}

-{{% observability_pipelines/processors/remap_ocsf_custom_mapping %}}

When you set up a custom mapping, if you try to close or exit the modal, you are prompted to export your mapping. Datadog recommends that you export your mapping to save what you have set up so far. The exported mapping is saved as a JSON file.

To set up a custom mapping:

1. Optionally, add a name for the mapping. The default name is `Custom Authentication`.
1. Define a [filter query](#filter-query-syntax). Only logs that match the specified filter query are remapped. All logs, regardless of whether they match the filter query, are sent to the next step in the pipeline.
1. Select the OCSF event category from the dropdown menu.
1. Select the OCSF event class from the dropdown menu.
1. Enter a log sample so that you can reference it when you add fields.
1. Click **Continue**.
1. Select any OCSF profiles that you want to add. See the [OCSF Schema Browser][1] for more information.
1. All required fields are shown. Enter the required **Source Log Fields** and **Fallback Values** for them. If you want to manually add additional fields, click **+ Field**. Click the trash can icon to delete a field. **Note**: Required fields cannot be deleted.
   - The fallback value is used for the OCSF field if the log doesn't have the source log field.
   - You can add multiple fields for **Source Log Fields**. For example, Okta's `user.system.start` logs have either the `eventType` or `legacyEventType` field. You can map both fields to the same OCSF field.
   - If you have your own OCSF mappings in JSON, or saved a previous mapping that you want to use, click **Import Configuration File**.
1. Click **Continue**.
1. Some log source values must be mapped to OCSF values. For example, if a source log's severity field is mapped to the OCSF `severity_id` field, the source severity values must be mapped to the OCSF `severity_id` values. See `severity_id` in [Authentication][2] for a list of OCSF values. An example of mapping severity values:
   | Log source value | OCSF value |
   | ---------------- | --------------- |
   | `INFO` | `Informational` |
   | `WARN` | `Medium` |
   | `ERROR` | `High` |
1. All values that are required to be mapped to an OCSF value are listed. Click **+ Add Row** if you want to map additional values.
1. Click **Save Mapping**.

[1]: https://schema.ocsf.io/
[2]: https://schema.ocsf.io/1.4.0/classes/authentication?extensions=

{{% /collapse-content %}}
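As a rough illustration of the value-mapping step above, the following Python sketch remaps a source log's severity strings to OCSF severity values, following the `INFO`/`WARN`/`ERROR` example in the table. The `SEVERITY_MAP` table and the `to_ocsf_severity` helper are hypothetical, not the processor's implementation; see the OCSF schema for the authoritative `severity_id` values.

```python
# Hypothetical mapping from source severity strings to OCSF severity values.
SEVERITY_MAP = {
    "INFO": ("Informational", 1),
    "WARN": ("Medium", 3),
    "ERROR": ("High", 4),
}

def to_ocsf_severity(source_value: str) -> dict:
    """Return the OCSF severity label and severity_id for a source severity string."""
    severity, severity_id = SEVERITY_MAP.get(source_value.upper(), ("Unknown", 0))
    return {"severity": severity, "severity_id": severity_id}

print(to_ocsf_severity("warn"))   # {'severity': 'Medium', 'severity_id': 3}
print(to_ocsf_severity("debug"))  # {'severity': 'Unknown', 'severity_id': 0}
```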