Catalogs provide metadata for databases, tables, views, partitions, and functions, and the information needed to access data stored in external systems. By abstracting metadata management behind a unified API, catalogs let Table API programs and SQL queries reference objects by name without hard-coding connection details in every query. Flink always starts with a default in-memory catalog named default_catalog and a default database named default_database. Any objects you create without specifying a catalog are placed there.
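A freshly created TableEnvironment resolves names against these defaults until you switch explicitly; a minimal sketch:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.inStreamingMode()
);

// The built-in defaults are active from the start
System.out.println(tableEnv.getCurrentCatalog());   // default_catalog
System.out.println(tableEnv.getCurrentDatabase());  // default_database
```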

Catalog types

GenericInMemoryCatalog

GenericInMemoryCatalog is the default catalog. All objects exist only for the lifetime of the session. It is case-sensitive, unlike the Hive Metastore.
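Additional in-memory catalogs can be registered alongside the default one; a minimal sketch (the name my_mem is illustrative):

```java
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

// A second, independently named in-memory catalog.
// Its contents disappear when the session ends.
GenericInMemoryCatalog memCatalog = new GenericInMemoryCatalog("my_mem");
tableEnv.registerCatalog("my_mem", memCatalog);
```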

JdbcCatalog

JdbcCatalog connects Flink to a relational database over JDBC. Postgres and MySQL are the two supported implementations. Tables in the existing database are automatically mapped to Flink tables; no manual DDL is required.
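A JdbcCatalog is typically registered through DDL; the sketch below assumes a reachable Postgres instance, and the database name and credentials are illustrative:

```java
// Registers a Postgres-backed catalog; all tables in 'mydb' become
// queryable in Flink without any CREATE TABLE statements.
tableEnv.executeSql(
    "CREATE CATALOG my_pg WITH (" +
    "  'type' = 'jdbc'," +
    "  'default-database' = 'mydb'," +
    "  'username' = 'flink_user'," +
    "  'password' = '***'," +
    "  'base-url' = 'jdbc:postgresql://localhost:5432'" +
    ")"
);
```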

HiveCatalog

HiveCatalog serves two purposes:
  1. Persistent storage for Flink metadata (tables, views, UDFs) that survives session restarts.
  2. Hive integration: reading and writing existing Hive metadata so Flink queries can access Hive tables transparently.
The Hive Metastore stores all object names in lowercase. This differs from GenericInMemoryCatalog, which is case-sensitive.

User-defined catalogs

You can implement custom catalogs by implementing the Catalog interface and a companion CatalogFactory. The factory is discovered via Java SPI: register the fully-qualified class name in META-INF/services/org.apache.flink.table.factories.Factory. The type identifier in the factory must match the type property in CREATE CATALOG DDL.
Since Flink 1.16, user-defined catalogs should load classes through CatalogFactory.Context#getClassLoader() rather than Thread.currentThread().getContextClassLoader() to avoid ClassNotFoundException.

Registering a catalog

import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;

TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.inStreamingMode()
);

// create and register a HiveCatalog
HiveCatalog catalog = new HiveCatalog(
    "myhive",          // catalog name
    null,              // default database (uses "default")
    "/path/to/hive-conf"
);
tableEnv.registerCatalog("myhive", catalog);

// switch to the new catalog
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("default");
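The same registration can also be done in SQL, which avoids constructing the catalog object in code; a sketch assuming the Hive connector jars are on the classpath (the conf path is illustrative):

```java
tableEnv.executeSql(
    "CREATE CATALOG myhive WITH (" +
    "  'type' = 'hive'," +
    "  'hive-conf-dir' = '/path/to/hive-conf'" +
    ")"
);
tableEnv.executeSql("USE CATALOG myhive");
```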

Creating tables in a catalog

// create the database in the target catalog, then switch to it
tableEnv.useCatalog("myhive");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS mydb");
tableEnv.useDatabase("mydb");

// create a Kafka-backed table
tableEnv.executeSql(
    "CREATE TABLE user_events (" +
    "  user_id BIGINT," +
    "  action STRING," +
    "  event_time TIMESTAMP(3)," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user-events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

System.out.println(java.util.Arrays.toString(tableEnv.listTables()));

Catalog API reference

Database operations

// create
catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), false);

// drop
catalog.dropDatabase("mydb", false);

// get metadata
CatalogDatabase db = catalog.getDatabase("mydb");

// check existence
boolean exists = catalog.databaseExists("mydb");

// list all databases
List<String> dbs = catalog.listDatabases();

Table operations

// create
catalog.createTable(
    new ObjectPath("mydb", "mytable"),
    CatalogTable.newBuilder()
        .schema(schema)
        .options(options)
        .build(),
    false
);

// drop
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// rename
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);

// check existence
boolean exists = catalog.tableExists(new ObjectPath("mydb", "mytable"));

// list
List<String> tables = catalog.listTables("mydb");

Function operations

import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.FunctionLanguage;

// register a Java UDF in the catalog
catalog.createFunction(
    new ObjectPath("mydb", "my_udf"),
    new CatalogFunctionImpl("com.example.MyScalarFunction", FunctionLanguage.JAVA),
    false
);

// list functions
List<String> functions = catalog.listFunctions("mydb");
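Once registered, the function is addressable by name from SQL in that catalog and database, or via a fully-qualified name from any context; a sketch (the table t and column v are illustrative):

```java
// Fully-qualified function reference works regardless of the
// current catalog and database
tableEnv.executeSql(
    "SELECT myhive.mydb.my_udf(v) FROM myhive.mydb.t"
);
```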

Switching catalogs and databases

Flink resolves table names relative to the current catalog and database. You can always use a fully-qualified three-part name to reference objects in any catalog:
// switch context
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("analytics");

// fully-qualified reference to a table in another catalog
tableEnv.from("other_catalog.other_db.some_table");

// list
String[] catalogs = tableEnv.listCatalogs();
String[] databases = tableEnv.listDatabases();
String[] tables = tableEnv.listTables();

Catalog Store: persisting catalog configurations

A CatalogStore saves catalog configurations so they can be restored when a session restarts. Flink ships two built-in implementations:
  • GenericInMemoryCatalogStore (default): configurations are in-memory only.
  • FileCatalogStore: configurations are written to a directory on a local or remote filesystem, one file per catalog.
import org.apache.flink.table.catalog.FileCatalogStore;

CatalogStore catalogStore = new FileCatalogStore("file:///var/flink/catalog-store");

EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inBatchMode()
    .withCatalogStore(catalogStore)
    .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);
Alternatively, configure the catalog store in conf/config.yaml (the SQL Gateway reads the same options):
table.catalog-store.kind: file
table.catalog-store.file.path: file:///var/flink/catalog-store

Custom catalog implementation

Implement Catalog and CatalogFactory to integrate Flink with a proprietary metadata store. Register the factory in META-INF/services/org.apache.flink.table.factories.Factory inside your JAR, then reference it in DDL:
CREATE CATALOG my_custom_catalog WITH (
    'type' = 'my-catalog-type',
    'endpoint' = 'https://metadata.example.com'
);
The factory factoryIdentifier() must return 'my-catalog-type'.
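A minimal factory sketch matching the DDL above (the class MyCustomCatalog and the endpoint option are illustrative; error handling is omitted):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;

public class MyCatalogFactory implements CatalogFactory {

    private static final ConfigOption<String> ENDPOINT =
        ConfigOptions.key("endpoint").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        return "my-catalog-type"; // must match 'type' in CREATE CATALOG DDL
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(ENDPOINT);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public Catalog createCatalog(Context context) {
        // Load user classes via the context class loader (Flink 1.16+),
        // not Thread.currentThread().getContextClassLoader()
        ClassLoader cl = context.getClassLoader();
        String endpoint = context.getOptions().get(ENDPOINT.key());
        // MyCustomCatalog is a hypothetical Catalog implementation
        return new MyCustomCatalog(context.getName(), endpoint);
    }
}
```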

Supporting time travel

If your catalog stores historical versions of tables, implement getTable(ObjectPath, long timestamp) to enable the FOR SYSTEM_TIME AS OF syntax:
public class MyVersionedCatalog implements Catalog {

    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
            throws TableNotExistException {
        Schema schema = buildSchemaAt(timestamp);
        Map<String, String> options = buildOptionsAt(timestamp);
        return CatalogTable.newBuilder()
                .schema(schema)
                .options(options)
                .snapshot(timestamp)
                .build();
    }
}
Users can then query historical data:
SELECT * FROM my_table FOR SYSTEM_TIME AS OF TIMESTAMP '2024-01-01 00:00:00';
