How to build a custom Airbyte python source

cover

This tutorial will demonstrate how to create a custom Airbyte python source from scratch. There is official Airbyte documentation on this, but it instructs the developer to fork their repository, generate a connector from a template using a generator and then to use the airbyte-ci tool for testing and building the Docker image. This is bad for the following reasons:

  1. The Airbyte repo is big and takes too long to clone
  2. The code is not in the repo you want
  3. Your connector will be publicly visible in your fork
  4. It is annoying to navigate to airbyte/airbyte-integrations/connectors/<your connector>
  5. The connector generator breaks a lot
  6. The airbyte-ci tool breaks a lot
  7. Adds complexity by default that you may not need

The official documentation is geared towards developers that wish to contribute new connectors to be publicly available in the Airbyte platform. But it is not ideal for simpler connectors for private use cases. And it is for those cases that I decided to write this tutorial. Here, I’ll focus on the following restrictions:

  1. Development on a separate repo
  2. No airbyte-ci
  3. Keeping things as simple as possible.

Let’s get started.

Requirements

  • python >=3.9, <3.12 (as of August 2024)
  • poetry
  • Docker engine
  • An Airbyte OSS instance

Creating a custom python Source

Setting up the environment

The first step is going to be creating our project structure and setting up the environment.

We are going to use poetry for packaging and dependency management. I tried doing it with pip and venv, but it failed to load into the Airbyte platform. Seems to be related to the way Airbyte runs the Docker images that plays better with poetry.

Create a new directory and navigate into it:

mkdir my_source && cd my_source

Inside the directory, initialize python project with poetry init. We need to specify the compatible python version as >=3.9,<3.12 and airbyte-cdk as a dependency. This can be done interactively with poetry init or specified using flags. The resulting pyproject.toml should look like this:

[tool.poetry]
name = "my-source"
version = "0.1.0"
description = "Creating a custon Airbyte python source from scratch"
authors = ["André <andre.soares@altxtech.net>"]
license = "MIT"
readme = "README.md"

[tool.poetry.dependencies]
python = ">=3.9,<3.12"
airbyte-cdk = "^4.4.1"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

Write a README.md file and run poetry install to install the required dependecies and fishish setting up our environment. Now we are ready to start implementing the code of our connector.

Implement the AsbtractSource and Stream classes

Inside our project, we are going to create a module where we are going to implement the minium required abstract classes to make our connector work. For a source connector, we need to implement the AbstractSourceand Stream classes.

Create a new module by running:

mkdir my_source && touch my_source/__init__.py &&  touch my_source/source.py

The project structure should look like this:

├── my_source
│   ├── __init__.py
│   ├── source.py
├── poetry.lock
├── pyproject.toml
└── README.md

The source.py file is the one we care about and where most. This is where the connector code is going to live.

For a barebones connector, the minimum that we need to do are the Stream and AbstractSource classes.

The Streamclass needs to have the primary_key property, which identifies the key, or set of keys, that uniquely identify the records that will be coming from that stream. It should return a single string or an iterable of strings. The other thing we need is the read_records() method. That is what effectively returns (or yields) the record messages emitted by the stream.

Optionally, you can also implemente the __init__() method. This is useful to pass our connector configuration into the Stream.

The AbstractSource class requires the implementation of two methods. The first one is the check_connection() method, which will be called by the check command and returns whether the provided configuration is correct and can successfully connect to the data source. It can also return an optional log message.

Finally, we need to implement the streams() method. It is going to be called by the discover command, and tells Airbyte which streams are available from this source. It should return an iterable of instances of concrete Stream classes.

Here’s a very minimal source.py file:

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream

class MyStream(Stream):

    def __init__(self, config):
        self.config = config

    @property
    def primary_key(self):
        return "id"


    def read_records(self, sync_mode, cursor_field, stream_slice, stream_state):

        # TODO: Extract data

        # Return records as dict
        return [{"id":1, "foo":"bar"}, {"id":2, "foo":"baz"}]

class MySource(AbstractSource):

    def check_connection(self, logger, config):

        # TODO: Check if connection works

        return True, None # Optinal, log message

    def streams(self, config):

        return[MyStream(config)]

Now that we have a valid implementation, we are going to try to launch it.

Launching the Source

To launch, we are going to write a run() function and a main.py module to call that function. With the Airbyte CDK, we can launch a connector by using the launch function which takes two arguments: An instance of a concrete source and an iterable of strings, which represents the command line arguments passed to the source.

Create both our files with touch my_source/run.py && touch main.py. This is how both files should be implemented:

run.py

import sys
from airbyte_cdk.entrypoint import launch
from .source import MySource

def run():
    launch(MySource(), sys.argv[1:])

main.py

from my_source.run import run

if __name__ == "__main__":
    run()

The project structure should be:

