Catalogs provide metadata for databases, tables, views, partitions, and functions, and the information needed to access data stored in external systems. By abstracting metadata management behind a unified API, catalogs let Table API programs and SQL queries reference objects by name without hard-coding connection details in every query.
Flink always starts with a default in-memory catalog named default_catalog and a default database named default_database. Any objects you create without specifying a catalog are placed there.
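For instance, a table created without qualification lands in the default catalog and database, and can later be referenced by its full three-part name (the table name and columns below are illustrative):

```sql
-- created in the current context, i.e. default_catalog.default_database
CREATE TABLE orders (order_id BIGINT, amount DECIMAL(10, 2))
WITH ('connector' = 'datagen');
-- equivalent fully-qualified reference
SELECT * FROM default_catalog.default_database.orders;
```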
Catalog types
GenericInMemoryCatalog
GenericInMemoryCatalog is the default catalog implementation. All objects exist only for the lifetime of the session. Object names are case-sensitive, unlike in the Hive Metastore.
JdbcCatalog
JdbcCatalog connects Flink to a relational database over JDBC. Postgres and MySQL are the two supported implementations. Tables in the database are mapped to Flink tables automatically, so no manual DDL is required.
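As a sketch, a Postgres-backed JdbcCatalog can be registered in SQL like this; the catalog name, database, and credentials are placeholders for your own values:

```sql
CREATE CATALOG my_pg_catalog WITH (
  'type' = 'jdbc',
  'default-database' = 'mydb',
  'username' = 'postgres',
  'password' = 'secret',
  'base-url' = 'jdbc:postgresql://localhost:5432'
);
```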
HiveCatalog
HiveCatalog serves two purposes:
- Persistent storage for Flink metadata (tables, views, UDFs) that survives session restarts.
- Hive integration: reading and writing existing Hive metadata so Flink queries can access Hive tables transparently.
The Hive Metastore stores all object names in lowercase. This differs from GenericInMemoryCatalog, which is case-sensitive.
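The practical effect is that mixed-case identifiers are folded to lowercase when stored through a HiveCatalog (table name and column here are illustrative):

```sql
-- with a HiveCatalog as the current catalog
CREATE TABLE MyTable (id BIGINT) WITH ('connector' = 'datagen');
SHOW TABLES;  -- the table is listed as 'mytable'
```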
User-defined catalogs
You can implement custom catalogs by implementing the Catalog interface and a companion CatalogFactory. The factory is discovered via Java SPI: register the fully-qualified class name in META-INF/services/org.apache.flink.table.factories.Factory. The type identifier in the factory must match the type property in CREATE CATALOG DDL.
Since Flink 1.16, user-defined catalogs should load classes through CatalogFactory.Context#getClassLoader() rather than Thread.currentThread().getContextClassLoader() to avoid ClassNotFoundException.
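The SPI registration file is a plain text file containing one fully-qualified factory class name per line; the package and class name below are placeholders for your own implementation:

```
# contents of META-INF/services/org.apache.flink.table.factories.Factory
com.example.MyCatalogFactory
```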
Registering a catalog
Java
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
TableEnvironment tableEnv = TableEnvironment.create(
EnvironmentSettings.inStreamingMode()
);
// create and register a HiveCatalog
HiveCatalog catalog = new HiveCatalog(
"myhive", // catalog name
null, // default database (uses "default")
"/path/to/hive-conf"
);
tableEnv.registerCatalog("myhive", catalog);
// switch to the new catalog
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("default");
Python
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.catalog import HiveCatalog
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
catalog = HiveCatalog("myhive", None, "/path/to/hive-conf")
t_env.register_catalog("myhive", catalog)
t_env.use_catalog("myhive")
t_env.use_database("default")
SQL
CREATE CATALOG myhive WITH (
'type' = 'hive',
'hive-conf-dir' = '/path/to/hive-conf'
);
USE CATALOG myhive;
USE `default`;
Creating tables in a catalog
Java (DDL)
// switch to the target catalog first
tableEnv.useCatalog("myhive");
// create the database, then make it the current one
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS mydb");
tableEnv.useDatabase("mydb");
// create a Kafka-backed table
tableEnv.executeSql(
"CREATE TABLE user_events (" +
" user_id BIGINT," +
" action STRING," +
" event_time TIMESTAMP(3)," +
" WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'user-events'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);
System.out.println(java.util.Arrays.toString(tableEnv.listTables()));
Java (API)
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
TableEnvironment tableEnv = TableEnvironment.create(
EnvironmentSettings.inStreamingMode()
);
HiveCatalog catalog = new HiveCatalog("myhive", null, "/path/to/hive-conf");
tableEnv.registerCatalog("myhive", catalog);
// create a database programmatically
catalog.createDatabase("mydb", new CatalogDatabaseImpl(new java.util.HashMap<>(), ""), false);
// define schema
Schema schema = Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
// create a catalog table using a TableDescriptor
tableEnv.createTable(
"myhive.mydb.mytable",
TableDescriptor.forConnector("kafka")
.schema(schema)
.option("topic", "my-topic")
.option("properties.bootstrap.servers", "localhost:9092")
.option("format", "json")
.build()
);
// list tables in the database
catalog.listTables("mydb").forEach(System.out::println);
Python
from pyflink.table import (DataTypes, EnvironmentSettings, Schema,
                           TableDescriptor, TableEnvironment)
from pyflink.table.catalog import HiveCatalog, CatalogDatabase
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
catalog = HiveCatalog("myhive", None, "/path/to/hive-conf")
t_env.register_catalog("myhive", catalog)
# create a database (False: raise an error if it already exists)
database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", database, False)
# create a table
schema = Schema.new_builder() \
.column("name", DataTypes.STRING()) \
.column("age", DataTypes.INT()) \
.build()
t_env.create_table(
"myhive.mydb.mytable",
TableDescriptor.for_connector("kafka")
.schema(schema)
.option("topic", "my-topic")
.option("properties.bootstrap.servers", "localhost:9092")
.option("format", "json")
.build()
)
Catalog API reference
Database operations
// create
catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), false);
// drop
catalog.dropDatabase("mydb", false);
// get metadata
CatalogDatabase db = catalog.getDatabase("mydb");
// check existence
boolean exists = catalog.databaseExists("mydb");
// list all databases
List<String> dbs = catalog.listDatabases();
Table operations
// create
catalog.createTable(
new ObjectPath("mydb", "mytable"),
CatalogTable.newBuilder()
.schema(schema)
.options(options)
.build(),
false
);
// drop
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
// rename
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);
// check existence
boolean exists = catalog.tableExists(new ObjectPath("mydb", "mytable"));
// list
List<String> tables = catalog.listTables("mydb");
Function operations
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.FunctionLanguage;
// register a Java UDF in the catalog
catalog.createFunction(
new ObjectPath("mydb", "my_udf"),
new CatalogFunctionImpl("com.example.MyScalarFunction", FunctionLanguage.JAVA),
false
);
// list functions
List<String> functions = catalog.listFunctions("mydb");
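The same registration can be expressed in SQL; the class name below is the example one used above:

```sql
CREATE FUNCTION mydb.my_udf AS 'com.example.MyScalarFunction' LANGUAGE JAVA;
```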
Switching catalogs and databases
Flink resolves table names relative to the current catalog and database. You can always use a fully-qualified three-part name to reference objects in any catalog:
Java
// switch context
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("analytics");
// fully-qualified reference to a table in another catalog
tableEnv.from("other_catalog.other_db.some_table");
// list
String[] catalogs = tableEnv.listCatalogs();
String[] databases = tableEnv.listDatabases();
String[] tables = tableEnv.listTables();
SQL
USE CATALOG myhive;
USE analytics;
SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;
-- fully-qualified query
SELECT * FROM other_catalog.other_db.some_table;
Catalog Store: persisting catalog configurations
A CatalogStore saves catalog configurations so they can be restored when a session restarts. Flink ships two built-in implementations:
- GenericInMemoryCatalogStore (default): configurations are held in memory only and are lost when the session ends.
- FileCatalogStore: configurations are written to a directory on a local or remote filesystem, one file per catalog.
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.catalog.FileCatalogStore;
CatalogStore catalogStore = new FileCatalogStore("file:///var/flink/catalog-store");
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inBatchMode()
.withCatalogStore(catalogStore)
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
Alternatively, configure the store in conf/config.yaml (the same options apply to the SQL Gateway):
table.catalog-store.kind: file
table.catalog-store.file.path: file:///var/flink/catalog-store
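With a file catalog store configured, catalog DDL issued in one session is restored in later sessions. A sketch of the flow, reusing the HiveCatalog definition from earlier:

```sql
-- first session: the definition is written to the catalog store directory
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/path/to/hive-conf'
);
-- a later session configured with the same store sees it again
SHOW CATALOGS;
```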
Custom catalog implementation
Implement Catalog and CatalogFactory to integrate Flink with a proprietary metadata store.
Register the factory in META-INF/services/org.apache.flink.table.factories.Factory inside your JAR, then reference it in DDL:
CREATE CATALOG my_custom_catalog WITH (
'type' = 'my-catalog-type',
'endpoint' = 'https://metadata.example.com'
);
The factory's factoryIdentifier() method must return 'my-catalog-type'.
Supporting time travel
If your catalog stores historical versions of tables, implement getTable(ObjectPath, long timestamp) to enable the FOR SYSTEM_TIME AS OF syntax:
public class MyVersionedCatalog implements Catalog {
@Override
public CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
throws TableNotExistException {
Schema schema = buildSchemaAt(timestamp);
Map<String, String> options = buildOptionsAt(timestamp);
return CatalogTable.newBuilder()
.schema(schema)
.options(options)
.snapshot(timestamp)
.build();
}
}
Users can then query historical data:
SELECT * FROM my_table FOR SYSTEM_TIME AS OF TIMESTAMP '2024-01-01 00:00:00';