/var/www/yatta47.log

A place to keep logs of things I've done. Expect mostly short, scrap-like entries.

Deleting a topic

Delete the topic named sample-topic.

root@cli:/# kafka-topics --delete --zookeeper zookeeper:32181 --topic sample-topic
Topic sample-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
root@cli:/#
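Because the command only marks the topic for deletion, it can be handy to check afterwards whether the topic is still listed. The following is a minimal sketch, not part of the original walkthrough: topic_listed is a hypothetical helper name, and the pipeline in the comment assumes the same zookeeper:32181 address used above.

```shell
#!/bin/sh
# topic_listed is a hypothetical helper (not a Kafka command).
# It succeeds only if the exact topic name in $1 appears as a whole line on stdin.
topic_listed() {
  grep -Fxq -- "$1"
}

# Intended usage (assumes the kafka-topics CLI and the ZooKeeper address from above):
#   kafka-topics --list --zookeeper zookeeper:32181 | topic_listed sample-topic \
#     && echo "still listed" || echo "not listed"
```

The exact-line match is deliberate, so that sample-topic-2 does not count as sample-topic. While a deletion is still pending, the listing may show the name with extra text appended, in which case this check reports it as not listed.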

One caveat: topic deletion has to be enabled in the configuration of Kafka itself.

Specifically, delete.topic.enable=true must be set in server.properties.
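A quick way to confirm that setting before running the delete is to look for it in the properties file. This is a rough sketch under assumptions: delete_topic_enabled is a hypothetical helper name, and the path in the usage comment is only an example, since the location of server.properties depends on the installation.

```shell
#!/bin/sh
# delete_topic_enabled is a hypothetical helper (not part of Kafka).
# It succeeds only if the properties file in $1 explicitly sets
# delete.topic.enable to true. Commented-out lines are ignored, and if the key
# appears more than once the last assignment is used.
# A missing key is reported as "not enabled" here, even though the broker may
# fall back to its own default.
delete_topic_enabled() {
  value=$(sed -n 's/^[[:space:]]*delete\.topic\.enable[[:space:]]*=[[:space:]]*\([^[:space:]]*\).*$/\1/p' "$1" | tail -n 1)
  [ "$value" = "true" ]
}

# Intended usage (the path is an example, adjust it to your installation):
#   delete_topic_enabled /etc/kafka/server.properties && echo "deletion enabled"
```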

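Also note that the command points at ZooKeeper, not at the bootstrap-server (Kafka). It is a good reminder that ZooKeeper is what manages Kafka's state here. (kafka-topics in Kafka 2.2 and later also accepts --bootstrap-server, and the --zookeeper option was removed in Kafka 3.0.)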

After the deletion, the Kafka broker log shows output like the following.

broker       | [2020-03-01 04:04:22,168] DEBUG [Controller id=1] Delete topics listener fired for topics sample-topic to be deleted (kafka.controller.KafkaController)
broker       | [2020-03-01 04:04:22,168] INFO [Controller id=1] Starting topic deletion for topics sample-topic (kafka.controller.KafkaController)
broker       | [2020-03-01 04:04:22,170] INFO [Topic Deletion Manager 1] Handling deletion for topics sample-topic (kafka.controller.TopicDeletionManager)
broker       | [2020-03-01 04:04:22,177] INFO [Topic Deletion Manager 1] Deletion of topic sample-topic (re)started (kafka.controller.TopicDeletionManager)
broker       | [2020-03-01 04:04:22,182] INFO [Topic Deletion Manager 1] Topic deletion callback for sample-topic (kafka.controller.TopicDeletionManager)
broker       | [2020-03-01 04:04:22,190] TRACE [Controller id=1 epoch=2] Changed partition sample-topic-2 state from OnlinePartition to OfflinePartition (state.change.logger)
broker       | [2020-03-01 04:04:22,191] TRACE [Controller id=1 epoch=2] Changed partition sample-topic-1 state from OnlinePartition to OfflinePartition (state.change.logger)
broker       | [2020-03-01 04:04:22,192] TRACE [Controller id=1 epoch=2] Changed partition sample-topic-0 state from OnlinePartition to OfflinePartition (state.change.logger)
broker       | [2020-03-01 04:04:22,195] TRACE [Controller id=1 epoch=2] Changed partition sample-topic-2 state from OfflinePartition to NonExistentPartition (state.change.logger)
broker       | [2020-03-01 04:04:22,196] TRACE [Controller id=1 epoch=2] Changed partition sample-topic-1 state from OfflinePartition to NonExistentPartition (state.change.logger)
broker       | [2020-03-01 04:04:22,196] TRACE [Controller id=1 epoch=2] Changed partition sample-topic-0 state from OfflinePartition to NonExistentPartition (state.change.logger)
broker       | [2020-03-01 04:04:22,198] TRACE [Controller id=1 epoch=2] Sending UpdateMetadata request UpdateMetadataPartitionState(topicName='sample-topic', partitionIndex=0, controllerEpoch=1, leader=-2, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) to brokers Set(1) for partition sample-topic-0 (state.change.logger)
broker       | [2020-03-01 04:04:22,199] TRACE [Controller id=1 epoch=2] Sending UpdateMetadata request UpdateMetadataPartitionState(topicName='sample-topic', partitionIndex=2, controllerEpoch=1, leader=-2, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) to brokers Set(1) for partition sample-topic-2 (state.change.logger)
broker       | [2020-03-01 04:04:22,200] TRACE [Controller id=1 epoch=2] Sending UpdateMetadata request UpdateMetadataPartitionState(topicName='sample-topic', partitionIndex=1, controllerEpoch=1, leader=-2, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) to brokers Set(1) for partition sample-topic-1 (state.change.logger)
broker       | [2020-03-01 04:04:22,204] INFO [Topic Deletion Manager 1] Partition deletion callback for sample-topic-2,sample-topic-1,sample-topic-0 (kafka.controller.TopicDeletionManager)
broker       | [2020-03-01 04:04:22,206] TRACE [Broker id=1] Deleted partition sample-topic-0 from metadata cache in response to UpdateMetadata request sent by controller 1 epoch 2 with correlation id 3 (state.change.logger)
broker       | [2020-03-01 04:04:22,206] TRACE [Broker id=1] Deleted partition sample-topic-2 from metadata cache in response to UpdateMetadata request sent by controller 1 epoch 2 with correlation id 3 (state.change.logger)
broker       | [2020-03-01 04:04:22,206] TRACE [Broker id=1] Deleted partition sample-topic-1 from metadata cache in response to UpdateMetadata request sent by controller 1 epoch 2 with correlation id 3 (state.change.logger)
broker       | [2020-03-01 04:04:22,207] INFO [GroupCoordinator 1]: Removed 0 offsets associated with deleted partitions: sample-topic-0, sample-topic-2, sample-topic-1. (kafka.coordinator.group.GroupCoordinator)
broker       | [2020-03-01 04:04:22,209] TRACE [Controller id=1 epoch=2] Received response {error_code=0,_tagged_fields={}} for request UPDATE_METADATA with correlation id 3 sent to broker broker:9092 (id: 1 rack: null) (state.change.logger)
broker       | [2020-03-01 04:04:22,243] TRACE [Controller id=1 epoch=2] Changed state of replica 1 for partition sample-topic-0 from OnlineReplica to OfflineReplica (state.change.logger)
broker       | [2020-03-01 04:04:22,243] TRACE [Controller id=1 epoch=2] Changed state of replica 1 for partition sample-topic-2 from OnlineReplica to OfflineReplica (state.change.logger)
broker       | [2020-03-01 04:04:22,244] TRACE [Controller id=1 epoch=2] Changed state of replica 1 for partition sample-topic-1 from OnlineReplica to OfflineReplica (state.change.logger)
broker       | [2020-03-01 04:04:22,247] DEBUG The stop replica request (delete = false) sent to broker 1 is StopReplicaRequestInfo([Topic=sample-topic,Partition=2,Replica=1],false),StopReplicaRequestInfo([Topic=sample-topic,Partition=1,Replica=1],false),StopReplicaRequestInfo([Topic=sample-topic,Partition=0,Replica=1],false) (kafka.controller.ControllerBrokerRequestBatch)
broker       | [2020-03-01 04:04:22,252] DEBUG [Topic Deletion Manager 1] Deletion started for replicas [Topic=sample-topic,Partition=2,Replica=1],[Topic=sample-topic,Partition=1,Replica=1],[Topic=sample-topic,Partition=0,Replica=1] (kafka.controller.TopicDeletionManager)
broker       | [2020-03-01 04:04:22,255] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(sample-topic-2, sample-topic-1, sample-topic-0) (kafka.server.ReplicaFetcherManager)
broker       | [2020-03-01 04:04:22,255] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions Set(sample-topic-2, sample-topic-1, sample-topic-0) (kafka.server.ReplicaAlterLogDirsManager)
broker       | [2020-03-01 04:04:22,256] TRACE [Broker id=1] Handling stop replica (delete=false) for partition sample-topic-2 (state.change.logger)
broker       | [2020-03-01 04:04:22,259] TRACE [Broker id=1] Finished handling stop replica (delete=false) for partition sample-topic-2 (state.change.logger)
broker       | [2020-03-01 04:04:22,259] TRACE [Broker id=1] Handling stop replica (delete=false) for partition sample-topic-1 (state.change.logger)
broker       | [2020-03-01 04:04:22,260] TRACE [Broker id=1] Finished handling stop replica (delete=false) for partition sample-topic-1 (state.change.logger)
broker       | [2020-03-01 04:04:22,260] TRACE [Broker id=1] Handling stop replica (delete=false) for partition sample-topic-0 (state.change.logger)
broker       | [2020-03-01 04:04:22,260] TRACE [Broker id=1] Finished handling stop replica (delete=false) for partition sample-topic-0 (state.change.logger)
broker       | [2020-03-01 04:04:22,264] TRACE [Controller id=1 epoch=2] Received response {error_code=0,partition_errors=[{topic_name=sample-topic,partition_index=0,error_code=0,_tagged_fields={}},{topic_name=sample-topic,partition_index=2,error_code=0,_tagged_fields={}},{topic_name=sample-topic,partition_index=1,error_code=0,_tagged_fields={}}],_tagged_fields={}} for request STOP_REPLICA with correlation id 4 sent to broker broker:9092 (id: 1 rack: null) (state.change.logger)
broker       | [2020-03-01 04:04:22,265] TRACE [Controller id=1 epoch=2] Changed state of replica 1 for partition sample-topic-2 from OfflineReplica to ReplicaDeletionStarted (state.change.logger)
broker       | [2020-03-01 04:04:22,265] TRACE [Controller id=1 epoch=2] Changed state of replica 1 for partition sample-topic-1 from OfflineReplica to ReplicaDeletionStarted (state.change.logger)
broker       | [2020-03-01 04:04:22,267] TRACE [Controller id=1 epoch=2] Changed state of replica 1 for partition sample-topic-0 from OfflineReplica to ReplicaDeletionStarted (state.change.logger)
broker       | [2020-03-01 04:04:22,269] DEBUG The stop replica request (delete = true) sent to broker 1 is StopReplicaRequestInfo([Topic=sample-topic,Partition=2,Replica=1],true),StopReplicaRequestInfo([Topic=sample-topic,Partition=1,Replica=1],true),StopReplicaRequestInfo([Topic=sample-topic,Partition=0,Replica=1],true) (kafka.controller.ControllerBrokerRequestBatch)
broker       | [2020-03-01 04:04:22,272] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(sample-topic-2, sample-topic-1, sample-topic-0) (kafka.server.ReplicaFetcherManager)
broker       | [2020-03-01 04:04:22,272] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions Set(sample-topic-2, sample-topic-1, sample-topic-0) (kafka.server.ReplicaAlterLogDirsManager)
broker       | [2020-03-01 04:04:22,273] TRACE [Broker id=1] Handling stop replica (delete=true) for partition sample-topic-2 (state.change.logger)
broker       | [2020-03-01 04:04:22,278] INFO The cleaning for partition sample-topic-2 is aborted and paused (kafka.log.LogCleaner)
broker       | [2020-03-01 04:04:22,279] INFO The cleaning for partition sample-topic-2 is aborted (kafka.log.LogCleaner)
broker       | [2020-03-01 04:04:22,297] INFO Log for partition sample-topic-2 is renamed to /var/lib/kafka/data/sample-topic-2.eec7ffd596fc402eb5ee7eb2a12989d5-delete and is scheduled for deletion (kafka.log.LogManager)
broker       | [2020-03-01 04:04:22,297] TRACE [Broker id=1] Finished handling stop replica (delete=true) for partition sample-topic-2 (state.change.logger)
broker       | [2020-03-01 04:04:22,298] TRACE [Broker id=1] Handling stop replica (delete=true) for partition sample-topic-1 (state.change.logger)
broker       | [2020-03-01 04:04:22,301] INFO The cleaning for partition sample-topic-1 is aborted and paused (kafka.log.LogCleaner)
broker       | [2020-03-01 04:04:22,302] INFO The cleaning for partition sample-topic-1 is aborted (kafka.log.LogCleaner)
broker       | [2020-03-01 04:04:22,416] INFO Log for partition sample-topic-1 is renamed to /var/lib/kafka/data/sample-topic-1.5eca45d039a44e2b968907bb7f3adc22-delete and is scheduled for deletion (kafka.log.LogManager)
broker       | [2020-03-01 04:04:22,416] TRACE [Broker id=1] Finished handling stop replica (delete=true) for partition sample-topic-1 (state.change.logger)
broker       | [2020-03-01 04:04:22,416] TRACE [Broker id=1] Handling stop replica (delete=true) for partition sample-topic-0 (state.change.logger)
broker       | [2020-03-01 04:04:22,422] INFO The cleaning for partition sample-topic-0 is aborted and paused (kafka.log.LogCleaner)
broker       | [2020-03-01 04:04:22,422] INFO The cleaning for partition sample-topic-0 is aborted (kafka.log.LogCleaner)
broker       | [2020-03-01 04:04:22,431] INFO Log for partition sample-topic-0 is renamed to /var/lib/kafka/data/sample-topic-0.f906baa5cd6e4b5dafadad7280587e0c-delete and is scheduled for deletion (kafka.log.LogManager)
broker       | [2020-03-01 04:04:22,431] TRACE [Broker id=1] Finished handling stop replica (delete=true) for partition sample-topic-0 (state.change.logger)
broker       | [2020-03-01 04:04:22,433] TRACE [Controller id=1 epoch=2] Received response {error_code=0,partition_errors=[{topic_name=sample-topic,partition_index=0,error_code=0,_tagged_fields={}},{topic_name=sample-topic,partition_index=2,error_code=0,_tagged_fields={}},{topic_name=sample-topic,partition_index=1,error_code=0,_tagged_fields={}}],_tagged_fields={}} for request STOP_REPLICA with correlation id 5 sent to broker broker:9092 (id: 1 rack: null) (state.change.logger)
broker       | [2020-03-01 04:04:22,435] DEBUG [Controller id=1] Delete topic callback invoked on StopReplica response received from broker 1: request error = NONE, partition errors = Map(sample-topic-0 -> NONE, sample-topic-2 -> NONE, sample-topic-1 -> NONE) (kafka.controller.KafkaController)
broker       | [2020-03-01 04:04:22,438] DEBUG [Topic Deletion Manager 1] Deletion successfully completed for replicas [Topic=sample-topic,Partition=0,Replica=1],[Topic=sample-topic,Partition=2,Replica=1],[Topic=sample-topic,Partition=1,Replica=1] (kafka.controller.TopicDeletionManager)
broker       | [2020-03-01 04:04:22,440] TRACE [Controller id=1 epoch=2] Changed state of replica 1 for partition sample-topic-0 from ReplicaDeletionStarted to ReplicaDeletionSuccessful (state.change.logger)
broker       | [2020-03-01 04:04:22,440] TRACE [Controller id=1 epoch=2] Changed state of replica 1 for partition sample-topic-2 from ReplicaDeletionStarted to ReplicaDeletionSuccessful (state.change.logger)
broker       | [2020-03-01 04:04:22,440] TRACE [Controller id=1 epoch=2] Changed state of replica 1 for partition sample-topic-1 from ReplicaDeletionStarted to ReplicaDeletionSuccessful (state.change.logger)
broker       | [2020-03-01 04:04:22,442] INFO [Topic Deletion Manager 1] Handling deletion for topics sample-topic (kafka.controller.TopicDeletionManager)
broker       | [2020-03-01 04:04:22,446] TRACE [Controller id=1 epoch=2] Changed state of replica 1 for partition sample-topic-2 from ReplicaDeletionSuccessful to NonExistentReplica (state.change.logger)
broker       | [2020-03-01 04:04:22,446] TRACE [Controller id=1 epoch=2] Changed state of replica 1 for partition sample-topic-1 from ReplicaDeletionSuccessful to NonExistentReplica (state.change.logger)
broker       | [2020-03-01 04:04:22,446] TRACE [Controller id=1 epoch=2] Changed state of replica 1 for partition sample-topic-0 from ReplicaDeletionSuccessful to NonExistentReplica (state.change.logger)
broker       | [2020-03-01 04:04:22,531] INFO [Topic Deletion Manager 1] Deletion of topic sample-topic successfully completed (kafka.controller.TopicDeletionManager)
broker       | [2020-03-01 04:04:22,534] INFO [Controller id=1] New topics: [Set()], deleted topics: [Set()], new partition replica assignment [Map()] (kafka.controller.KafkaController)
broker       | [2020-03-01 04:04:22,536] DEBUG [Controller id=1] Delete topics listener fired for topics  to be deleted (kafka.controller.KafkaController)




broker       | [2020-03-01 04:05:22,302] INFO [Log partition=sample-topic-2, dir=/var/lib/kafka/data] Deleting segments List(LogSegment(baseOffset=0, size=0, lastModifiedTime=1583032419000, largestTime=1583032419000)) (kafka.log.Log)
broker       | [2020-03-01 04:05:22,307] INFO Deleted log /var/lib/kafka/data/sample-topic-2.eec7ffd596fc402eb5ee7eb2a12989d5-delete/00000000000000000000.log.deleted. (kafka.log.LogSegment)
broker       | [2020-03-01 04:05:22,315] INFO Deleted offset index /var/lib/kafka/data/sample-topic-2.eec7ffd596fc402eb5ee7eb2a12989d5-delete/00000000000000000000.index.deleted. (kafka.log.LogSegment)
broker       | [2020-03-01 04:05:22,315] INFO Deleted time index /var/lib/kafka/data/sample-topic-2.eec7ffd596fc402eb5ee7eb2a12989d5-delete/00000000000000000000.timeindex.deleted. (kafka.log.LogSegment)
broker       | [2020-03-01 04:05:22,319] INFO Deleted log for partition sample-topic-2 in /var/lib/kafka/data/sample-topic-2.eec7ffd596fc402eb5ee7eb2a12989d5-delete. (kafka.log.LogManager)
broker       | [2020-03-01 04:05:22,419] INFO [Log partition=sample-topic-1, dir=/var/lib/kafka/data] Deleting segments List(LogSegment(baseOffset=0, size=0, lastModifiedTime=1583032419000, largestTime=1583032419000)) (kafka.log.Log)
broker       | [2020-03-01 04:05:22,419] INFO Deleted log /var/lib/kafka/data/sample-topic-1.5eca45d039a44e2b968907bb7f3adc22-delete/00000000000000000000.log.deleted. (kafka.log.LogSegment)
broker       | [2020-03-01 04:05:22,421] INFO Deleted offset index /var/lib/kafka/data/sample-topic-1.5eca45d039a44e2b968907bb7f3adc22-delete/00000000000000000000.index.deleted. (kafka.log.LogSegment)
broker       | [2020-03-01 04:05:22,421] INFO Deleted time index /var/lib/kafka/data/sample-topic-1.5eca45d039a44e2b968907bb7f3adc22-delete/00000000000000000000.timeindex.deleted. (kafka.log.LogSegment)
broker       | [2020-03-01 04:05:22,421] INFO Deleted log for partition sample-topic-1 in /var/lib/kafka/data/sample-topic-1.5eca45d039a44e2b968907bb7f3adc22-delete. (kafka.log.LogManager)
broker       | [2020-03-01 04:05:22,432] INFO [Log partition=sample-topic-0, dir=/var/lib/kafka/data] Deleting segments List(LogSegment(baseOffset=0, size=0, lastModifiedTime=1583032419000, largestTime=1583032419000)) (kafka.log.Log)
broker       | [2020-03-01 04:05:22,433] INFO Deleted log /var/lib/kafka/data/sample-topic-0.f906baa5cd6e4b5dafadad7280587e0c-delete/00000000000000000000.log.deleted. (kafka.log.LogSegment)
broker       | [2020-03-01 04:05:22,434] INFO Deleted offset index /var/lib/kafka/data/sample-topic-0.f906baa5cd6e4b5dafadad7280587e0c-delete/00000000000000000000.index.deleted. (kafka.log.LogSegment)
broker       | [2020-03-01 04:05:22,434] INFO Deleted time index /var/lib/kafka/data/sample-topic-0.f906baa5cd6e4b5dafadad7280587e0c-delete/00000000000000000000.timeindex.deleted. (kafka.log.LogSegment)
broker       | [2020-03-01 04:05:22,435] INFO Deleted log for partition sample-topic-0 in /var/lib/kafka/data/sample-topic-0.f906baa5cd6e4b5dafadad7280587e0c-delete. (kafka.log.LogManager)

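What I find interesting is that the deletion starts right after the command is issued, but the data is not removed at that moment. At 04:04:22 each partition directory is only renamed with a -delete suffix and scheduled for deletion, and the segment files are actually deleted about a minute later, at 04:05:22. The one-minute gap presumably corresponds to the broker's file delete delay (log.segment.delete.delay.ms, 60000 ms by default). So that's how it behaves.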
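To make the gap between "scheduled for deletion" and the actual removal easier to see, the timestamps can be pulled out of the broker log. The sketch below is one way to do it with awk. deletion_delay is a hypothetical helper name, it assumes the log line format shown above, and it assumes both events happen on the same day.

```shell
#!/bin/sh
# deletion_delay is a hypothetical helper (not part of Kafka).
# It reads broker log lines on stdin and prints, for each partition, the number
# of seconds between the "is scheduled for deletion" line and the matching
# "Deleted log for partition" line. Milliseconds are ignored.
deletion_delay() {
  awk '
    # Convert the HH:MM:SS part of "[YYYY-MM-DD HH:MM:SS,mmm]" to seconds of day.
    function secs(line,   t, p) {
      match(line, /[0-9][0-9]:[0-9][0-9]:[0-9][0-9],/)
      t = substr(line, RSTART, 8)
      split(t, p, ":")
      return p[1] * 3600 + p[2] * 60 + p[3]
    }
    # Return the word that follows "partition" on the current line.
    function partition_name(   i) {
      for (i = 1; i < NF; i++) if ($i == "partition") return $(i + 1)
      return ""
    }
    /is scheduled for deletion/ { start[partition_name()] = secs($0) }
    /Deleted log for partition/ {
      name = partition_name()
      if (name in start) print name, secs($0) - start[name]
    }
  '
}

# Intended usage (assumes the broker runs under docker-compose, as the
# "broker |" prefix in the log suggests):
#   docker-compose logs broker | deletion_delay
```

Fed the log excerpt above, this should report a delay of 60 seconds for each of the three partitions.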

References

Deleting a topic in Apache Kafka - Qiita
https://qiita.com/btamari-gy/items/b22862524c4554b49e8a