Interface RemoteLogMetadataManager

All Superinterfaces:
AutoCloseable, Closeable, Configurable

public interface RemoteLogMetadataManager extends Configurable, Closeable
This interface provides storage and retrieval of remote log segment metadata with strongly consistent semantics.

An implementation can be plugged into a Kafka cluster by setting the remote.log.metadata.manager.class.name property to the implementation class name. A built-in implementation backed by topic storage in the local cluster is used by default when remote.log.metadata.manager.class.name is not configured.

The remote.log.metadata.manager.class.path property specifies the class path of the RemoteLogMetadataManager implementation. If it is set, the implementation and its dependent libraries are loaded by a dedicated classloader that searches this class path before the Kafka broker class path. The syntax of this parameter is the same as that of the standard Java class path string.

The remote.log.metadata.manager.listener.name property specifies the name of the local broker listener that the RemoteLogMetadataManager implementation should connect to, if it needs one.

"cluster.id", "broker.id", and all properties prefixed with the value of the config "remote.log.metadata.manager.impl.prefix" (default "rlmm.config.") are passed when Configurable.configure(Map) is invoked on this instance.

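As a rough illustration of the configuration hand-off described above, the Java sketch below assembles the map an implementation might receive in configure(Map). The class RlmmConfigSketch and its method are hypothetical, and stripping the prefix from the forwarded keys is an assumption; the text above only states that prefixed properties are passed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: builds the map handed to Configurable.configure(Map).
// Assumption: the prefix is stripped from forwarded keys.
final class RlmmConfigSketch {
    static final String DEFAULT_PREFIX = "rlmm.config.";

    static Map<String, Object> rlmmConfigs(Map<String, ?> brokerProps,
                                           String prefix,
                                           String clusterId,
                                           int brokerId) {
        Map<String, Object> configs = new HashMap<>();
        for (Map.Entry<String, ?> e : brokerProps.entrySet()) {
            // Forward only the properties that carry the RLMM prefix.
            if (e.getKey().startsWith(prefix)) {
                configs.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        // cluster.id and broker.id are always passed.
        configs.put("cluster.id", clusterId);
        configs.put("broker.id", brokerId);
        return configs;
    }
}
```

Under this assumption, a broker property rlmm.config.foo=bar would reach the implementation as foo=bar, while unrelated broker properties are not forwarded.
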
  • Method Details

    • addRemoteLogSegmentMetadata

      CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException
      Adds RemoteLogSegmentMetadata, identified by its RemoteLogSegmentId, to this RemoteLogMetadataManager asynchronously.

      The metadata must be in the initial state, RemoteLogSegmentState.COPY_SEGMENT_STARTED.

      Use updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate) to update an existing RemoteLogSegmentMetadata.

      Parameters:
      remoteLogSegmentMetadata - metadata about the remote log segment.
      Returns:
      a CompletableFuture which will complete once this operation is finished.
      Throws:
      RemoteStorageException - if any storage-related error occurs.
      IllegalArgumentException - if the given metadata instance is not in the RemoteLogSegmentState.COPY_SEGMENT_STARTED state.
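The contract above (accept only metadata in COPY_SEGMENT_STARTED, complete asynchronously) can be sketched with an in-memory store. This is not Kafka's implementation: SegmentState and SegmentMetadata are hypothetical stand-ins for RemoteLogSegmentState and RemoteLogSegmentMetadata, and a String plays the role of RemoteLogSegmentId.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory sketch of the addRemoteLogSegmentMetadata precondition.
final class AddSegmentSketch {
    enum SegmentState { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED,
                        DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

    record SegmentMetadata(String segmentId, SegmentState state) { }

    private final Map<String, SegmentMetadata> store = new ConcurrentHashMap<>();

    CompletableFuture<Void> addRemoteLogSegmentMetadata(SegmentMetadata metadata) {
        // New metadata must start in COPY_SEGMENT_STARTED; later states
        // go through the update method instead.
        if (metadata.state() != SegmentState.COPY_SEGMENT_STARTED) {
            throw new IllegalArgumentException(
                "Expected COPY_SEGMENT_STARTED but was " + metadata.state());
        }
        store.put(metadata.segmentId(), metadata);
        // An in-memory put finishes immediately; a durable store would
        // complete the future only once the write is acknowledged.
        return CompletableFuture.completedFuture(null);
    }

    boolean contains(String segmentId) {
        return store.containsKey(segmentId);
    }
}
```
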
    • updateRemoteLogSegmentMetadata

      CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException
      Updates RemoteLogSegmentMetadata asynchronously. Currently, it allows updating the state according to the segment's life cycle, which follows the state transitions below.

       +---------------------+            +----------------------+
       |COPY_SEGMENT_STARTED |----------->|COPY_SEGMENT_FINISHED |
       +-------------------+-+            +--+-------------------+
                           |                 |
                           |                 |
                           v                 v
                        +--+-----------------+-+
                        |DELETE_SEGMENT_STARTED|
                        +-----------+----------+
                                    |
                                    |
                                    v
                        +-----------+-----------+
                        |DELETE_SEGMENT_FINISHED|
                        +-----------------------+
       

      RemoteLogSegmentState.COPY_SEGMENT_STARTED - This state indicates that copying the segment to remote storage has started but not yet finished.
      RemoteLogSegmentState.COPY_SEGMENT_FINISHED - This state indicates that copying the segment to remote storage has finished.
      The leader broker copies log segments to remote storage, adds the remote log segment metadata with the state COPY_SEGMENT_STARTED, and updates the state to COPY_SEGMENT_FINISHED once the copy succeeds.

      RemoteLogSegmentState.DELETE_SEGMENT_STARTED - This state indicates that deletion of the segment has started but not yet finished.
      RemoteLogSegmentState.DELETE_SEGMENT_FINISHED - This state indicates that the segment has been deleted successfully.
      Partition leaders publish both of these delete-segment events when the respective segments reach remote log retention. Remote partition removers also publish these events when a segment is deleted as part of remote partition deletion.
      Parameters:
      remoteLogSegmentMetadataUpdate - update of the remote log segment metadata.
      Returns:
      a CompletableFuture which will complete once this operation is finished.
      Throws:
      RemoteStorageException - if any storage-related error occurs.
      RemoteResourceNotFoundException - when there are no resources associated with the given remoteLogSegmentMetadataUpdate.
      IllegalArgumentException - if the given metadata instance is in the RemoteLogSegmentState.COPY_SEGMENT_STARTED state.
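The transitions in the diagram can be encoded as a small lookup table that an updateRemoteLogSegmentMetadata implementation could consult before applying an update. The enum below is a hypothetical mirror of RemoteLogSegmentState, not the Kafka class itself.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Encodes the state diagram above; SegmentState is a hypothetical mirror
// of RemoteLogSegmentState.
final class SegmentStateTransitions {
    enum SegmentState { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED,
                        DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

    private static final Map<SegmentState, Set<SegmentState>> ALLOWED =
        new EnumMap<>(SegmentState.class);

    static {
        ALLOWED.put(SegmentState.COPY_SEGMENT_STARTED,
            EnumSet.of(SegmentState.COPY_SEGMENT_FINISHED, SegmentState.DELETE_SEGMENT_STARTED));
        ALLOWED.put(SegmentState.COPY_SEGMENT_FINISHED,
            EnumSet.of(SegmentState.DELETE_SEGMENT_STARTED));
        ALLOWED.put(SegmentState.DELETE_SEGMENT_STARTED,
            EnumSet.of(SegmentState.DELETE_SEGMENT_FINISHED));
        // DELETE_SEGMENT_FINISHED is terminal: no outgoing transitions.
        ALLOWED.put(SegmentState.DELETE_SEGMENT_FINISHED,
            EnumSet.noneOf(SegmentState.class));
    }

    static boolean isValidTransition(SegmentState from, SegmentState to) {
        return ALLOWED.get(from).contains(to);
    }
}
```

Self-transitions (the same state applied twice, for example on a retry) are rejected in this sketch because the diagram does not show them; an implementation may choose to tolerate them.
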
    • remoteLogSegmentMetadata

      Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition, int epochForOffset, long offset) throws RemoteStorageException
      Returns the RemoteLogSegmentMetadata of the segment in the given topic partition that contains the given offset, using the given leader epoch for that offset, if such a segment exists; otherwise returns Optional.empty().
      Parameters:
      topicIdPartition - topic partition
      epochForOffset - leader epoch for the given offset
      offset - offset
      Returns:
      the requested remote log segment metadata if it exists.
      Throws:
      RemoteStorageException - if any storage-related error occurs.
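One way to answer this lookup, sketched under simplifying assumptions: segments are grouped by leader epoch and indexed by start offset, so TreeMap.floorEntry finds the only candidate that can contain the offset. Segment is a hypothetical stand-in for RemoteLogSegmentMetadata, each segment is assumed to belong to a single epoch, and end offsets are treated as inclusive.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch: per leader epoch, segments indexed by start offset.
final class SegmentLookupSketch {
    record Segment(String id, long startOffset, long endOffset) { }

    private final Map<Integer, TreeMap<Long, Segment>> byEpoch = new HashMap<>();

    void add(int epoch, Segment segment) {
        byEpoch.computeIfAbsent(epoch, e -> new TreeMap<>())
               .put(segment.startOffset(), segment);
    }

    Optional<Segment> remoteLogSegmentMetadata(int epochForOffset, long offset) {
        TreeMap<Long, Segment> segments = byEpoch.get(epochForOffset);
        if (segments == null) {
            return Optional.empty();
        }
        // The floor entry is the last segment starting at or before the offset.
        Map.Entry<Long, Segment> floor = segments.floorEntry(offset);
        // It matches only if its (inclusive) end offset also reaches the offset.
        if (floor == null || floor.getValue().endOffset() < offset) {
            return Optional.empty();
        }
        return Optional.of(floor.getValue());
    }
}
```
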
    • highestOffsetForEpoch

      Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException
      Returns the highest log offset of the topic partition for the given leader epoch in remote storage. The remote log management subsystem uses this to determine up to which offset segments have been copied to remote storage for a given leader epoch.
      Parameters:
      topicIdPartition - topic partition
      leaderEpoch - leader epoch
      Returns:
      the requested highest log offset, if it exists.
      Throws:
      RemoteStorageException - if any storage-related error occurs.
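A minimal sketch of the computation, assuming a hypothetical Segment type that belongs to a single epoch and assuming that only segments whose copy has finished count toward copy progress:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of highestOffsetForEpoch over an in-memory list.
final class HighestOffsetSketch {
    // Simplified segment: one leader epoch per segment.
    record Segment(int leaderEpoch, long endOffset, boolean copyFinished) { }

    static Optional<Long> highestOffsetForEpoch(List<Segment> segments, int leaderEpoch) {
        return segments.stream()
            .filter(s -> s.leaderEpoch() == leaderEpoch)
            // Assumption: in-progress copies do not count as copied.
            .filter(Segment::copyFinished)
            .map(Segment::endOffset)
            .max(Long::compare);
    }
}
```
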
    • putRemotePartitionDeleteMetadata

      CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException
      Updates the metadata about a remote partition delete event asynchronously. Currently, it allows updating the state (RemotePartitionDeleteState) of a topic partition in remote metadata storage. The controller invokes this method with RemotePartitionDeleteMetadata in the RemotePartitionDeleteState.DELETE_PARTITION_MARKED state, so that remote partition removers can act on this event and clean up the partition's remote log segments.

      In the default RLMM implementation, the remote partition remover processes RemotePartitionDeleteState.DELETE_PARTITION_MARKED events.

      Parameters:
      remotePartitionDeleteMetadata - update on delete state of a partition.
      Returns:
      a CompletableFuture which will complete once this operation is finished.
      Throws:
      RemoteStorageException - if any storage-related error occurs.
      RemoteResourceNotFoundException - when there are no resources associated with the given remotePartitionDeleteMetadata.
    • listRemoteLogSegments

      Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) throws RemoteStorageException
      Returns all the remote log segment metadata of the given topicIdPartition.

      Remote partition removers use this method to fetch all the segments of a given topic partition so that they can delete them.

      Returns:
      Iterator of all the remote log segment metadata for the given topic partition.
      Throws:
      RemoteStorageException
    • listRemoteLogSegments

      Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException
      Returns an iterator over the remote log segment metadata that contains the given leader epoch, sorted by RemoteLogSegmentMetadata.startOffset() in ascending order. The remote log retention management subsystem uses this to fetch the segment metadata for a given leader epoch.
      Parameters:
      topicIdPartition - topic partition
      leaderEpoch - leader epoch
      Returns:
      Iterator of remote segments, sorted by start offset in ascending order.
      Throws:
      RemoteStorageException
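The filtering and ordering described for the epoch-scoped listRemoteLogSegments can be sketched with streams. The Segment record, which models a segment spanning several leader epochs as a set of epoch numbers, is hypothetical.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of listRemoteLogSegments(topicIdPartition, leaderEpoch).
final class ListByEpochSketch {
    // A segment may contain records from several leader epochs.
    record Segment(String id, long startOffset, Set<Integer> leaderEpochs) { }

    static Iterator<Segment> listRemoteLogSegments(List<Segment> all, int leaderEpoch) {
        return all.stream()
            // Keep only segments that contain the requested epoch.
            .filter(s -> s.leaderEpochs().contains(leaderEpoch))
            // Ascending start offset, as the contract requires.
            .sorted(Comparator.comparingLong(Segment::startOffset))
            .iterator();
    }
}
```
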
    • onPartitionLeadershipChanges

      void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions, Set<TopicIdPartition> followerPartitions)
      This method is invoked only when there are changes in leadership of the topic partitions that this broker is responsible for.
      Parameters:
      leaderPartitions - partitions that have become leaders on this broker.
      followerPartitions - partitions that have become followers on this broker.
    • onStopPartitions

      void onStopPartitions(Set<TopicIdPartition> partitions)
      This method is invoked only when the topic partitions are stopped on this broker. This can happen when a partition is moved to another broker or deleted.
      Parameters:
      partitions - topic partitions that have been stopped.
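An implementation may keep per-partition bookkeeping that onPartitionLeadershipChanges and onStopPartitions maintain. The tracker below is a hypothetical sketch of that, using String in place of TopicIdPartition: a partition that changes role between calls moves from one set to the other, and stopped partitions are dropped entirely.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping for the two lifecycle callbacks.
final class PartitionRoleTracker {
    private final Set<String> leaders = ConcurrentHashMap.newKeySet();
    private final Set<String> followers = ConcurrentHashMap.newKeySet();

    void onPartitionLeadershipChanges(Set<String> leaderPartitions, Set<String> followerPartitions) {
        // A partition that changed role leaves its previous set first.
        leaders.removeAll(followerPartitions);
        followers.removeAll(leaderPartitions);
        leaders.addAll(leaderPartitions);
        followers.addAll(followerPartitions);
    }

    void onStopPartitions(Set<String> partitions) {
        // Stopped partitions (moved away or deleted) are no longer tracked.
        leaders.removeAll(partitions);
        followers.removeAll(partitions);
    }

    boolean isLeader(String partition) { return leaders.contains(partition); }

    boolean isFollower(String partition) { return followers.contains(partition); }
}
```
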
    • remoteLogSize

      long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException
      Returns the total size of the log for the given leader epoch in remote storage.
      Parameters:
      topicIdPartition - topic partition for which size needs to be calculated.
      leaderEpoch - leader epoch; the size includes only segments belonging to this epoch.
      Returns:
      Total size of the log stored in remote storage in bytes.
      Throws:
      RemoteStorageException
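A sketch of the size calculation over a hypothetical in-memory list of segments. It assumes each segment records its size in bytes and the set of leader epochs it contains, and it counts a segment that spans several epochs in full for each of those epochs.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch of remoteLogSize(topicIdPartition, leaderEpoch).
final class RemoteLogSizeSketch {
    record Segment(long sizeInBytes, Set<Integer> leaderEpochs) { }

    static long remoteLogSize(List<Segment> segments, int leaderEpoch) {
        return segments.stream()
            // Only segments belonging to the requested epoch are counted.
            .filter(s -> s.leaderEpochs().contains(leaderEpoch))
            .mapToLong(Segment::sizeInBytes)
            .sum();
    }
}
```
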
    • nextSegmentWithTxnIndex

      default Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) throws RemoteStorageException
      Returns the next segment metadata that contains the aborted transaction entries for the given topic partition, epoch and offset.
      • The default implementation returns the segment metadata that matches the given epoch and offset irrespective of the presence of the transaction index.
      • A custom implementation can optimize this by returning the next segment metadata that contains the txn index in the given epoch. If there are no segments with a txn index in the given epoch, it returns empty.
      Parameters:
      topicIdPartition - topic partition to search for.
      epoch - leader epoch for the given offset.
      offset - offset
      Returns:
      the next segment metadata. Whether the returned segment metadata has a transaction index depends on the RLMM plugin implementation; the caller of this method handles both cases.
      Throws:
      RemoteStorageException - if any storage-related error occurs.
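The optimization open to custom implementations might look like the following sketch: among the segments of the given epoch that contain or follow the offset, return the first one, by start offset, that has a transaction index. The Segment record and its hasTxnIndex flag are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of an optimized nextSegmentWithTxnIndex.
final class TxnIndexLookupSketch {
    // Simplified segment tied to a single leader epoch.
    record Segment(String id, int leaderEpoch, long startOffset, long endOffset, boolean hasTxnIndex) { }

    static Optional<Segment> nextSegmentWithTxnIndex(List<Segment> segments, int epoch, long offset) {
        return segments.stream()
            .filter(s -> s.leaderEpoch() == epoch)
            // The segment contains the offset or lies after it.
            .filter(s -> s.endOffset() >= offset)
            .filter(Segment::hasTxnIndex)
            // The earliest qualifying segment is the "next" one.
            .min(Comparator.comparingLong(Segment::startOffset));
    }
}
```
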
    • isReady

      default boolean isReady(TopicIdPartition topicIdPartition)
      Denotes whether the partition metadata is ready to serve.
      Parameters:
      topicIdPartition - topic partition
      Returns:
      true if the partition is ready to serve remote storage operations.
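A hypothetical sketch of how an implementation might back isReady: a partition is reported ready only after its metadata has been fully loaded, for example after catching up on a metadata topic. The markLoaded method and the String partition key are illustrative assumptions.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical readiness tracking for isReady(topicIdPartition).
final class ReadinessSketch {
    private final Set<String> loaded = ConcurrentHashMap.newKeySet();

    // Called once all metadata for the partition has been loaded.
    void markLoaded(String topicIdPartition) {
        loaded.add(topicIdPartition);
    }

    boolean isReady(String topicIdPartition) {
        return loaded.contains(topicIdPartition);
    }
}
```
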