How to build a custom Airbyte python source
This tutorial will demonstrate how to create a custom Airbyte python source from scratch. There is official Airbyte documentation on this, but it instructs the developer to fork the Airbyte repository, generate a connector from a template using their generator, and then use the airbyte-ci tool for testing and building the Docker image. This approach is problematic for the following reasons:
- The Airbyte repo is big and takes too long to clone
- The code is not in the repo you want
- Your connector will be publicly visible in your fork
- It is annoying to navigate to airbyte/airbyte-integrations/connectors/<your connector>
- The connector generator breaks a lot
- The airbyte-ci tool breaks a lot
- It adds complexity by default that you may not need
The official documentation is geared towards developers who wish to contribute new connectors to be publicly available on the Airbyte platform, but it is not ideal for simpler connectors built for private use cases. It is for those cases that I decided to write this tutorial. Here, I'll work under the following restrictions:
- Development on a separate repo
- No airbyte-ci
- Keeping things as simple as possible.
Let’s get started.
Requirements
- python >=3.9, <3.12 (as of August 2024)
- poetry
- Docker engine
- An Airbyte OSS instance
Creating a custom python Source
Setting up the environment
The first step is going to be creating our project structure and setting up the environment.
We are going to use poetry for packaging and dependency management. I tried doing it with pip and venv, but the connector failed to load into the Airbyte platform. This seems to be related to the way Airbyte runs the Docker images, which plays better with poetry.
Create a new directory and navigate into it:
mkdir my_source && cd my_source
Inside the directory, initialize the python project with poetry init. We need to specify the compatible python version as >=3.9,<3.12 and add airbyte-cdk as a dependency. This can be done interactively with poetry init or by passing flags. The resulting pyproject.toml should look like this:
[tool.poetry]
name = "my-source"
version = "0.1.0"
description = "Creating a custon Airbyte python source from scratch"
authors = ["André <andre.soares@altxtech.net>"]
license = "MIT"
readme = "README.md"
[tool.poetry.dependencies]
python = ">=3.9,<3.12"
airbyte-cdk = "^4.4.1"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
Write a README.md file and run poetry install to install the required dependencies and finish setting up our environment. Now we are ready to start implementing the code of our connector.
Implement the AbstractSource and Stream classes
Inside our project, we are going to create a module where we will implement the minimum required abstract classes to make our connector work. For a source connector, those are the AbstractSource and Stream classes.
Create a new module by running:
mkdir my_source && touch my_source/__init__.py && touch my_source/source.py
The project structure should look like this:
├── my_source
│ ├── __init__.py
│ ├── source.py
├── poetry.lock
├── pyproject.toml
└── README.md
The source.py file is the one we care most about. This is where the connector code is going to live.
For a barebones connector, the minimum we need to do is implement the Stream and AbstractSource classes.
The Stream class needs to have the primary_key property, which identifies the key, or set of keys, that uniquely identify the records coming from that stream. It should return a single string or an iterable of strings. The other thing we need is the read_records() method, which is what effectively returns (or yields) the record messages emitted by the stream.
Optionally, you can also implement the __init__() method. This is useful for passing our connector configuration into the Stream.
The AbstractSource
class requires the implementation of two methods. The first one is the check_connection()
method, which will be called by the check
command and returns whether the provided configuration is correct
and can successfully connect to the data source. It can also return an optional log message.
Finally, we need to implement the streams()
method. It is going to be called by the discover
command, and
tells Airbyte which streams are available from this source. It should return an iterable of instances of
concrete Stream
classes.
Here’s a very minimal source.py
file:
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream


class MyStream(Stream):
    def __init__(self, config):
        self.config = config

    @property
    def primary_key(self):
        return "id"

    def read_records(self, sync_mode, cursor_field=None, stream_slice=None, stream_state=None):
        # TODO: Extract data
        # Return records as dicts
        return [{"id": 1, "foo": "bar"}, {"id": 2, "foo": "baz"}]


class MySource(AbstractSource):
    def check_connection(self, logger, config):
        # TODO: Check if the connection works
        return True, None  # (success, optional log message)

    def streams(self, config):
        return [MyStream(config)]
Now that we have a valid implementation, we are going to try to launch it.
Launching the Source
To launch, we are going to write a run() function and a main.py module to call that function. With the Airbyte CDK, we can launch a connector by using the launch function, which takes two arguments: an instance of a concrete source and an iterable of strings representing the command line arguments passed to the source.
Create both files with touch my_source/run.py && touch main.py. This is how they should be implemented:
run.py
import sys

from airbyte_cdk.entrypoint import launch

from .source import MySource


def run():
    launch(MySource(), sys.argv[1:])
main.py
from my_source.run import run

if __name__ == "__main__":
    run()
The project structure should be:
├── main.py
├── my_source
│ ├── __init__.py
│ ├── run.py
│ ├── source.py
├── poetry.lock
├── pyproject.toml
└── README.md
To launch the source, run poetry run python main.py --help
. If everything is correct, you should see
the help message:
usage: main.py [-h] {spec,check,discover,read} ...
options:
-h, --help show this help message and exit
commands:
{spec,check,discover,read}
spec outputs the json configuration specification
check checks the config can be used to connect
discover outputs a catalog describing the source's schema
read reads the source and outputs messages to STDOUT
As we can see, when the source is launched, it should be able to handle the spec
, check
, discover
and read
commands. These are the commands defined by the
Airbyte Protocol Docker Interface
for source connectors.
These commands are not working yet, because we still have to write some metadata files. You can check that by running poetry run python main.py spec; it should output some error logs.
Naturally, the next step is making those commands work.
Making the commands work
The ‘spec’ command - connector specification
The spec command is the first one on our list and also the simplest one to fix. This command exists to tell Airbyte which arguments are needed to configure the source.
By default, it looks for a spec.json file inside the connector module. This file conforms to an Actor Specification, and the only required field is connectionSpecification, which is a JSON Schema object that describes what arguments are needed to configure the source and which of them are required.
Create it with touch my_source/spec.json
. A very basic spec.json
with a single required property looks
as follows:
{
"connectionSpecification": {
"title": "Example Source",
"type": "object",
"required": ["some_property"],
"additionalProperties": false,
"properties": {
"some_property": {
"type": "string",
"description": "Some data"
}
}
}
}
Now, poetry run python main.py spec
should work and output a SPEC message:
{"type":"SPEC","spec":{"connectionSpecification":{"title":"Example Source","type":"object","required":["some_property"],"additionalProperties":false,"properties":{"some_property":{"type":"string","description":"Some data"}}}}}
Now, Airbyte can understand what configuration schema is expected by our source. The next step is to verify whether a provided configuration is valid, which is the job of the check command.
The ‘check’ command - valid configuration
The check command exists to verify that a given configuration is valid. Airbyte will do some basic checks for us. For example, if you specify that your source needs an API Key as a string, Airbyte will fail the check if a configuration does not include one.
The check command also calls AbstractSource.check_connection() under the hood. This is where you put any other checks you need, for example verifying that an API Key is not invalid or expired.
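As an illustration, here is a sketch of a check_connection() that validates a hypothetical API key against a hypothetical endpoint. Neither the URL nor the api_key property exists in our example spec; the requests library is pulled in by airbyte-cdk, but you may prefer to declare it explicitly with poetry add requests.
import requests

from airbyte_cdk.sources import AbstractSource


class MySource(AbstractSource):
    def check_connection(self, logger, config):
        # Hypothetical check: call a cheap endpoint with the configured key.
        # The URL and the "api_key" config property are assumptions.
        try:
            response = requests.get(
                "https://api.example.com/v1/me",
                headers={"Authorization": f"Bearer {config['api_key']}"},
                timeout=30,
            )
        except requests.RequestException as error:
            return False, f"Could not reach the API: {error}"
        if response.ok:
            return True, None
        return False, f"API returned status code {response.status_code}"

    # streams() stays the same as before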
At this point, the check command should be working. The only thing we need is a couple of test configuration files to test it with. Create an integration_tests directory to store these configurations with mkdir integration_tests. Then, write the following files:
integration_tests/config.json
{"some_property": "hello!"}
integration_tests/invalid_config.json
{}
Test both configurations. poetry run python main.py check --config integration_tests/config.json
should succeed:
{"type":"LOG","log":{"level":"INFO","message":"Check succeeded"}}
{"type":"CONNECTION_STATUS","connectionStatus":{"status":"SUCCEEDED"}}
poetry run python main.py check --config integration_tests/invalid_config.json
should fail because some_property
is
missing:
{"type":"CONNECTION_STATUS","connectionStatus":{"status":"FAILED","message":"Config validation error: 'some_property' is a required property"}}
The ‘discover’ command - defining the stream schema
The discover command exists so Airbyte knows which streams can be extracted from the source. This command needs to return the schema of each stream.
By default, the CDK looks for a JSON file in the schemas directory inside the source's module whose name matches the stream name (the stream name defaults to the snake_cased class name, so MyStream becomes my_stream).
First, create the schemas folder with mkdir my_source/schemas
. This is where the schema for all your streams should go.
Then write the schema definition for MyStream
:
my_source/schemas/my_stream.json
{
"title": "My Stream",
"description": "Sample stream",
"type": "object",
"properties": {
"id": {
"type": "integer",
"description": "Primary key"
},
"foo": {
"type": "string",
"description": "Some content"
}
}
}
Now, run:
poetry run python main.py discover --config integration_tests/config.json
It should output:
{"type":"CATALOG","catalog":{"streams":[{"name":"my_stream","json_schema":{"title":"My Stream","description":"Sample stream","type":"object","properties":{"id":{"type":"integer","description":"Primary key"},"foo":{"type":"string","description":"Some content"}}},"supported_sync_modes":["full_refresh"],"source_defined_primary_key":[["id"]],"is_resumable":false}]}}
So this means that so far we have three out of four commands working. The last one we need to handle is read
.
The ‘read’ command - reading data
The read command is the one that actually extracts the data. It uses Stream.read_records() behind the scenes.
Besides the source configuration, the other argument this command needs is a configured catalog, which specifies how each stream should be extracted. We just need to write a simple configured catalog file for testing.
integration_tests/configured_catalog.json
{
"streams": [
{
"stream": {
"name": "my_stream",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
And now test the source with
poetry run python main.py read \
--config integration_tests/config.json \
--catalog integration_tests/configured_catalog.json
It should output the hardcoded records we set earlier in the tutorial, along with some logs:
{"type":"LOG","log":{"level":"INFO","message":"Starting syncing MySource"}}
{"type":"LOG","log":{"level":"INFO","message":"Marking stream my_stream as STARTED"}}
{"type":"TRACE","trace":{"type":"STREAM_STATUS","emitted_at":1723754610183.407,"stream_status":{"stream_descriptor":{"name":"my_stream","namespace":null},"status":"STARTED","reasons":null}}}
{"type":"LOG","log":{"level":"INFO","message":"Syncing stream: my_stream "}}
{"type":"LOG","log":{"level":"INFO","message":"Marking stream my_stream as RUNNING"}}
{"type":"TRACE","trace":{"type":"STREAM_STATUS","emitted_at":1723754610183.815,"stream_status":{"stream_descriptor":{"name":"my_stream","namespace":null},"status":"RUNNING","reasons":null}}}
{"type":"RECORD","record":{"stream":"my_stream","data":{"id":1,"foo":"bar"},"emitted_at":1723754610183}}
{"type":"RECORD","record":{"stream":"my_stream","data":{"id":2,"foo":"baz"},"emitted_at":1723754610183}}
{"type":"STATE","state":{"type":"STREAM","stream":{"stream_descriptor":{"name":"my_stream","namespace":null},"stream_state":{"__ab_no_cursor_state_message":true}},"sourceStats":{"recordCount":2.0}}}
{"type":"LOG","log":{"level":"INFO","message":"Read 2 records from my_stream stream"}}
{"type":"LOG","log":{"level":"INFO","message":"Marking stream my_stream as STOPPED"}}
{"type":"TRACE","trace":{"type":"STREAM_STATUS","emitted_at":1723754610184.083,"stream_status":{"stream_descriptor":{"name":"my_stream","namespace":null},"status":"COMPLETE","reasons":null}}}
{"type":"LOG","log":{"level":"INFO","message":"Finished syncing my_stream"}}
{"type":"LOG","log":{"level":"INFO","message":"MySource runtimes:\nSyncing stream my_stream 0:00:00.000753"}}
{"type":"LOG","log":{"level":"INFO","message":"Finished syncing MySource"}}
If you got to this point, all commands of the source interface are working properly, and we are done with the Python implementation. But we are not finished yet: we still need to Dockerize our connector so Airbyte can use it.
Dockerize the Source
The simplest way to create our Docker image is to use the airbyte/python-connector-base base image. Create the following Dockerfile at the root of your project:
FROM airbyte/python-connector-base:2.0.0
COPY . ./airbyte/integration_code
RUN pip install ./airbyte/integration_code
# The entrypoint and default env vars are already set in the base image
ENV AIRBYTE_ENTRYPOINT="python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
The final project structure should be:
├── Dockerfile
├── integration_tests
│ ├── config.json
│ ├── configured_catalog.json
│ └── invalid_config.json
├── main.py
├── my_source
│ ├── __init__.py
│ ├── run.py
│ ├── schemas
│ │ └── my_stream.json
│ ├── source.py
│ └── spec.json
├── poetry.lock
├── pyproject.toml
└── README.md
Build your image with docker build -t <repo name>/<image name>:<tag> . For example:
docker build -t airbyte-custom/my-source:dev .
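Before loading the image into Airbyte, you can sanity-check it by invoking one of the protocol commands directly, for example docker run --rm airbyte-custom/my-source:dev spec, which should print the same SPEC message we saw earlier.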
The next and final step is to load the image into Airbyte. The image needs to be locally available on the server running your Airbyte OSS instance. To test your image you can either:
- Run Airbyte locally
- Push the image to a remote Docker repository, and then pull it on the server where your Airbyte instance is running
After this, navigate in your browser to the Airbyte WebApp for the final step in this tutorial.
Load the source in Airbyte
On the Airbyte webapp, navigate to Settings > Sources. Then click on + New connector and add a new Docker connector. Fill out the form with your connector's display name and the Docker image name and tag you built earlier.
If it works, you will be redirected to a page to configure your custom source. From there onward, you'll be able to use your custom source like any other source in that Airbyte instance, including configuring connections to destinations.
Conclusions and next steps
From this point, you have an end-to-end setup for iterating on a custom connector and seeing the results in an Airbyte instance. For this guide, I kept things very simple so we could reach this point as quickly as possible. Because of that, I left out a few things that you might want to explore.
The first one is using proper typing. The Airbyte CDK has a lot of predefined classes and type hints that you can use. You can learn about them by exploring the CDK's GitHub repository.
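As a rough illustration, a typed version of our source could look like this (a sketch; the hints roughly mirror those of the CDK's abstract classes):
import logging
from typing import Any, Iterable, List, Mapping, Optional, Tuple, Union

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream


class MyStream(Stream):
    def __init__(self, config: Mapping[str, Any]):
        self.config = config

    @property
    def primary_key(self) -> Optional[Union[str, List[str]]]:
        return "id"

    def read_records(
        self,
        sync_mode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        return [{"id": 1, "foo": "bar"}, {"id": 2, "foo": "baz"}]


class MySource(AbstractSource):
    def check_connection(
        self, logger: logging.Logger, config: Mapping[str, Any]
    ) -> Tuple[bool, Optional[Any]]:
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        return [MyStream(config)]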
You should also take a look at testing. Maybe you noticed how we created an integration_tests directory but no actual tests. In this guide, all tests were done manually via the command line. This is good enough to start, but as you iterate on your development, testing this way is going to get annoying very fast, so I would strongly recommend automating testing with pytest.
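A sketch of what that could look like, assuming pytest is added as a dev dependency (poetry add --group dev pytest) and tests are run from the project root with poetry run pytest:
# tests/test_source.py (hypothetical layout)
import json

from my_source.source import MySource


def load_config(path):
    with open(path) as f:
        return json.load(f)


def test_check_connection_succeeds_with_valid_config():
    config = load_config("integration_tests/config.json")
    ok, error = MySource().check_connection(logger=None, config=config)
    assert ok
    assert error is None


def test_streams_returns_one_stream():
    config = load_config("integration_tests/config.json")
    assert len(MySource().streams(config)) == 1


def test_read_records_yields_records():
    config = load_config("integration_tests/config.json")
    stream = MySource().streams(config)[0]
    # The minimal example stream returns two hardcoded records
    records = list(stream.read_records(sync_mode="full_refresh"))
    assert len(records) == 2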
And I guess that’s all for this guide. Possibly I’ll also write one for developing destinations. But for now, I wish you good luck developing your source.