├── main.py
├── my_source
│   ├── __init__.py
│   ├── run.py
│   ├── source.py
├── poetry.lock
├── pyproject.toml
└── README.md

To launch the source, run poetry run python main.py --help. If everything is correct, you should see the help message:

usage: main.py [-h] {spec,check,discover,read} ...

options:
  -h, --help            show this help message and exit

commands:
  {spec,check,discover,read}
    spec                outputs the json configuration specification
    check               checks the config can be used to connect
    discover            outputs a catalog describing the source's schema
    read                reads the source and outputs messages to STDOUT

As we can see, when the source is launched, it should be able to handle the spec, check, discover and read commands. These are the commands defined by the Airbyte Protocol Docker Interface for source connectors.

These are not working yet, because we have to write some metadata files. You can check that by running poetry run python main.py spec. It should output some error logs.

Naturally, the next step is making those commands work.

Making the commands work

The ‘spec’ command - connector specification

The spec command is the first one in our list and also the simplest one to fix.

This command exists to tell Airbyte which arguments are needed to configure the source. By default, it looks for the spec.json inside the connector module. This file conforms to an Actor Specification and the only required field is connectionSpecificaton, which is a JSON Schema object that describes what arguments are needed to configure the source and which fields are required.

Create it with touch my_source/spec.json. A very basic spec.json with a single required property looks as follows:

{
  "connectionSpecification": {
    "title": "Example Source",
    "type": "object",
    "required": ["some_property"],
    "additionalProperties": false,
    "properties": {
      "some_property": {
        "type": "string",
        "description": "Some data"
      }
    }
  }
}

Now, poetry run python main.py spec should work and output a SPEC message:

{"type":"SPEC","spec":{"connectionSpecification":{"title":"Example Source","type":"object","required":["some_property"],"additionalProperties":false,"properties":{"some_property":{"type":"string","description":"Some data"}}}}}

Now, Airbyte can understand what configuration schema is expected by our source. The next step is to check if a provided configuration is valid or not. This done by the check command, which is our next step.

The ‘check’ command - valid configuration

The check command exists to verify if a given configuration is valid. Airbyte will do some basic checks for us.

For example, if you specify that your source needs to be provided an API Key as a string, Airbyte will fail the check if a configuration does not have the API Key.

The check command also calls the AbstractSource.check_connection() under the hood. This is where you put any other checks you need. For example, if an API Key is invalid or expired.

At this point, the check command should be working. The only thing we need are some test configurations to files to test it. Create an integration_tests directory to store these configurations: mkdir integration_tests. Then, write the following:

integration_tests/config.json

{"some_property": "hello!"}

integration_tests/invalid_config.json

{}

Test both configurations. poetry run python main.py check --config integration_tests/config.json should succeed:

{"type":"LOG","log":{"level":"INFO","message":"Check succeeded"}}
{"type":"CONNECTION_STATUS","connectionStatus":{"status":"SUCCEEDED"}}

poetry run python main.py check --config integration_tests/invalid_config.json should fail because some_property is missing:

{"type":"CONNECTION_STATUS","connectionStatus":{"status":"FAILED","message":"Config validation error: 'some_property' is a required property"}}

The ‘discover’ command - defining the stream schema

The discover exists so Airbyte knows which streams can be extracted from the source. This command will need to return the schema of each stream.

By default, it searches for a Json file in the schemas directory inside the source’s module that match the name of the stream.

First, create the schemas folder with mkdir my_source/schemas. This is where the schema for all your streams should go. Then write the schema definition for MyStream:

my_source/schemas/my_stream.json

{
	"title": "My Stream",
	"description": "Sample stream",
	"type": "object",
	"properties": {
		"id": {
			"type": "integer",
			"description": "Primary key"
		},
		"foo": {
			"type": "string",
			"description": "Some content"
		}
	}
}

Now, run:

poetry run python main.py discover --config integration_tests/config.json

Should output:

{"type":"CATALOG","catalog":{"streams":[{"name":"my_stream","json_schema":{"title":"My Stream","description":"Sample stream","type":"object","properties":{"id":{"type":"integer","description":"Primary key"},"foo":{"type":"string","description":"Some content"}}},"supported_sync_modes":["full_refresh"],"source_defined_primary_key":[["id"]],"is_resumable":false}]}}

So this means that so far we have three out of four commands working. The last one we need to handle is read.

The ‘read’ command - reading data

The read command is the one that actually extracts the data. It uses the Stream.read_records() behind the scenes.

Besides the source configuration, the other argument this command needs a configured catalog, which specifies how each stream should be extracted. We just need to write a simple configured catalog file for testing.

integration_tests/configured_catalog.json

{
	"streams": [
		{
			"stream": {
				"name": "my_stream",
				"json_schema": {},
				"supported_sync_modes": ["full_refresh"]
			},
			"sync_mode": "full_refresh",
			"destination_sync_mode": "overwrite"
		}
	]
}

