Add JobSink documentation #6005

Merged · 4 commits · Jun 7, 2024
1 change: 1 addition & 0 deletions config/nav.yml
@@ -226,6 +226,7 @@ nav:
- Subscriptions: eventing/channels/subscriptions.md
- Event sinks:
- About sinks: eventing/sinks/README.md
- JobSink: eventing/sinks/job-sink.md
- Apache Kafka Sink: eventing/sinks/kafka-sink.md
- Flows:
- About flows: eventing/flows/README.md
2 changes: 1 addition & 1 deletion docs/eventing/brokers/broker-developer-config-options.md
@@ -28,7 +28,7 @@ spec:
uri: example-uri
retry: 5
backoffPolicy: exponential
backoffDelay: "2007-03-01T13:00:00Z/P1Y2M10DT2H30M"
backoffDelay: "PT1S"
@pierDipi (Member Author) commented on Jun 4, 2024:

Unrelated fix, but I couldn't resist as that time value is so wrong.

- You can specify any valid `name` for your broker. Using `default` will create a broker named `default`.
9 changes: 5 additions & 4 deletions docs/eventing/sinks/README.md
@@ -130,10 +130,11 @@ The `svc` in `http://event-display.svc.cluster.local` determines that the sink i

## Supported third-party sink types

| Name | Maintainer | Description |
| -- | -- | -- |
| [KafkaSink](kafka-sink.md) | Knative | Send events to a Kafka topic |
| [RedisSink](https://github.com/knative-extensions/eventing-redis/tree/main/sink) | Knative | Send events to a Redis Stream |
| Name | Maintainer | Description |
|----------------------------------------------------------------------------------| -- |--------------------------------------|
| [JobSink](job-sink.md) | Knative | Trigger long-running background jobs |
| [KafkaSink](kafka-sink.md) | Knative | Send events to a Kafka topic |
| [RedisSink](https://github.com/knative-extensions/eventing-redis/tree/main/sink) | Knative | Send events to a Redis Stream |


[kubernetes-kinds]:
303 changes: 303 additions & 0 deletions docs/eventing/sinks/job-sink.md
@@ -0,0 +1,303 @@
# JobSink: triggering long-running background jobs when events occur

Event processing with a Knative Service is usually expected to complete in a relatively short
period of time (minutes), because it requires the HTTP connection to stay open; otherwise the
Service is scaled down.

Keeping long-running connections open increases the possibility of failure, in which case
processing has to start over when the request is retried.

To work around this limitation, `JobSink` is a resource you can use to create long-running
asynchronous jobs and tasks.

`JobSink` supports the full
Kubernetes [batch/v1 Job resource and features](https://kubernetes.io/docs/concepts/workloads/controllers/job/)
and Kubernetes Job queuing systems like [Kueue](https://kueue.sigs.k8s.io/).
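
Because the Job template is a standard `batch/v1` Job spec, you can label it for a queuing system. The following is a hypothetical sketch, assuming Kueue is installed and a `LocalQueue` named `user-queue` exists in the namespace; the `kueue.x-k8s.io/queue-name` label is Kueue's standard queue selector, and the resource names here are illustrative:

```yaml
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-queued
spec:
  job:
    metadata:
      labels:
        # Kueue suspends, admits, and schedules Jobs that carry this label.
        kueue.x-k8s.io/queue-name: user-queue
    spec:
      completions: 1
      parallelism: 1
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "cat" ]
              args:
                - "/etc/jobsink-event/event"
```
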
Comment on lines +13 to +15

Member commented: Maybe mention something along the lines of "except the `.metadata.name` field"? As far as I know, we set the name ourselves.

@pierDipi (Member Author) replied on Jun 6, 2024: I don't think this is a detail we can add here in the introduction (and it's also somewhat intuitive: if jobs are created dynamically "to create long-running asynchronous jobs and tasks", then a name can't be fixed).


## Prerequisites

You must have access to a Kubernetes cluster
with [Knative Eventing installed](../../install/yaml-install/eventing/install-eventing-with-yaml.md).

## Usage

When an event is sent to a `JobSink`, Eventing creates a `Job` and mounts the received event as a
JSON file at `/etc/jobsink-event/event`.

1. Create a `JobSink`
```yaml
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-logger
spec:
  job:
    spec:
      completions: 1
      parallelism: 1
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "cat" ]
              args:
                - "/etc/jobsink-event/event"
```
2. Apply the `JobSink` resource:
```shell
kubectl apply -f <job-sink-file.yaml>
```
3. Verify `JobSink` is ready:
```shell
kubectl get jobsinks.sinks.knative.dev
```
Example output:
```shell
NAME URL AGE READY REASON
job-sink-logger http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger 5s True
```
4. Trigger a `JobSink`
```shell
kubectl run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \
-H "content-type: application/json" \
-H "ce-specversion: 1.0" \
-H "ce-source: my/curl/command" \
-H "ce-type: my.demo.event" \
-H "ce-id: 123" \
-d '{"details":"JobSinkDemo"}' \
http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger
```
5. Verify a `Job` is created and prints the event:
```shell
kubectl logs job-sink-loggerszoi6-dqbtq
```
Example output:
```shell
{"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
```

### JobSink idempotency

`JobSink` will create a job for each _different_ received event.

An event is uniquely identified by the combination of event `source` and `id` attributes.

If an event with the same `source` and `id` attributes is received and a job is already present,
another `Job` will _not_ be created.
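
For example, re-sending the exact same demo event used in the steps above and then listing Jobs is a quick way to observe this. This is a hypothetical check; the pod name `curl-again` is illustrative and your Job name suffix will differ:

```shell
# Send the same event again: identical ce-source and ce-id as before.
kubectl run curl-again --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \
  -H "content-type: application/json" \
  -H "ce-specversion: 1.0" \
  -H "ce-source: my/curl/command" \
  -H "ce-type: my.demo.event" \
  -H "ce-id: 123" \
  -d '{"details":"JobSinkDemo"}' \
  http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger

# Listing Jobs should still show a single Job for this source/id combination.
kubectl get jobs
```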

### Reading the event file

You can read the file and deserialize it using any [CloudEvents](https://github.com/cloudevents)
JSON deserializer.

For example, the following snippet reads an event using the CloudEvents Go SDK and processes it.
Comment on lines +90 to +95

Member commented: I think this section is a little confusing without some more context, because as far as I can tell we don't mention before this that the event is mounted as a file into the job.

@pierDipi (Member Author) replied on Jun 6, 2024: It's in the first sentence after # Usage at line 24?

Member: Ah, I guess I missed that - my bad!


```go
package mytask

import (
    "encoding/json"
    "fmt"
    "os"

    cloudevents "github.com/cloudevents/sdk-go/v2"
)

func handleEvent() error {
    eventBytes, err := os.ReadFile("/etc/jobsink-event/event")
    if err != nil {
        return err
    }

    event := &cloudevents.Event{}
    if err := json.Unmarshal(eventBytes, event); err != nil {
        return err
    }

    // Process event ...
    fmt.Println(event)

    return nil
}
```
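
If the payload has a known shape, you can also decode the event data into a typed value with the SDK's `DataAs` helper. This is a minimal sketch that assumes the demo payload `{"details":"JobSinkDemo"}` sent earlier; the `demoData` type and `processEvent` function are illustrative:

```go
package mytask

import (
    "fmt"

    cloudevents "github.com/cloudevents/sdk-go/v2"
)

// demoData mirrors the illustrative payload {"details":"JobSinkDemo"}.
type demoData struct {
    Details string `json:"details"`
}

// processEvent decodes the event payload into demoData and prints it.
func processEvent(event *cloudevents.Event) error {
    data := &demoData{}
    if err := event.DataAs(data); err != nil {
        return err
    }

    fmt.Println("details:", data.Details)
    return nil
}
```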

### Trigger a Job from different event sources

A `JobSink` can be triggered by any [event source](./../sources) or [trigger](./../triggers).

For example, you can trigger a `Job` when a Kafka record is sent to a Kafka topic using
a [`KafkaSource`](./../sources/kafka-source):

```yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - knative-demo-topic
  sink:
    ref:
      apiVersion: sinks.knative.dev/v1alpha1
      kind: JobSink
      name: job-sink-logger
```

or when a Knative Broker receives an event, by using a [`Trigger`](./../triggers):

```yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-job-sink-trigger
spec:
  broker: my-broker
  filter:
    attributes:
      type: dev.knative.foo.bar
      myextension: my-extension-value
  subscriber:
    ref:
      apiVersion: sinks.knative.dev/v1alpha1
      kind: JobSink
      name: job-sink-logger
```

or even as a dead letter sink for a Knative Broker:

```yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: my-broker
spec:
  # ...

  delivery:
    deadLetterSink:
      ref:
        apiVersion: sinks.knative.dev/v1alpha1
        kind: JobSink
        name: job-sink-logger
    retry: 5
    backoffPolicy: exponential
    backoffDelay: "PT1S"
```

### Customizing the event file directory

By default, the event file is mounted at `/etc/jobsink-event/event`. You can change the directory by mounting the `jobsink-event` volume at a custom path:

```yaml
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-custom-mount-path
spec:
  job:
    spec:
      completions: 1
      parallelism: 1
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "bash" ]
              args:
                - -c
                - echo "Hello world!" && sleep 5

              # The event will be available in a file at `/etc/custom-path/event`
              volumeMounts:
                - name: "jobsink-event"
                  mountPath: "/etc/custom-path"
                  readOnly: true
```

### Cleaning up finished jobs

To clean up finished jobs automatically, set the
[`spec.job.spec.ttlSecondsAfterFinished` field](https://kubernetes.io/docs/concepts/workloads/controllers/job/#clean-up-finished-jobs-automatically).
For example, with `ttlSecondsAfterFinished: 600`, Kubernetes removes finished jobs 600 seconds (10 minutes) after they complete.
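
The following is a minimal sketch based on the `job-sink-logger` example above; the `600` value is illustrative:

```yaml
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-logger
spec:
  job:
    spec:
      # Finished Jobs (succeeded or failed) are deleted 600 seconds after completion.
      ttlSecondsAfterFinished: 600
      completions: 1
      parallelism: 1
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "cat" ]
              args:
                - "/etc/jobsink-event/event"
```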

## JobSink examples

### JobSink success example

```yaml
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-success
spec:
  job:
    metadata:
      labels:
        # labels must be key/value pairs; this label is illustrative
        app: my-label
    spec:
      completions: 12
      parallelism: 3
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "bash" ]
              args:
                - -c
                - echo "Hello world!" && sleep 5
      backoffLimit: 6
      podFailurePolicy:
        rules:
          - action: FailJob
            onExitCodes:
              containerName: main      # optional
              operator: In             # one of: In, NotIn
              values: [ 42 ]
          - action: Ignore             # one of: Ignore, FailJob, Count
            onPodConditions:
              - type: DisruptionTarget # indicates Pod disruption
```

### JobSink failure example

```yaml
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-failure
spec:
  job:
    metadata:
      labels:
        # labels must be key/value pairs; this label is illustrative
        app: my-label
    spec:
      completions: 12
      parallelism: 3
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "bash" ] # example command simulating a bug which triggers the FailJob action
              args:
                - -c
                - echo "Hello world!" && sleep 5 && exit 42
      backoffLimit: 6
      podFailurePolicy:
        rules:
          - action: FailJob
            onExitCodes:
              containerName: main      # optional
              operator: In             # one of: In, NotIn
              values: [ 42 ]
          - action: Ignore             # one of: Ignore, FailJob, Count
            onPodConditions:
              - type: DisruptionTarget # indicates Pod disruption
```
