Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source iterate: new connector #2316

Merged
merged 2 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ on:
- "source-intercom-native/**"
- "source-shopify-native/**"
- "source-zendesk-support-native/**"
- "source-iterate/**"

pull_request:
branches: [main]
Expand Down Expand Up @@ -70,6 +71,7 @@ on:
- "source-intercom-native/**"
- "source-shopify-native/**"
- "source-zendesk-support-native/**"
- "source-iterate/**"

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
Expand Down Expand Up @@ -201,6 +203,10 @@ jobs:
type: capture
version: v1
usage_rate: "1.0"
- name: source-iterate
type: capture
version: v1
usage_rate: "1.0"

steps:
- uses: actions/checkout@v4
Expand Down
1 change: 1 addition & 0 deletions source-iterate/VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
v1
10 changes: 10 additions & 0 deletions source-iterate/acmeCo/flow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
collections:
acmeCo/survey_responses:
schema: survey_responses.schema.yaml
key:
- /_meta/row_id
acmeCo/surveys:
schema: surveys.schema.yaml
key:
- /_meta/row_id
31 changes: 31 additions & 0 deletions source-iterate/acmeCo/survey_responses.schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
$defs:
Meta:
properties:
op:
default: u
description: "Operation type (c: Create, u: Update, d: Delete)"
enum:
- c
- u
- d
title: Op
type: string
row_id:
default: -1
description: "Row ID of the Document, counting up from zero, or -1 if not known"
title: Row Id
type: integer
title: Meta
type: object
additionalProperties: true
properties:
_meta:
$ref: "#/$defs/Meta"
default:
op: u
row_id: -1
description: Document metadata
title: FullRefreshResource
type: object
x-infer-schema: true
31 changes: 31 additions & 0 deletions source-iterate/acmeCo/surveys.schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
$defs:
Meta:
properties:
op:
default: u
description: "Operation type (c: Create, u: Update, d: Delete)"
enum:
- c
- u
- d
title: Op
type: string
row_id:
default: -1
description: "Row ID of the Document, counting up from zero, or -1 if not known"
title: Row Id
type: integer
title: Meta
type: object
additionalProperties: true
properties:
_meta:
$ref: "#/$defs/Meta"
default:
op: u
row_id: -1
description: Document metadata
title: FullRefreshResource
type: object
x-infer-schema: true
1,513 changes: 1,513 additions & 0 deletions source-iterate/poetry.lock

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions source-iterate/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[tool.poetry]
version = "0.1.0"
name = "source_iterate"
description = ""
authors = [ "Alex Bair <alexb@estuary.dev>"]

[tool.poetry.dependencies]
estuary-cdk = {path="../estuary-cdk", develop = true}
python = "^3.11"
pydantic = "^2"

[tool.poetry.group.dev.dependencies]
debugpy = "^1.8.0"
mypy = "^1.8.0"
pytest = "^7.4.3"
pytest-insta = "^0.3.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
64 changes: 64 additions & 0 deletions source-iterate/source_iterate/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from logging import Logger
from typing import Callable, Awaitable

from estuary_cdk.flow import (
ConnectorSpec,
)
from estuary_cdk.capture import (
BaseCaptureConnector,
Request,
Task,
common,
request,
response,
)
from estuary_cdk.http import HTTPMixin

from .resources import all_resources, validate_credentials
from .models import (
ConnectorState,
EndpointConfig,
ResourceConfig,
)


# Capture connector for Iterate, built on estuary_cdk's BaseCaptureConnector
# and HTTPMixin. It implements the spec, discover, validate and open requests
# using this package's models and resources.
class Connector(
    BaseCaptureConnector[EndpointConfig, ResourceConfig, ConnectorState],
    HTTPMixin,
):
    def request_class(self):
        """Return the Request type parameterized with this connector's models."""
        return Request[EndpointConfig, ResourceConfig, ConnectorState]

    async def spec(self, log: Logger, _: request.Spec) -> ConnectorSpec:
        """Build the connector spec from the endpoint and resource config schemas.

        The incoming spec request is not inspected; OAuth2 is not offered.
        """
        endpoint_schema = EndpointConfig.model_json_schema()
        resource_schema = ResourceConfig.model_json_schema()

        return ConnectorSpec(
            configSchema=endpoint_schema,
            oauth2=None,
            documentationUrl="https://go.estuary.dev/source-iterate",
            resourceConfigSchema=resource_schema,
            resourcePathPointers=ResourceConfig.PATH_POINTERS,
        )

    async def discover(
        self, log: Logger, discover: request.Discover[EndpointConfig]
    ) -> response.Discovered[ResourceConfig]:
        """Report every resource built from the supplied endpoint config."""
        return common.discovered(
            await all_resources(log, self, discover.config)
        )

    async def validate(
        self,
        log: Logger,
        validate: request.Validate[EndpointConfig, ResourceConfig],
    ) -> response.Validated:
        """Check credentials, then resolve the requested bindings to resources."""
        # Credentials are validated before any resources are built.
        await validate_credentials(log, self, validate.config)

        available = await all_resources(log, self, validate.config)
        return common.validated(
            common.resolve_bindings(validate.bindings, available)
        )

    async def open(
        self,
        log: Logger,
        open: request.Open[EndpointConfig, ResourceConfig, ConnectorState],
    ) -> tuple[response.Opened, Callable[[Task], Awaitable[None]]]:
        """Resolve the capture's bindings and hand off to the common open helper."""
        available = await all_resources(log, self, open.capture.config)
        bindings = common.resolve_bindings(open.capture.bindings, available)
        return common.open(open, bindings)
