2.x

HTTP API Collector

HTTP API Collector source plugin
Plugin name: http_loader.
The HTTP API Collector gathers data from external sources through HTTP requests, so your pipeline can process and store it.
Use this source plugin to configure your Calyptia Core pipeline to collect data from your HTTP endpoints.

Configuration parameters

The HTTP API Collector source plugin provides these configuration parameters.

Request

| Key | Required | Default | Description |
|-----|----------|---------|-------------|
| method | false | GET | Request method. Defaults to GET, or POST if body is set. Supports templating. |
| url | true | | Request URL. Supports templating. |
| header | false | User-Agent: Fluent-Bit HTTP Loader Plugin | Request headers as a string, one header per line (separated by the newline character `\n`). Supports templating. |
| body | false | | Request body. Supports templating. |
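The header value packs several headers into a single string. A minimal Go sketch of how such a string could be turned into an `http.Header`, assuming one `Name: value` pair per line and a split on the first colon; `parseHeaders` is a hypothetical helper, not the plugin's code.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// parseHeaders is a hypothetical helper: it turns a newline-separated
// "Name: value" string into an http.Header, splitting each line on its
// first colon and ignoring blank or malformed lines.
func parseHeaders(raw string) http.Header {
	h := http.Header{}
	for _, line := range strings.Split(raw, "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		name, value, ok := strings.Cut(line, ":")
		if !ok {
			continue
		}
		// Add canonicalizes the name, e.g. "content-type" -> "Content-Type".
		h.Add(strings.TrimSpace(name), strings.TrimSpace(value))
	}
	return h
}

func main() {
	h := parseHeaders("Content-Type: application/json\nX-Auth-Token: example-token")
	fmt.Println(h.Get("Content-Type"), h.Get("X-Auth-Token"))
}
```

Splitting on the first colon only keeps values such as `Authorization: SSWS token` or URLs intact.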

TLS

| Key | Required | Default | Description |
|-----|----------|---------|-------------|
| tls_cert | false | | PEM-encoded TLS certificate. Must be set together with tls_key. |
| tls_key | false | | PEM-encoded TLS key. Must be set together with tls_cert. |
| ca_cert | false | | PEM-encoded CA certificate. |

TLS (as file paths)

| Key | Required | Default | Description |
|-----|----------|---------|-------------|
| tls_cert_file | false | | Path to a PEM-encoded TLS certificate file. Must be set together with tls_key_file. |
| tls_key_file | false | | Path to a PEM-encoded TLS key file. Must be set together with tls_cert_file. |
| ca_cert_file | false | | Path to a PEM-encoded CA certificate file. |

Proxy

| Key | Required | Default | Description |
|-----|----------|---------|-------------|
| proxy | false | | Proxy URL to send requests through. |
| no_proxy | false | | Comma-separated list of URLs to exclude from proxying. |

Time Control

| Key | Required | Default | Description |
|-----|----------|---------|-------------|
| timeout | false | 0s | Request timeout, as a duration string. If set, it must be greater than 0s. |
| pull_interval | false | 1s | Time between requests, as a duration string. If set, it must be greater than 0s. Supports templating. |
| wait | false | 0s | How long to wait before starting collection. Useful to sync with pull_interval. Supports templating; should evaluate to a duration string. |

Output Control

| Key | Required | Default | Description |
|-----|----------|---------|-------------|
| skip | false | `{{or (ge .Response.StatusCode 400) (empty .Response.Body)}}` | Controls when to skip sending records to fluent-bit. Supports templating. By default, responses with error status codes and responses with an empty body are skipped. |
| stop | false | | Controls when the plugin stops working. Use it to dump data into fluent-bit and then finish. |
| out | false | `{{toJson .Response.Body}}` | Controls what to send to fluent-bit. Supports templating. Defaults to sending the response body. |
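To see how the default skip expression behaves, it can be evaluated with Go's `text/template`. `ge` and `or` are template built-ins, but `empty` comes from sprig, which is not in the standard library, so this sketch swaps in a simplified stand-in; sprig's own `empty` may treat some values differently, and `shouldSkip` is hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
	"text/template"
)

// Response mirrors the two fields of the plugin's Response struct used here.
type Response struct {
	StatusCode int
	Body       any
}

// empty is a simplified stand-in for sprig's empty: nil, zero values and
// empty strings, slices, arrays and maps count as empty.
func empty(v any) bool {
	if v == nil {
		return true
	}
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.String, reflect.Slice, reflect.Map, reflect.Array:
		return rv.Len() == 0
	default:
		return rv.IsZero()
	}
}

// skipTmpl is the default skip expression from the table above.
const skipTmpl = `{{or (ge .Response.StatusCode 400) (empty .Response.Body)}}`

// shouldSkip renders skipTmpl and reports whether it printed "true".
func shouldSkip(resp Response) (bool, error) {
	t, err := template.New("skip").Funcs(template.FuncMap{"empty": empty}).Parse(skipTmpl)
	if err != nil {
		return false, err
	}
	var sb strings.Builder
	data := struct{ Response Response }{Response: resp}
	if err := t.Execute(&sb, data); err != nil {
		return false, err
	}
	return sb.String() == "true", nil
}

func main() {
	skip, err := shouldSkip(Response{StatusCode: 503, Body: map[string]any{"error": "unavailable"}})
	fmt.Println(skip, err)
}
```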

Retry

Key
Required
Default
Description
retry
false
false
Controls whether to retry the current request. Supports templating. Should evaluate to a Boolean.
max_retries
false
1

OAuth2

| Key | Required | Default | Description |
|-----|----------|---------|-------------|
| oauth2_token_url | false | | OAuth2 token endpoint where the token is exchanged. Setting it enables OAuth2 using the client-credentials flow. |
| oauth2_client_id | false | | OAuth2 client ID. |
| oauth2_client_secret | false | | OAuth2 client secret. Sensitive field; prefer using pipeline secrets. |
| oauth2_scopes | false | | OAuth2 scopes, as a string with scopes separated by spaces. |
| oauth2_endpoint_params | false | | OAuth2 endpoint parameters, as a string in URL query string format. |
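For orientation, this is roughly what a client-credentials token request body looks like when built from these settings (RFC 6749, section 4.4). It is a sketch, not the plugin's code: `tokenRequestBody` is hypothetical, and it sends the client credentials in the form body, which the RFC allows, while the plugin may use HTTP Basic authentication instead.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// tokenRequestBody builds the form body of an OAuth2 client-credentials
// token request. scopes is space separated (like oauth2_scopes) and
// endpointParams uses query string format (like oauth2_endpoint_params).
func tokenRequestBody(clientID, clientSecret, scopes, endpointParams string) (string, error) {
	form := url.Values{}
	form.Set("grant_type", "client_credentials")
	form.Set("client_id", clientID)
	form.Set("client_secret", clientSecret)
	// Normalize repeated spaces; the token endpoint expects one space between scopes.
	if s := strings.Join(strings.Fields(scopes), " "); s != "" {
		form.Set("scope", s)
	}
	extra, err := url.ParseQuery(endpointParams)
	if err != nil {
		return "", err
	}
	for key, values := range extra {
		for _, v := range values {
			form.Add(key, v)
		}
	}
	// Encode sorts keys and escapes values (spaces become "+").
	return form.Encode(), nil
}

func main() {
	body, err := tokenRequestBody("my-client", "my-secret", "read write", "audience=api")
	fmt.Println(body, err)
}
```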

Cookie authentication

| Key | Required | Default | Description |
|-----|----------|---------|-------------|
| auth_cookie_url | false | | URL for cookie-based authentication. |
| auth_cookie_method | false | | Request method for cookie-based authentication. Defaults to GET, or POST if auth_cookie_body is set. |
| auth_cookie_header | false | | Request headers for cookie-based authentication, one header per line (separated by the newline character `\n`). |
| auth_cookie_body | false | | Request body for cookie-based authentication. |

Storage

| Key | Required | Default | Description |
|-----|----------|---------|-------------|
| data_dir | false | /data/storage | Where to store the data used to resume collecting. Defaults to /data/storage if it exists, otherwise a temporary directory if one is available; if neither is available, storage is disabled. |
| data_exp | false | 0s | How long stored data can be used after resuming. |
| store_response_body | false | toJson .Response.Body | Defines what to store as the response body in storage. Supports templating. Intended for large response payloads, so the plugin does not exceed the 5 MiB limit for storing data in Cloud through the pipeline metadata API. |

Templating

Inside Go templates, the following data is available:
  • Index (int): always available. The current fetch index.
  • Request (Request struct): available after a successful fetch.
  • Response (Response struct): available after a successful fetch.
  • LastRequestTime (*time.Time): the time when the last request was made. Available after a successful request.
  • LastResponseTime (*time.Time): the time when the last response was received. Available after a successful request and response round trip.
The Request struct contains the following fields:
  • Method: string.
  • URL: *url.URL.
  • Header: http.Header.
  • Body: any.
The Response struct contains the following fields:
  • StatusCode: int.
  • Header: http.Header.
  • Body: any.
In addition to all the functions provided by sprig, you can use:
  • timeRFC3339: func() string returns the RFC3339 time format constant (2006-01-02T15:04:05Z07:00). Useful for formatting a time.Time as a JSON timestamp.
  • nextLink: func(http.Header) string returns the URL from the Link header entry with rel=next. Useful for pagination.
  • parseDuration: func(string) (time.Duration, error) parses a string as a time.Duration.
  • has: func(obj any, key string) checks whether the given key exists inside an object.
  • jq: func(query string, data any) (any, error) transforms data using jq.
  • log: func(...any) prints a message to stdout.
  • logf: func(format string, args ...any) prints a formatted message to stdout.
  • set_variable: func(key string, value any) stores a persistent variable that you can reference later in other template executions.
  • has_variable: func(key string) checks whether the given variable is set.
  • get_variable: func(key string) retrieves a previously stored persistent variable; returns its value, or an empty string if it is not set.
  • unset_variable: func(key string) deletes a previously stored persistent variable.
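The variable helpers behave like a small key-value store shared between template executions. A sketch of that behavior with `text/template`, assuming an in-memory map (the plugin persists variables across restarts, which this does not) and assuming set_variable and unset_variable print nothing; `vars` and `render` are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// vars is an in-memory stand-in for the plugin's persistent variable store.
type vars map[string]any

// funcs exposes the four helpers under the names used in templates.
func (v vars) funcs() template.FuncMap {
	return template.FuncMap{
		// set_variable returns "" so that calling it prints nothing.
		"set_variable": func(key string, value any) string {
			v[key] = value
			return ""
		},
		"has_variable": func(key string) bool {
			_, ok := v[key]
			return ok
		},
		// get_variable returns the stored value, or "" when the key is unset.
		"get_variable": func(key string) any {
			if value, ok := v[key]; ok {
				return value
			}
			return ""
		},
		"unset_variable": func(key string) string {
			delete(v, key)
			return ""
		},
	}
}

// render parses and executes src with the helpers installed; the same
// vars value is shared by every call, like separate template executions.
func render(v vars, src string) (string, error) {
	t, err := template.New("t").Funcs(v.funcs()).Parse(src)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	err = t.Execute(&sb, nil)
	return sb.String(), err
}

func main() {
	v := vars{}
	render(v, `{{set_variable "to" "2023-08-14T02:00:00Z"}}`)
	out, _ := render(v, `{{if has_variable "to"}}{{get_variable "to"}}{{end}}`)
	fmt.Println(out)
}
```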

Time

A common task is manipulating time parameters. Here are some common operations:
  • now returns the current time as a time.Time.
  • now.Format "2006-01-02T15:04:05Z07:00" returns the current time as a string in RFC3339 format.
  • now.Format timeRFC3339 does the same, using the utility constant.
  • mustToDate timeRFC3339 .Response.Body.time converts the time field of the response body to a time.Time using the RFC3339 time format.
  • .Response.Body.time | mustToDate timeRFC3339 does the same, using the pipe operator.
  • mustDateModify "+1h" now returns the current time plus 1 hour as a time.Time.
  • now | mustDateModify "-1h" is similar to the previous one, but uses the pipe operator and subtracts 1 hour.
  • (now | mustDateModify "+1h").Format timeRFC3339 returns the current time plus 1 hour as a string in RFC3339 format. Notice the parentheses.
  • ((.Response.Body.time | mustToDate timeRFC3339) | mustDateModify "+1h").Format timeRFC3339 parses the time field of the response body into a time.Time, adds 1 hour, and formats the result as a string in RFC3339 format.

Additional Go template resources

Example: Swapi

Here is an example using the swapi.dev API.
```yaml
pipeline:
  inputs:
    - name: http_loader
      url: |-
        {{with .Response.Body.next}}{{.}}{{else}}https://swapi.dev/api/people{{end}}
      out: |-
        {{toJson .Response.Body.results}}
      skip: |-
        {{or (ge .Response.StatusCode 400) (empty .Response.Body.results)}}
```
By fetching https://swapi.dev/api/people we get a response body like the following:
```json
{
  "next": "https://swapi.dev/api/people/?page=2",
  "results": [
    {...},
    {...}
  ]
}
```

URL

If the response body contains next, we use it as the URL; otherwise, we fall back to https://swapi.dev/api/people.
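Because the url template uses only template built-ins, it can be run as-is with Go's `text/template`. In this sketch, plain maps stand in for the plugin's template context (an assumption made for brevity) and `renderURL` is hypothetical. A missing `.Response`, as before the first fetch, and a `null` next both fall through to the else branch.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// urlTmpl is the url template from the Swapi example.
const urlTmpl = `{{with .Response.Body.next}}{{.}}{{else}}https://swapi.dev/api/people{{end}}`

// renderURL executes urlTmpl against data, which stands in for the plugin's context.
func renderURL(data map[string]any) (string, error) {
	t, err := template.New("url").Parse(urlTmpl)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := t.Execute(&sb, data); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	// Before the first fetch there is no .Response, so the default URL is used.
	first, _ := renderURL(map[string]any{})
	// After a fetch, the next URL from the response body wins.
	next, _ := renderURL(map[string]any{
		"Response": map[string]any{
			"Body": map[string]any{"next": "https://swapi.dev/api/people/?page=2"},
		},
	})
	fmt.Println(first)
	fmt.Println(next)
}
```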

Out

We select the results field from the response body and convert it to JSON. This sends an array to fluent-bit. Because arrays are not supported, the plugin automatically splits the array and sends each item as a separate record, like so:
```json
{ "index": 1, "value": {} }
{ "index": 2, "value": {} }
{ "index": 3, "value": {} }
```
Think of it as the following:
```javascript
arr.map((value, index) => ({ index, value })).forEach(send)
```

Skip

With skip, we control when to skip sending data to fluent-bit. In this case, we skip if the response status code is greater than or equal to 400, or if the results field in the response body is empty.

Transform

Say you are only interested in the name property of each result:
```json
{
  "next": "https://swapi.dev/api/people/?page=2",
  "results": [
    {
      "name": "Luke Skywalker",
      ...
    },
    {
      "name": "C-3PO",
      ...
    }
  ]
}
```
Inside the template, you can use jq to transform the data. Change out to the following to map over each result and extract only the name:
```
{{.Response.Body | jq ".results.[] | {name}" | toJson}}
```

Example: JSON placeholder

Let's take another example, using the JSON placeholder API.
This API supports pagination but does not provide a next URL. Instead, the client passes a _page query string parameter, starting at 1.
```yaml
pipeline:
  inputs:
    - name: http_loader
      url: |-
        https://jsonplaceholder.typicode.com/posts?_limit=10&_page={{add .Index 1}}
      retry: |-
        {{ge .Response.StatusCode 500}}
      max_retries: 3
```

Index

Inside the template context you will always find the variable .Index, a number that increments each time the plugin makes a request. It starts at 0.
In this case, we want the page to start at 1, so we add 1 to the index; that way, the page advances with every request.

Retry

You can enable retrying of requests. In this case, we retry if we get a response with a status code greater than or equal to 500. If the HTTP client fails for some other reason, such as a networking issue, and you don't get a response back, the request is retried as well.
You can also control how many tries are performed using max_retries. In this case, the plugin tries at most 3 times.

Example: Okta

Let's take a look at the Okta API. This API returns a Link: <url>; rel="next" header which we can use to paginate.
```yaml
pipeline:
  inputs:
    - name: http_loader
      url: |-
        {{- with nextLink .Response.Header -}}
        {{.}}
        {{- else -}}
        https://{replaceWithYourDomain}/api/v1/logs
        {{- end -}}
      header: "Authorization: SSWS {{secrets.oktaAPIToken}}"
```
There is a helper function called nextLink that takes the response headers and finds the Link header with rel=next. This is exactly what is needed in this case.
If we get that header, we use it as the URL; otherwise, we use the default Okta URL for our domain.
Notice that with works like an if, but it also sets the template data (.) to the result inside its body. In this case it is equivalent to:
```
{{- if nextLink .Response.Header -}}
{{nextLink .Response.Header}}
{{- else -}}
https://{replaceWithYourDomain}/api/v1/logs
{{- end -}}
```

Example: Carbon Black

This time, let's take a look at Carbon Black's API.
It uses offset-style pagination controlled by start and rows, but we are most interested in time-range filtering and in combining pagination with it.
The time range can be controlled either with time_range.start and time_range.end, both fixed timestamps, or with range, a relative value such as -2h.
To have complete control, we use fixed timestamps.
We will issue a POST request with a body like the following:
```json
{
  "time_range": {
    "start": "2023-08-14T01:00:00.000Z",
    "end": "2023-08-14T02:00:00.000Z"
  },
  "start": 1,
  "rows": 5
}
```
Advance the page by changing start to previous start + rows. For example, 1, 6, 11, 16, and so on.
Change time_range only after you have finished paging through the current time range.
```yaml
pipeline:
  inputs:
    - name: http_loader
      url: https://defense.conferdeploy.net/api/alerts/v7/orgs/ABCD1234/alerts/_search
      header: |-
        Content-Type: application/json
        X-Auth-Token: {{secrets.blackCarbonAuthToken}}
      body: |-
        {{- $now := now.UTC.Truncate (parseDuration "1h") -}}
        {{- $timeStart := $now | mustDateModify "-1h" -}}
        {{- $timeEnd := $now -}}
        {{- $start := 1 -}}
        {{- if and .Request .Response -}}
        {{- $prevTimeEnd := .Request.Body.time_range.end | mustToDate timeRFC3339 -}}
        {{- $start = add .Request.Body.start 5 -}}
        {{- if or (empty .Response.Body.results) (lt (len .Response.Body.results) 5) -}}
        {{- $timeStart = $prevTimeEnd -}}
        {{- $timeEnd = ($timeStart | mustDateModify "+1h") -}}
        {{- $start = 1 -}}
        {{- end -}}
        {{- if $timeEnd.Before $prevTimeEnd -}}
        {{- $timeEnd = $prevTimeEnd -}}
        {{- $timeStart = ($timeEnd | mustDateModify "-1h") -}}
        {{- end -}}
        {{- end -}}
        {"time_range":{"start":"{{$timeStart.Format timeRFC3339}}","end":"{{$timeEnd.Format timeRFC3339}}"},"start":{{$start}},"rows":5}
      out: |-
        {{toJson .Response.Body.results}}
      skip: |-
        {{or (ge .Response.StatusCode 400) (empty .Response.Body.results)}}
      pull_interval: |-
        {{- if ge .Response.StatusCode 400 -}}
        10s
        {{- else -}}
        {{- $interval := parseDuration "100ms" -}}
        {{- if or (empty .Response.Body.results) (lt (len .Response.Body.results) 5) -}}
        {{- $interval = parseDuration "1h" -}}
        {{- $currentTimeEnd := .Request.Body.time_range.end | mustToDate timeRFC3339 -}}
        {{- $nextTimeEnd := $currentTimeEnd.Add $interval -}}
        {{- if $nextTimeEnd.After now -}}
        {{- $interval = $nextTimeEnd.Sub now -}}
        {{- if le $interval.Nanoseconds 0 -}}
        {{- $interval = parseDuration "1ns" -}}
        {{- end -}}
        {{- end -}}
        {{- end -}}
        {{$interval}}
        {{- end -}}
```
We use Go template variables to keep all of this organized.

Body

In the body, we first define a variable holding the current timestamp truncated to the hour. We use it to define both the time range start and end. We also set pagination to start at 1.
The variables .Request and .Response are not available initially; they are only available after a successful fetch. So the following code runs inside an if that checks that both of them are available.
There, we retrieve the previous time range end, and we increase the pagination start by 5 (rows) to advance the page.
Then we check whether we got fewer than 5 (rows) results, which means we reached the last page and finished paginating. In this case, we move the time range window forward so it starts at the previous request's end, and we reset pagination by setting start back to 1.
An additional if checks whether the new time range end is before the previous end; in that case, the window is moved so that it ends at the previous end.
Finally, we use all these variables to construct the JSON request body.

Pull Interval

Next, it's important to set pull_interval. If we are not careful, we could start asking for time windows in the future that would not return any data.
First, we do a sanity check: if we get a status code >= 400, we return 10s as pull_interval. You might even use a higher value.
Otherwise, we start with a pull interval of 100ms. If results in the response body is empty or has fewer than 5 (rows) items, we finished paginating and want to move the time window, so we increase pull_interval to 1h. Here is the tricky part: if we are not careful, the time window could start drifting. So we add an if that checks whether we are moving the time window into the future, and in that case we limit the pull interval to the next window end minus now.
We also check whether the resulting pull_interval is zero or negative, and in that case set it to a minimum value of 1ns.
In summary: on error we use a pull interval of 10s; when we finish paginating we use 1h, capped at the time remaining until the next window ends; and while paginating we use 100ms.

Example: Sentinel One

Let's take a look at another example, using the Sentinel One API.
In this case, pagination is cursor-based. We can also control a time window, similar to the Carbon Black API, but here we are mostly interested in authentication.
The Carbon Black API used a token inside the request headers. Sentinel One uses cookies instead: there is a separate login endpoint, and we can configure the plugin to fetch the auth cookie from it and store it in the cookie jar.
```yaml
pipeline:
  inputs:
    - name: http_loader
      url: |-
        {{- $now := now.UTC.Truncate (parseDuration "1h") -}}
        {{- $start := $now | mustDateModify "-1h" -}}
        {{- $end := $now -}}
        {{- if and .Request .Response -}}
        {{- $prevEnd := .Request.URL.Query.Get "updatedAt__lte" | mustToDate timeRFC3339 -}}
        {{- if (empty .Response.Body.pagination.nextCursor) -}}
        {{- $start = $prevEnd -}}
        {{- $end = ($start | mustDateModify "+1h") -}}
        {{- end -}}
        {{- if $end.Before $prevEnd -}}
        {{- $end = $prevEnd -}}
        {{- $start = ($end | mustDateModify "-1h") -}}
        {{- end -}}
        {{- end -}}
        https://usea1-partners.sentinelone.net/web/api/v2.1/threats?updatedAt__gte={{$start.Format timeRFC3339}}&updatedAt__lte={{$end.Format timeRFC3339}}&limit=5{{with .Response.Body.pagination.nextCursor}}&cursor={{.}}{{end}}
      out: |-
        {{toJson .Response.Body.data}}
      skip: |-
        {{or (ge .Response.StatusCode 400) (not (empty .Response.Body.errors)) (empty .Response.Body.data)}}
      pull_interval: |-
        {{- if or (ge .Response.StatusCode 400) (not (empty .Response.Body.errors)) -}}
        10s
        {{- else -}}
        {{- $interval := parseDuration "100ms" -}}
        {{- if empty .Response.Body.pagination.nextCursor -}}
        {{- $interval = parseDuration "1h" -}}
        {{- $currentEnd := .Request.URL.Query.Get "updatedAt__lte" | mustToDate timeRFC3339 -}}
        {{- $nextEnd := $currentEnd.Add $interval -}}
        {{- if $nextEnd.After now -}}
        {{- $interval = $nextEnd.Sub now -}}
        {{- if le $interval.Nanoseconds 0 -}}
        {{- $interval = parseDuration "1ns" -}}
        {{- end -}}
        {{- end -}}
        {{- end -}}
        {{$interval}}
        {{- end -}}
      auth_cookie_url: |-
        https://usea1-partners.sentinelone.net/web/api/v2.1/users/login/by-api-token
      auth_cookie_body: |-
        {"data": {"apiToken": "{{secrets.sentinelOneAPIToken}}"}}
      auth_cookie_header: |-
        Content-Type: application/json
```
If you set auth_cookie_url, the plugin issues a request to this URL before it starts collecting.
You can control how long the cookie is valid with auth_cookie_exp, which accepts a duration string. Once the cookie expires, the plugin tries to renew it.
Important: auth_cookie_body does not support Go templates. The {{secrets.*}} notation is a feature available across Calyptia Core pipelines. Its syntax might resemble Go templates, but it is not one.

Example: Dynatrace

The following example uses the Dynatrace API. One important point about this API is that it paginates with a nextPageKey URL query string parameter, and this parameter is exclusive: when you set it, you cannot set any of the other filters.
This prevents you from referencing the previous request's parameters. Instead, we use the has_variable, get_variable, set_variable, and unset_variable helpers to persist data across plugin executions. These variables persist even after the fluent-bit process restarts.
```yaml
pipeline:
  inputs:
    - name: http_loader
      url: |-
        {{- $environmentID := "ggy66547" -}}
        {{- with .Response.Body.nextPageKey -}}
        https://{{$environmentID}}.live.dynatrace.com/api/v2/auditlogs?nextPageKey={{.}}
        {{- else -}}
        {{- $now := now.UTC.Truncate (parseDuration "1h") -}}
        {{- $from := $now | mustDateModify "-1h" -}}
        {{- $to := $now -}}
        {{- if has_variable "to" -}}
        {{- $prevTo := get_variable "to" | mustToDate timeRFC3339 -}}
        {{- $from = $prevTo -}}
        {{- $to = ($from | mustDateModify "+1h") -}}
        {{- if $to.Before $prevTo -}}
        {{- $to = $prevTo -}}
        {{- $from = ($to | mustDateModify "-1h") -}}
        {{- end -}}
        {{- end -}}
        {{- set_variable "to" ($to.Format timeRFC3339) -}}
        https://{{$environmentID}}.live.dynatrace.com/api/v2/auditlogs?from={{$from.Format timeRFC3339}}&to={{$to.Format timeRFC3339}}&sort=timestamp&pageSize=10
        {{- end -}}
      header: "Authorization: Api-Token {{secrets.dynatraceAPIToken}}"
      out: |-
        {{toJson .Response.Body.auditLogs}}
      skip: |-
        {{or (ge .Response.StatusCode 400) (empty .Response.Body.auditLogs)}}
      pull_interval: |-
        {{- if ge .Response.StatusCode 400 -}}
        {{- log "error:" .Response.Body -}}
        10s
        {{- else -}}
        {{- $interval := parseDuration "100ms" -}}
        {{- if empty .Response.Body.nextPageKey -}}
        {{- $interval = parseDuration "1h" -}}
        {{- $currentTo := get_variable "to" | mustToDate timeRFC3339 -}}
        {{- $nextTo := $currentTo.Add $interval -}}
        {{- if $nextTo.After now -}}
        {{- $interval = $nextTo.Sub now -}}
        {{- if le $interval.Nanoseconds 0 -}}
        {{- $interval = parseDuration "1ns" -}}
        {{- end -}}
        {{- end -}}
        {{- end -}}
        {{$interval}}
        {{- end -}}
```
In the url template, we check whether the response body contains a nextPageKey and, if so, use it to paginate. Otherwise, we set our time window parameters as in the previous examples, and we also use set_variable to persist the to parameter, so we can reference it later without depending on the previous request.