And now test the source with

poetry run python main.py read \
    --config integration_tests/config.json \
    --catalog integration_tests/configured_catalog.json

It should output the hardcoded records we set earlier in the tutorial, along with some logs:

{"type":"LOG","log":{"level":"INFO","message":"Starting syncing MySource"}}
{"type":"LOG","log":{"level":"INFO","message":"Marking stream my_stream as STARTED"}}
{"type":"TRACE","trace":{"type":"STREAM_STATUS","emitted_at":1723754610183.407,"stream_status":{"stream_descriptor":{"name":"my_stream","namespace":null},"status":"STARTED","reasons":null}}}
{"type":"LOG","log":{"level":"INFO","message":"Syncing stream: my_stream "}}
{"type":"LOG","log":{"level":"INFO","message":"Marking stream my_stream as RUNNING"}}
{"type":"TRACE","trace":{"type":"STREAM_STATUS","emitted_at":1723754610183.815,"stream_status":{"stream_descriptor":{"name":"my_stream","namespace":null},"status":"RUNNING","reasons":null}}}
{"type":"RECORD","record":{"stream":"my_stream","data":{"id":1,"foo":"bar"},"emitted_at":1723754610183}}
{"type":"RECORD","record":{"stream":"my_stream","data":{"id":2,"foo":"baz"},"emitted_at":1723754610183}}
{"type":"STATE","state":{"type":"STREAM","stream":{"stream_descriptor":{"name":"my_stream","namespace":null},"stream_state":{"__ab_no_cursor_state_message":true}},"sourceStats":{"recordCount":2.0}}}
{"type":"LOG","log":{"level":"INFO","message":"Read 2 records from my_stream stream"}}
{"type":"LOG","log":{"level":"INFO","message":"Marking stream my_stream as STOPPED"}}
{"type":"TRACE","trace":{"type":"STREAM_STATUS","emitted_at":1723754610184.083,"stream_status":{"stream_descriptor":{"name":"my_stream","namespace":null},"status":"COMPLETE","reasons":null}}}
{"type":"LOG","log":{"level":"INFO","message":"Finished syncing my_stream"}}
{"type":"LOG","log":{"level":"INFO","message":"MySource runtimes:\nSyncing stream my_stream 0:00:00.000753"}}
{"type":"LOG","log":{"level":"INFO","message":"Finished syncing MySource"}}

If you got to this point, then all commands of the source interface are working properly, and we are done with the implementation in python. But we are not finished yet, because we still need to Dockerize our connector, so Airbyte can use it.

Dockerize the Source

The simplest way to create our Docker image is to use the airbyte/python-connector-base base image. Create the following Dockerfile at the root of your project.

FROM airbyte/python-connector-base:2.0.0

COPY . ./airbyte/integration_code
RUN pip install ./airbyte/integration_code

# The entrypoint and default env vars are already set in the base image
ENV AIRBYTE_ENTRYPOINT="python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

The final project structure should be:

├── Dockerfile
├── integration_tests
│   ├── config.json
│   ├── configured_catalog.json
│   └── invalid_config.json
├── main.py
├── my_source
│   ├── __init__.py
│   ├── run.py
│   ├── schemas
│   │   └── my_stream.json
│   ├── source.py
│   └── spec.json
├── poetry.lock
├── pyproject.toml
└── README.md

Build your image with docker build -t <repo name>/<image name>:tag. For example docker build -t airbyte-custom/my-source:dev .

The next and final step is to load the image into Airbyte. The image should be locally present in the server you are running your instance of Airbyte OSS. To test your image you can either:

  • Run Airbyte locally
  • Push the image to a remote Docker repository, and then pull it from the server your Airbyte instance is running

After this, navigate in your browser to the Airbyte WebApp for the final step in this tutorial.

Load the source in Airbyte

On the Airbyte webapp, navigate to Setting > Sources. Then click on `+ New connector and add a new Docker connector. Fill out the form as such:

Airbyte New Docker Connector form

If it works, you be redirected to a page to configure you custom source. From there onward you’ll be able to use your custom source like any other source from that Airbyte instance, including configuring connections to destinations.

Conclusions and next steps

From this point, you have an end-to-end setup for iterating on developing a custom connector and seeing the results in an Airbyte instance. For this guide, I kept things very simple so we could reach this point as quickly as possible. Because of that, I left out a few things that you might explore.

The first one is using proper typing. The airbyte CDK actually has a lot predefined classes that you can use. You can learn about that by exploring their github.

You also should take a look at testing. Maybe you noticed how we created and integration_tests directory but no actual tests. In this guide, all tests were done manually via command line. This is good enough to start, but as you iterate on your development, testing it this way is going to get annoying very fast. So I would strongly recommend automating testing with pytest.

And I guess that’s all for this guide. Possibly I’ll also write one for developing sources. But for now, I wish you good look developing your source.