4 changes: 4 additions & 0 deletions source-iterate/source_iterate/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import asyncio
import source_iterate

# Entry point: build the connector and run its serve() coroutine to completion.
asyncio.run(source_iterate.Connector().serve())
70 changes: 70 additions & 0 deletions source-iterate/source_iterate/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from logging import Logger
from typing import AsyncGenerator
from urllib.parse import urlparse, parse_qs

from estuary_cdk.http import HTTPSession

from .models import (
FullRefreshResource,
SurveysResponse,
SurveyResponsesResponse,
)

# Base URL that every endpoint path requested in this module is appended to.
API = "https://iteratehq.com/api/v1"

# Iterate docs: https://iterate.docs.apiary.io/#introduction/overview
# The docs say every request needs a "v" query parameter holding the date of
# the client implementation. Requests appear to succeed without it, but it is
# presumably how the API avoids exposing clients to breaking changes, so it is
# sent as "v" on each request made in this module.
VERSION = 20250130


def _extract_next_page_cursor(url: str) -> str:
query_params = parse_qs(urlparse(url).query)

cursor = query_params.get("page[cursor]", None)
if cursor is None:
msg = f"Did not find a page[cursor] parameter in URL: {url}"
raise RuntimeError(msg)

return cursor[0]


async def snapshot_surveys(
    http: HTTPSession,
    log: Logger,
) -> AsyncGenerator[FullRefreshResource, None]:
    """Yield each survey from a single request to the surveys endpoint.

    The response body is validated as a ``SurveysResponse`` and its
    ``results`` are yielded in order. No pagination is performed here.
    """
    body = await http.request(log, f"{API}/surveys", params={"v": VERSION})
    parsed = SurveysResponse.model_validate_json(body)

    for item in parsed.results:
        yield item


async def snapshot_survey_responses(
    http: HTTPSession,
    log: Logger,
) -> AsyncGenerator[FullRefreshResource, None]:
    """Yield every response of every survey, following cursor pagination.

    Surveys are enumerated with ``snapshot_surveys``. For each one, its
    responses endpoint is requested repeatedly until a page arrives without
    ``links``.
    """
    async for survey in snapshot_surveys(http, log):
        # `id` is not a declared field on FullRefreshResource, hence getattr.
        responses_url = f"{API}/surveys/{getattr(survey, 'id')}/responses"
        # Fresh query parameters per survey, so a cursor never carries over
        # from one survey to the next.
        query: dict[str, str | int] = {"v": VERSION}

        while True:
            body = await http.request(log, responses_url, params=query)
            page = SurveyResponsesResponse.model_validate_json(body)

            for item in page.results.list:
                yield item

            # The last page has no `links`; stop paginating this survey.
            if not page.links:
                break

            # The next request reuses the same URL with the cursor taken from
            # the `next` link.
            query["page[cursor]"] = _extract_next_page_cursor(page.links.next)
53 changes: 53 additions & 0 deletions source-iterate/source_iterate/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import AsyncGenerator, Callable

from estuary_cdk.capture.common import (
BaseDocument,
ResourceConfig,
ResourceState,
)
from estuary_cdk.capture.common import (
ConnectorState as GenericConnectorState,
LogCursor,
Logger,
)
from estuary_cdk.http import HTTPSession, AccessToken

from pydantic import BaseModel, Field


# Endpoint configuration for the connector. Its JSON schema is what the
# connector's spec publishes as the config schema.
class EndpointConfig(BaseModel):
    # Access-token credentials, discriminated on the `credentials_title` field
    # and shown under the title "Authentication".
    credentials: AccessToken = Field(
        discriminator="credentials_title",
        title="Authentication",
    )


# Connector state type: the CDK's generic ConnectorState specialized to the
# CDK's ResourceState.
ConnectorState = GenericConnectorState[ResourceState]


# Document model shared by surveys and survey responses. It declares no
# fields of its own; extra="allow" keeps whatever fields the API returns.
class FullRefreshResource(BaseDocument, extra="allow"):
    pass


# Response body of the surveys endpoint. extra="forbid" makes validation fail
# if the body carries any top-level key other than `results`.
class SurveysResponse(BaseModel, extra="forbid"):
    # The surveys contained in the response.
    results: list[FullRefreshResource]


# Response body of a survey's responses endpoint (one page of results).
# Unknown top-level keys are allowed; the nested models forbid extras.
class SurveyResponsesResponse(BaseModel, extra="allow"):
    # Shape of the `results` object: a count plus the page's records.
    class Results(BaseModel, extra="forbid"):
        count: int
        list: list[FullRefreshResource]

    # Shape of the `links` object: the URL of the next page.
    class Links(BaseModel, extra="forbid"):
        next: str

    results: Results

    # links is not present on the last page of results.
    links: Links | None = None


# Signature of a full-refresh fetch function: it takes an HTTP session and a
# logger and returns an async generator of FullRefreshResource documents.
FullRefreshFn = Callable[
[HTTPSession, Logger],
AsyncGenerator[FullRefreshResource, None],
]
Loading
Loading