flink的数据交互
背景
了解 Flink 各个 Task 之间是如何进行数据交互的
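同operator chain(注:下方调用栈经过 LocalInputChannel / SingleInputGate,对应的是同一 TaskManager 内不同 Task 之间的本地读取;同一 operator chain 内的算子之间是直接方法调用,不经过 InputGate)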
getNextBuffer:251, LocalInputChannel (org.apache.flink.runtime.io.network.partition.consumer)
readBufferFromInputChannel:904, SingleInputGate (org.apache.flink.runtime.io.network.partition.consumer)
readRecoveredOrNormalBuffer:899, SingleInputGate (org.apache.flink.runtime.io.network.partition.consumer)
waitAndGetNextData:839, SingleInputGate (org.apache.flink.runtime.io.network.partition.consumer)
getNextBufferOrEvent:810, SingleInputGate (org.apache.flink.runtime.io.network.partition.consumer)
pollNext:798, SingleInputGate (org.apache.flink.runtime.io.network.partition.consumer)
pollNext:130, InputGateWithMetrics (org.apache.flink.runtime.taskmanager)
waitAndGetNextData:237, UnionInputGate (org.apache.flink.runtime.io.network.partition.consumer)
getNextBufferOrEvent:211, UnionInputGate (org.apache.flink.runtime.io.network.partition.consumer)
pollNext:201, UnionInputGate (org.apache.flink.runtime.io.network.partition.consumer)
pollNext:150, CheckpointedInputGate (org.apache.flink.streaming.runtime.io.checkpointing)
emitNext:122, AbstractStreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
processInput:65, StreamOneInputProcessor (org.apache.flink.streaming.runtime.io)
processInput:579, StreamTask (org.apache.flink.streaming.runtime.tasks)
runDefaultAction:-1, StreamTask$$Lambda/0x00007fd5a7675dc0 (org.apache.flink.streaming.runtime.tasks)
runMailboxLoop:231, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:909, StreamTask (org.apache.flink.streaming.runtime.tasks)
invoke:858, StreamTask (org.apache.flink.streaming.runtime.tasks)
run:-1, Task$$Lambda/0x00007fd5a779ad68 (org.apache.flink.runtime.taskmanager)
runWithSystemExitMonitoring:958, Task (org.apache.flink.runtime.taskmanager)
restoreAndInvoke:937, Task (org.apache.flink.runtime.taskmanager)
doRun:751, Task (org.apache.flink.runtime.taskmanager)
run:566, Task (org.apache.flink.runtime.taskmanager)
runWith:1583, Thread (java.lang)
run:1570, Thread (java.lang)
写入逻辑:
flush:674, PipelinedSubpartition (org.apache.flink.runtime.io.network.partition)
flushAllSubpartitions:148, BufferWritingResultPartition (org.apache.flink.runtime.io.network.partition)
flushAll:194, PipelinedResultPartition (org.apache.flink.runtime.io.network.partition)
flushAll:161, RecordWriter (org.apache.flink.runtime.io.network.api.writer)
run:291, RecordWriter$OutputFlusher (org.apache.flink.runtime.io.network.api.writer)
从kafka读取数据:
emit:108, RecordWriter (org.apache.flink.runtime.io.network.api.writer)
emit:55, ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer)
pushToRecordWriter:140, RecordWriterOutput (org.apache.flink.streaming.runtime.io)
collectAndCheckIfChained:120, RecordWriterOutput (org.apache.flink.streaming.runtime.io)
collect:101, RecordWriterOutput (org.apache.flink.streaming.runtime.io)
collect:53, RecordWriterOutput (org.apache.flink.streaming.runtime.io)
collect:60, CountingOutput (org.apache.flink.streaming.api.operators)
collect:32, CountingOutput (org.apache.flink.streaming.api.operators)
emitRecord:310, SourceOperatorStreamTask$AsyncDataOutputToOutput (org.apache.flink.streaming.runtime.tasks)
collect:110, SourceOutputWithWatermarks (org.apache.flink.streaming.api.operators.source)
collect:67, KafkaRecordEmitter$SourceOutputWrapper (org.apache.flink.connector.kafka.source.reader)
deserialize:84, DeserializationSchema (org.apache.flink.api.common.serialization)
deserialize:51, KafkaValueOnlyDeserializationSchemaWrapper (org.apache.flink.connector.kafka.source.reader.deserializer)
emitRecord:53, KafkaRecordEmitter (org.apache.flink.connector.kafka.source.reader)
emitRecord:33, KafkaRecordEmitter (org.apache.flink.connector.kafka.source.reader)
pollNext:203, SourceReaderBase (org.apache.flink.connector.base.source.reader)
emitNext:421, SourceOperator (org.apache.flink.streaming.api.operators)
emitNext:68, StreamTaskSourceInput (org.apache.flink.streaming.runtime.io)
processInput:65, StreamOneInputProcessor (org.apache.flink.streaming.runtime.io)
processInput:579, StreamTask (org.apache.flink.streaming.runtime.tasks)
runDefaultAction:-1, StreamTask$$Lambda/0x00007fe8936706a0 (org.apache.flink.streaming.runtime.tasks)
runMailboxLoop:231, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:909, StreamTask (org.apache.flink.streaming.runtime.tasks)
invoke:858, StreamTask (org.apache.flink.streaming.runtime.tasks)
run:-1, Task$$Lambda/0x00007fe893719f40 (org.apache.flink.runtime.taskmanager)
runWithSystemExitMonitoring:958, Task (org.apache.flink.runtime.taskmanager)
restoreAndInvoke:937, Task (org.apache.flink.runtime.taskmanager)
doRun:751, Task (org.apache.flink.runtime.taskmanager)
run:566, Task (org.apache.flink.runtime.taskmanager)
runWith:1583, Thread (java.lang)
run:1570, Thread (java.lang)
kafka读过程:
// /org/apache/flink/connector/base/source/reader/SourceReaderBase.java
/**
 * Polls for the next record and emits at most one record per call.
 *
 * <p>The method works on the fetch held in {@code currentFetch}. When that is {@code null} it
 * asks {@code getNextFetch(output)} for one, and if that is {@code null} as well it returns
 * the status produced by {@code finishedOrAvailableLater()}.
 *
 * <p>With a fetch in hand it loops:
 * <ul>
 *   <li>a non-null record from {@code nextRecordFromSplit()} is counted in
 *       {@code numRecordsInCounter}, handed to {@code recordEmitter.emitRecord(...)} together
 *       with {@code currentSplitOutput} and {@code currentSplitContext.state}, logged at
 *       trace level, and the call returns {@link InputStatus#MORE_AVAILABLE};
 *   <li>a {@code null} record triggers {@code moveToNextSplit(...)}; when that returns
 *       {@code false} the method calls itself again, otherwise the loop continues.
 * </ul>
 *
 * <p>Every returned status except the recursive result is passed through {@code trace(...)}.
 *
 * @param output the reader output; forwarded to {@code getNextFetch} and
 *     {@code moveToNextSplit}
 * @return {@code MORE_AVAILABLE} after emitting a record, the value of
 *     {@code finishedOrAvailableLater()} when no fetch could be obtained, or the result of
 *     the recursive call
 * @throws Exception declared by the signature; which callee raises it is not visible in this
 *     excerpt
 */
@Override
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
// make sure we have a fetch we are working on, or move to the next
RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
if (recordsWithSplitId == null) {
recordsWithSplitId = getNextFetch(output);
if (recordsWithSplitId == null) {
return trace(finishedOrAvailableLater());
}
}
// we need to loop here, because we may have to go across splits
while (true) {
// Process one record.
// NOTE(review): a null record presumably means the current split has nothing left in this
// fetch - confirm against RecordsWithSplitIds.
final E record = recordsWithSplitId.nextRecordFromSplit();
if (record != null) {
// emit the record.
numRecordsInCounter.inc(1);
recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
LOG.trace("Emitted record: {}", record);
// We always emit MORE_AVAILABLE here, even though we do not strictly know whether
// more is available. If nothing more is available, the next invocation will find
// this out and return the correct status.
// That means we emit the occasional 'false positive' for availability, but this
// saves us doing checks for every record. Ultimately, this is cheaper.
return trace(InputStatus.MORE_AVAILABLE);
} else if (!moveToNextSplit(recordsWithSplitId, output)) {
// The fetch is done and we just discovered that and have not emitted anything, yet.
// We need to move to the next fetch. As a shortcut, we call pollNext() here again,
// rather than emitting nothing and waiting for the caller to call us again.
return pollNext(output);
}
// moveToNextSplit returned true: go around the loop and read again.
// NOTE(review): presumably it has switched currentSplitOutput / currentSplitContext to the
// next split by now - confirm in moveToNextSplit.
}
}
kafka消费过程:
<init>:100, ConsumerRecord (org.apache.kafka.clients.consumer)
parseRecord:1430, Fetcher (org.apache.kafka.clients.consumer.internals)
access$3400:133, Fetcher (org.apache.kafka.clients.consumer.internals)
fetchRecords:1661, Fetcher$CompletedFetch (org.apache.kafka.clients.consumer.internals)
access$1900:1497, Fetcher$CompletedFetch (org.apache.kafka.clients.consumer.internals)
fetchRecords:717, Fetcher (org.apache.kafka.clients.consumer.internals)
collectFetch:683, Fetcher (org.apache.kafka.clients.consumer.internals)
pollForFetches:1314, KafkaConsumer (org.apache.kafka.clients.consumer)
poll:1243, KafkaConsumer (org.apache.kafka.clients.consumer)
poll:1216, KafkaConsumer (org.apache.kafka.clients.consumer)
fetch:110, KafkaPartitionSplitReader (org.apache.flink.connector.kafka.source.reader)
run:58, FetchTask (org.apache.flink.connector.base.source.reader.fetcher)
runOnce:165, SplitFetcher (org.apache.flink.connector.base.source.reader.fetcher)
run:117, SplitFetcher (org.apache.flink.connector.base.source.reader.fetcher)
call:572, Executors$RunnableAdapter (java.util.concurrent)
run$$$capture:317, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
- Async stack trace
<init>:151, FutureTask (java.util.concurrent)
newTaskFor:98, AbstractExecutorService (java.util.concurrent)
submit:122, AbstractExecutorService (java.util.concurrent)
startFetcher:244, SplitFetcherManager (org.apache.flink.connector.base.source.reader.fetcher)
addSplits:151, SingleThreadFetcherManager (org.apache.flink.connector.base.source.reader.fetcher)
addSplits:315, SourceReaderBase (org.apache.flink.connector.base.source.reader)
handleAddSplitsEvent:600, SourceOperator (org.apache.flink.streaming.api.operators)
handleOperatorEvent:569, SourceOperator (org.apache.flink.streaming.api.operators)
dispatchEventToHandlers:72, OperatorEventDispatcherImpl (org.apache.flink.streaming.runtime.tasks)
dispatchOperatorEvent:80, RegularOperatorChain (org.apache.flink.streaming.runtime.tasks)
lambda$dispatchOperatorEvent$22:1540, StreamTask (org.apache.flink.streaming.runtime.tasks)
runThrowing:50, StreamTaskActionExecutor$1 (org.apache.flink.streaming.runtime.tasks)
run:90, Mail (org.apache.flink.streaming.runtime.tasks.mailbox)
runMail:398, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
processMailsWhenDefaultActionUnavailable:367, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
processMail:352, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:229, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:909, StreamTask (org.apache.flink.streaming.runtime.tasks)
invoke:858, StreamTask (org.apache.flink.streaming.runtime.tasks)
runWithSystemExitMonitoring:958, Task (org.apache.flink.runtime.taskmanager)
restoreAndInvoke:937, Task (org.apache.flink.runtime.taskmanager)
doRun:751, Task (org.apache.flink.runtime.taskmanager)
run:566, Task (org.apache.flink.runtime.taskmanager)
run:1570, Thread (java.lang)