diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 411e3cd0d4..c90e95b799 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -1,15 +1,20 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ package org.apache.tez.dag.app; @@ -25,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; @@ -53,6 +59,9 @@ import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolBlockingPB; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolPBServerImpl; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskCommunicator; @@ -61,6 +70,7 @@ import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse; import com.google.common.collect.Maps; +import com.google.protobuf.BlockingService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -150,11 +160,19 @@ protected void startRpcServer() { JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(conf); jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken); + RPC.setProtocolEngine( + conf, TezTaskUmbilicalProtocolBlockingPB.class, ProtobufRpcEngine2.class); + TezTaskUmbilicalProtocolBlockingPB service = + new TezTaskUmbilicalProtocolPBServerImpl(taskUmbilical); + BlockingService umbilicalService = + TezTaskUmbilicalProtocolProtos.TezTaskUmbilicalProtocol.newReflectiveBlockingService( + service); + server = new RPC.Builder(conf) - .setProtocol(TezTaskUmbilicalProtocol.class) + .setProtocol(TezTaskUmbilicalProtocolBlockingPB.class) .setBindAddress("0.0.0.0") .setPort(0) - .setInstance(taskUmbilical) + 
.setInstance(umbilicalService) .setNumHandlers( conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT, TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT)) @@ -388,7 +406,6 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce return response; } - // TODO Remove this method once we move to the Protobuf RPC engine @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java b/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java index b2353de678..9420f8cb37 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,13 +7,14 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ package org.apache.tez.dag.app.security.authorize; @@ -21,9 +22,9 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; -import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolBlockingPB; /** * {@link PolicyProvider} for YARN Tez client protocols. @@ -36,7 +37,7 @@ public class TezAMPolicyProvider extends PolicyProvider { new Service[] { new Service( TezConstants.TEZ_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL, - TezTaskUmbilicalProtocol.class), + TezTaskUmbilicalProtocolBlockingPB.class), new Service( TezConstants.TEZ_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT, DAGClientAMProtocolBlockingPB.class) diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java index ed3153f6f2..9d7b04adc9 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java @@ -1,15 +1,20 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.tez.service.impl; @@ -34,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; @@ -57,6 +63,8 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolBlockingPB; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolPBClientImpl; import org.apache.tez.runtime.task.TaskReporter; import org.apache.tez.runtime.task.TaskRunner2Result; import org.apache.tez.runtime.task.TezChild; @@ -439,13 +447,20 @@ public ContainerExecutionResult call() throws Exception { NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort()); SecurityUtil.setTokenService(jobToken, address); taskOwner.addToken(jobToken); - umbilical = taskOwner.doAs(new PrivilegedExceptionAction() { - @Override - public 
TezTaskUmbilicalProtocol run() throws Exception { - return RPC.getProxy(TezTaskUmbilicalProtocol.class, - TezTaskUmbilicalProtocol.versionID, address, conf); - } - }); + umbilical = + taskOwner.doAs( + (PrivilegedExceptionAction) + () -> { + RPC.setProtocolEngine( + conf, TezTaskUmbilicalProtocolBlockingPB.class, ProtobufRpcEngine2.class); + TezTaskUmbilicalProtocolBlockingPB proxy = + RPC.getProxy( + TezTaskUmbilicalProtocolBlockingPB.class, + RPC.getProtocolVersion(TezTaskUmbilicalProtocolBlockingPB.class), + address, + conf); + return new TezTaskUmbilicalProtocolPBClientImpl(proxy); + }); // TODO Stop reading this on each request. taskReporter = new TaskReporter( umbilical, diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/TezPBConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezPBConverters.java new file mode 100644 index 0000000000..09905116bd --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezPBConverters.java @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tez.common; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DagTypeConverters; +import org.apache.tez.dag.api.EntityDescriptor; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import org.apache.tez.runtime.api.impl.EventType; +import org.apache.tez.runtime.api.impl.GroupInputSpec; +import org.apache.tez.runtime.api.impl.IOStatistics; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TaskStatistics; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.ContainerTaskProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.EventMetaDataProto; +import 
org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.EventProducerConsumerTypeProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.EventTypeProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.GroupInputSpecProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.HeartbeatRequestProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.HeartbeatResponseProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.IOStatisticsProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.InputSpecProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.OutputSpecProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TaskSpecProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TaskStatisticsProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TaskStatusUpdateEventProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TezCounterGroupProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TezCounterProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TezCountersProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TezDAGIDProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TezEntityDescriptorProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TezEventProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TezLocalResourceProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TezTaskAttemptIDProto; +import 
org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TezTaskIDProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TezUserPayloadProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TezVertexIDProto; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolUtils; + +import com.google.protobuf.ByteString; + +/** + * Utility class for converting between Tez Java domain objects and their Protobuf counterparts used + * in the TaskUmbilicalProtocol. + */ +@Private +public final class TezPBConverters { + + private TezPBConverters() {} + + // --- ID Converters --- + + public static TezDAGIDProto convertToProto(TezDAGID id) { + return TezDAGIDProto.newBuilder() + .setClusterTimestamp(id.getApplicationId().getClusterTimestamp()) + .setAppId(id.getApplicationId().getId()) + .setId(id.getId()) + .build(); + } + + public static TezDAGID convertFromProto(TezDAGIDProto proto) { + return TezDAGID.getInstance( + ApplicationId.newInstance(proto.getClusterTimestamp(), proto.getAppId()), proto.getId()); + } + + public static TezVertexIDProto convertToProto(TezVertexID id) { + return TezVertexIDProto.newBuilder() + .setDagId(convertToProto(id.getDAGID())) + .setId(id.getId()) + .build(); + } + + public static TezVertexID convertFromProto(TezVertexIDProto proto) { + return TezVertexID.getInstance(convertFromProto(proto.getDagId()), proto.getId()); + } + + public static TezTaskIDProto convertToProto(TezTaskID id) { + return TezTaskIDProto.newBuilder() + .setVertexId(convertToProto(id.getVertexID())) + .setId(id.getId()) + .build(); + } + + public static TezTaskID convertFromProto(TezTaskIDProto proto) { + return TezTaskID.getInstance(convertFromProto(proto.getVertexId()), proto.getId()); + } + + public static TezTaskAttemptIDProto convertToProto(TezTaskAttemptID id) { + return TezTaskAttemptIDProto.newBuilder() + .setTaskId(convertToProto(id.getTaskID())) + 
.setId(id.getId()) + .build(); + } + + public static TezTaskAttemptID convertFromProto(TezTaskAttemptIDProto proto) { + return TezTaskAttemptID.getInstance(convertFromProto(proto.getTaskId()), proto.getId()); + } + + // --- Entity Converters --- + + public static TezEntityDescriptorProto convertToProto(EntityDescriptor descriptor) { + TezEntityDescriptorProto.Builder builder = TezEntityDescriptorProto.newBuilder(); + builder.setClassName(descriptor.getClassName()); + if (descriptor.getUserPayload() != null) { + TezUserPayloadProto.Builder payloadBuilder = TezUserPayloadProto.newBuilder(); + if (descriptor.getUserPayload().getPayload() != null) { + payloadBuilder.setUserPayload( + ByteString.copyFrom(descriptor.getUserPayload().getPayload())); + } + payloadBuilder.setVersion(descriptor.getUserPayload().getVersion()); + builder.setUserPayload(payloadBuilder.build()); + } + if (descriptor.getHistoryText() != null) { + builder.setHistoryText(ByteString.copyFromUtf8(descriptor.getHistoryText())); + } + return builder.build(); + } + + private static void fillDescriptorFromProto( + EntityDescriptor descriptor, TezEntityDescriptorProto proto) { + if (proto.hasUserPayload()) { + descriptor.setUserPayload( + UserPayload.create( + !proto.getUserPayload().getUserPayload().isEmpty() + ? 
proto.getUserPayload().getUserPayload().asReadOnlyByteBuffer() + : null, + proto.getUserPayload().getVersion())); + } + if (!proto.getHistoryText().isEmpty()) { + descriptor.setHistoryText(proto.getHistoryText().toStringUtf8()); + } + } + + public static ProcessorDescriptor convertProcessorDescriptorFromProto( + TezEntityDescriptorProto proto) { + ProcessorDescriptor descriptor = ProcessorDescriptor.create(proto.getClassName()); + fillDescriptorFromProto(descriptor, proto); + return descriptor; + } + + public static InputDescriptor convertInputDescriptorFromProto(TezEntityDescriptorProto proto) { + InputDescriptor descriptor = InputDescriptor.create(proto.getClassName()); + fillDescriptorFromProto(descriptor, proto); + return descriptor; + } + + public static OutputDescriptor convertOutputDescriptorFromProto(TezEntityDescriptorProto proto) { + OutputDescriptor descriptor = OutputDescriptor.create(proto.getClassName()); + fillDescriptorFromProto(descriptor, proto); + return descriptor; + } + + // --- Spec Converters --- + + public static InputSpecProto convertToProto(InputSpec spec) { + return InputSpecProto.newBuilder() + .setSourceVertexName(spec.getSourceVertexName()) + .setPhysicalEdgeCount(spec.getPhysicalEdgeCount()) + .setInputDescriptor(convertToProto(spec.getInputDescriptor())) + .build(); + } + + public static InputSpec convertFromProto(InputSpecProto proto) { + return new InputSpec( + proto.getSourceVertexName(), + convertInputDescriptorFromProto(proto.getInputDescriptor()), + proto.getPhysicalEdgeCount()); + } + + public static OutputSpecProto convertToProto(OutputSpec spec) { + return OutputSpecProto.newBuilder() + .setDestinationVertexName(spec.getDestinationVertexName()) + .setPhysicalEdgeCount(spec.getPhysicalEdgeCount()) + .setOutputDescriptor(convertToProto(spec.getOutputDescriptor())) + .build(); + } + + public static OutputSpec convertFromProto(OutputSpecProto proto) { + return new OutputSpec( + proto.getDestinationVertexName(), + 
convertOutputDescriptorFromProto(proto.getOutputDescriptor()), + proto.getPhysicalEdgeCount()); + } + + public static GroupInputSpecProto convertToProto(GroupInputSpec spec) { + return GroupInputSpecProto.newBuilder() + .setGroupName(spec.getGroupName()) + .addAllGroupVertices(spec.getGroupVertices()) + .setMergedInputDescriptor(convertToProto(spec.getMergedInputDescriptor())) + .build(); + } + + public static GroupInputSpec convertFromProto(GroupInputSpecProto proto) { + return new GroupInputSpec( + proto.getGroupName(), + new ArrayList<>(proto.getGroupVerticesList()), + convertInputDescriptorFromProto(proto.getMergedInputDescriptor())); + } + + public static TaskSpecProto convertToProto(TaskSpec spec) { + TaskSpecProto.Builder builder = + TaskSpecProto.newBuilder() + .setTaskAttemptId(convertToProto(spec.getTaskAttemptID())) + .setDagName(spec.getDAGName()) + .setVertexName(spec.getVertexName()) + .setVertexParallelism(spec.getVertexParallelism()) + .setProcessorDescriptor(convertToProto(spec.getProcessorDescriptor())); + for (InputSpec inputSpec : spec.getInputs()) { + builder.addInputSpecs(convertToProto(inputSpec)); + } + for (OutputSpec outputSpec : spec.getOutputs()) { + builder.addOutputSpecs(convertToProto(outputSpec)); + } + if (spec.getGroupInputs() != null) { + for (GroupInputSpec group : spec.getGroupInputs()) { + builder.addGroupInputSpecs(convertToProto(group)); + } + } + if (spec.getTaskConf() != null) { + try { + builder.setTaskConfBytes( + TezTaskUmbilicalProtocolUtils.serializeToByteString(spec.getTaskConf())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return builder.build(); + } + + public static TaskSpec convertFromProto(TaskSpecProto proto) { + List inputs = new ArrayList<>(proto.getInputSpecsCount()); + for (InputSpecProto isp : proto.getInputSpecsList()) { + inputs.add(convertFromProto(isp)); + } + List outputs = new ArrayList<>(proto.getOutputSpecsCount()); + for (OutputSpecProto osp : 
proto.getOutputSpecsList()) { + outputs.add(convertFromProto(osp)); + } + List groups = null; + if (proto.getGroupInputSpecsCount() > 0) { + groups = new ArrayList<>(proto.getGroupInputSpecsCount()); + for (GroupInputSpecProto gsp : proto.getGroupInputSpecsList()) { + groups.add(convertFromProto(gsp)); + } + } + Configuration conf = null; + if (!proto.getTaskConfBytes().isEmpty()) { + conf = new Configuration(false); + try { + TezTaskUmbilicalProtocolUtils.deserializeFromByteString(proto.getTaskConfBytes(), conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return new TaskSpec( + convertFromProto(proto.getTaskAttemptId()), + proto.getDagName(), + proto.getVertexName(), + proto.getVertexParallelism(), + convertProcessorDescriptorFromProto(proto.getProcessorDescriptor()), + inputs, + outputs, + groups, + conf); + } + + public static TezLocalResourceProto convertToProto(TezLocalResource lr) { + return TezLocalResourceProto.newBuilder() + .setUri(lr.getUri().toString()) + .setSize(lr.getSize()) + .setTimestamp(lr.getTimestamp()) + .build(); + } + + public static TezLocalResource convertFromProto(TezLocalResourceProto proto) { + try { + return new TezLocalResource(new URI(proto.getUri()), proto.getSize(), proto.getTimestamp()); + } catch (java.net.URISyntaxException e) { + throw new RuntimeException(e); + } + } + + public static ContainerTaskProto convertToProto(ContainerTask task) { + ContainerTaskProto.Builder builder = + ContainerTaskProto.newBuilder() + .setShouldDie(task.shouldDie()) + .setCredentialsChanged(task.haveCredentialsChanged()); + if (task.getTaskSpec() != null) { + builder.setTaskSpec(convertToProto(task.getTaskSpec())); + } + if (task.getAdditionalResources() != null) { + for (Map.Entry entry : task.getAdditionalResources().entrySet()) { + builder.putAdditionalResources(entry.getKey(), convertToProto(entry.getValue())); + } + } + if (task.getCredentials() != null) { + builder.setCredentialsBinary( + 
DagTypeConverters.convertCredentialsToProto(task.getCredentials())); + } + return builder.build(); + } + + public static ContainerTask convertFromProto(ContainerTaskProto proto) { + Map resources = new HashMap<>(); + for (Map.Entry entry : + proto.getAdditionalResourcesMap().entrySet()) { + resources.put(entry.getKey(), convertFromProto(entry.getValue())); + } + return new ContainerTask( + proto.hasTaskSpec() ? convertFromProto(proto.getTaskSpec()) : null, + proto.getShouldDie(), + resources, + DagTypeConverters.convertByteStringToCredentials(proto.getCredentialsBinary()), + proto.getCredentialsChanged()); + } + + // --- Heartbeat & Event Converters --- + + public static EventProducerConsumerTypeProto convertToProto(EventProducerConsumerType type) { + return EventProducerConsumerTypeProto.valueOf("EPC_" + type.name()); + } + + public static EventProducerConsumerType convertFromProto(EventProducerConsumerTypeProto proto) { + return EventProducerConsumerType.valueOf(proto.name().substring(4)); + } + + public static EventMetaDataProto convertToProto(EventMetaData meta) { + EventMetaDataProto.Builder builder = + EventMetaDataProto.newBuilder() + .setProducerConsumerType(convertToProto(meta.getEventGenerator())) + .setTaskVertexName(meta.getTaskVertexName()); + if (meta.getEdgeVertexName() != null) { + builder.setEdgeVertexName(meta.getEdgeVertexName()); + } + if (meta.getTaskAttemptID() != null) { + builder.setTaskAttemptId(convertToProto(meta.getTaskAttemptID())); + } + return builder.build(); + } + + public static EventMetaData convertFromProto(EventMetaDataProto proto) { + return new EventMetaData( + convertFromProto(proto.getProducerConsumerType()), + proto.getTaskVertexName(), + proto.getEdgeVertexName().isEmpty() ? null : proto.getEdgeVertexName(), + proto.hasTaskAttemptId() ? 
convertFromProto(proto.getTaskAttemptId()) : null); + } + + public static IOStatisticsProto convertToProto(IOStatistics stats) { + return IOStatisticsProto.newBuilder() + .setDataSize(stats.getDataSize()) + .setItemsProcessed(stats.getItemsProcessed()) + .build(); + } + + public static IOStatistics convertFromProto(IOStatisticsProto proto) { + IOStatistics stats = new IOStatistics(); + stats.setDataSize(proto.getDataSize()); + stats.setItemsProcessed(proto.getItemsProcessed()); + return stats; + } + + public static TaskStatisticsProto convertToProto(TaskStatistics stats) { + TaskStatisticsProto.Builder builder = TaskStatisticsProto.newBuilder(); + for (Map.Entry entry : stats.getIOStatistics().entrySet()) { + builder.putIoStatistics(entry.getKey(), convertToProto(entry.getValue())); + } + return builder.build(); + } + + public static TaskStatistics convertFromProto(TaskStatisticsProto proto) { + TaskStatistics stats = new TaskStatistics(); + for (Map.Entry entry : proto.getIoStatisticsMap().entrySet()) { + stats.addIO(entry.getKey(), convertFromProto(entry.getValue())); + } + return stats; + } + + public static TezCountersProto convertTezCountersToProto(TezCounters counters) { + TezCountersProto.Builder builder = TezCountersProto.newBuilder(); + Iterator groupIterator = counters.iterator(); + while (groupIterator.hasNext()) { + CounterGroup counterGroup = groupIterator.next(); + TezCounterGroupProto.Builder groupBuilder = TezCounterGroupProto.newBuilder(); + groupBuilder.setName(counterGroup.getName()); + if (counterGroup.getDisplayName() != null) { + groupBuilder.setDisplayName(counterGroup.getDisplayName()); + } + Iterator counterIterator = counterGroup.iterator(); + while (counterIterator.hasNext()) { + TezCounter counter = counterIterator.next(); + TezCounterProto.Builder counterBuilder = + TezCounterProto.newBuilder().setName(counter.getName()).setValue(counter.getValue()); + if (counter.getDisplayName() != null) { + 
counterBuilder.setDisplayName(counter.getDisplayName()); + } + groupBuilder.addCounters(counterBuilder.build()); + } + builder.addCounterGroups(groupBuilder.build()); + } + return builder.build(); + } + + public static TezCounters convertTezCountersFromProto(TezCountersProto proto) { + TezCounters counters = new TezCounters(); + for (TezCounterGroupProto groupProto : proto.getCounterGroupsList()) { + CounterGroup group = counters.addGroup(groupProto.getName(), groupProto.getDisplayName()); + for (TezCounterProto counterProto : groupProto.getCountersList()) { + TezCounter counter = + group.findCounter(counterProto.getName(), counterProto.getDisplayName()); + counter.setValue(counterProto.getValue()); + } + } + return counters; + } + + public static TaskStatusUpdateEventProto convertToProto(TaskStatusUpdateEvent event) { + TaskStatusUpdateEventProto.Builder builder = + TaskStatusUpdateEventProto.newBuilder() + .setProgress(event.getProgress()) + .setProgressNotified(event.getProgressNotified()); + if (event.getCounters() != null) { + builder.setCounters(convertTezCountersToProto(event.getCounters())); + } + if (event.getStatistics() != null) { + builder.setStatistics(convertToProto(event.getStatistics())); + } + return builder.build(); + } + + public static TaskStatusUpdateEvent convertFromProto(TaskStatusUpdateEventProto proto) { + return new TaskStatusUpdateEvent( + proto.hasCounters() ? convertTezCountersFromProto(proto.getCounters()) : null, + proto.getProgress(), + proto.hasStatistics() ? 
convertFromProto(proto.getStatistics()) : null,
+        proto.getProgressNotified());
+  }
+
+  /**
+   * Converts a {@link TezEvent} into its protobuf form.
+   *
+   * <p>The event type and received time are always copied; source and destination metadata are
+   * copied only when they are non-null. A {@code TASK_STATUS_UPDATE_EVENT} payload is encoded as
+   * a {@link TaskStatusUpdateEventProto}; for every other event type the payload is the
+   * Writable-serialized {@code TezEvent} itself.
+   *
+   * @param event the event to convert
+   * @return the protobuf representation of {@code event}
+   * @throws RuntimeException wrapping the {@link IOException} raised by Writable serialization
+   */
+  public static TezEventProto convertToProto(TezEvent event) {
+    TezEventProto.Builder builder =
+        TezEventProto.newBuilder()
+            .setEventType(EventTypeProto.valueOf(event.getEventType().name()))
+            .setEventReceivedTime(event.getEventReceivedTime());
+    if (event.getSourceInfo() != null) {
+      builder.setSourceInfo(convertToProto(event.getSourceInfo()));
+    }
+    if (event.getDestinationInfo() != null) {
+      builder.setDestinationInfo(convertToProto(event.getDestinationInfo()));
+    }
+
+    // Payload handling: status updates have a dedicated proto, everything else goes through
+    // the event's own Writable serialization.
+    if (event.getEventType() == EventType.TASK_STATUS_UPDATE_EVENT) {
+      builder.setEventPayload(
+          convertToProto((TaskStatusUpdateEvent) event.getEvent()).toByteString());
+    } else {
+      try {
+        builder.setEventPayload(TezTaskUmbilicalProtocolUtils.serializeToByteString(event));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds a {@link TezEvent} from its protobuf form.
+   *
+   * <p>An empty payload leaves the wrapped event unset. A {@code TASK_STATUS_UPDATE_EVENT}
+   * payload is parsed as a {@link TaskStatusUpdateEventProto}; any other payload is fed to the
+   * new event through {@code TezTaskUmbilicalProtocolUtils.deserializeFromByteString}.
+   *
+   * @param proto the protobuf message to convert
+   * @return the reconstructed event
+   * @throws RuntimeException wrapping a protobuf parse failure or an {@link IOException}
+   */
+  public static TezEvent convertFromProto(TezEventProto proto) {
+    TezEvent event = new TezEvent();
+    event.setEventReceivedTime(proto.getEventReceivedTime());
+    event.setEventType(EventType.valueOf(proto.getEventType().name()));
+    if (proto.hasSourceInfo()) {
+      event.setSourceInfo(convertFromProto(proto.getSourceInfo()));
+    }
+    if (proto.hasDestinationInfo()) {
+      event.setDestinationInfo(convertFromProto(proto.getDestinationInfo()));
+    }
+
+    if (!proto.getEventPayload().isEmpty()) {
+      if (proto.getEventType() == EventTypeProto.TASK_STATUS_UPDATE_EVENT) {
+        try {
+          TaskStatusUpdateEventProto payloadProto =
+              TaskStatusUpdateEventProto.parseFrom(proto.getEventPayload());
+          event.setEvent(convertFromProto(payloadProto));
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        try {
+          TezTaskUmbilicalProtocolUtils.deserializeFromByteString(proto.getEventPayload(), event);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    return event;
+  }
+
+  // --- Heartbeat Converters ---
+
+  /**
+   * Converts a heartbeat request into its protobuf form.
+   *
+   * <p>The current task attempt id is set only when it is non-null, and a null event list is
+   * treated as empty.
+   *
+   * @param request the heartbeat request to convert
+   * @return the protobuf representation of {@code request}
+   */
+  public static HeartbeatRequestProto convertToProto(TezHeartbeatRequest request) {
+    HeartbeatRequestProto.Builder builder =
+        HeartbeatRequestProto.newBuilder()
+            .setRequestId(request.getRequestId())
+            .setPreRoutedStartIndex(request.getPreRoutedStartIndex())
+            .setContainerIdentifier(request.getContainerIdentifier())
+            .setStartIndex(request.getStartIndex())
+            .setMaxEvents(request.getMaxEvents())
+            .setUsedMemory(request.getUsedMemory());
+    if (request.getCurrentTaskAttemptID() != null) {
+      builder.setCurrentTaskAttemptId(convertToProto(request.getCurrentTaskAttemptID()));
+    }
+    if (request.getEvents() != null) {
+      for (TezEvent event : request.getEvents()) {
+        builder.addEvents(convertToProto(event));
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds a heartbeat request from its protobuf form.
+   *
+   * <p>The event list is never null (it is empty when the message carries no events); the
+   * current task attempt id is null when the message does not carry one.
+   *
+   * @param proto the protobuf message to convert
+   * @return the reconstructed heartbeat request
+   */
+  public static TezHeartbeatRequest convertFromProto(HeartbeatRequestProto proto) {
+    List<TezEvent> events = new ArrayList<>(proto.getEventsCount());
+    for (TezEventProto ep : proto.getEventsList()) {
+      events.add(convertFromProto(ep));
+    }
+    return new TezHeartbeatRequest(
+        proto.getRequestId(),
+        events,
+        proto.getPreRoutedStartIndex(),
+        proto.getContainerIdentifier(),
+        proto.hasCurrentTaskAttemptId() ? convertFromProto(proto.getCurrentTaskAttemptId()) : null,
+        proto.getStartIndex(),
+        proto.getMaxEvents(),
+        proto.getUsedMemory());
+  }
+
+  /**
+   * Converts a heartbeat response into its protobuf form. A null event list is treated as empty.
+   *
+   * @param response the heartbeat response to convert
+   * @return the protobuf representation of {@code response}
+   */
+  public static HeartbeatResponseProto convertToProto(TezHeartbeatResponse response) {
+    HeartbeatResponseProto.Builder builder =
+        HeartbeatResponseProto.newBuilder()
+            .setLastRequestId(response.getLastRequestId())
+            .setShouldDie(response.shouldDie())
+            .setNextFromEventId(response.getNextFromEventId())
+            .setNextPreRoutedEventId(response.getNextPreRoutedEventId());
+    if (response.getEvents() != null) {
+      for (TezEvent event : response.getEvents()) {
+        builder.addEvents(convertToProto(event));
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds a heartbeat response from its protobuf form.
+   *
+   * <p>{@code setShouldDie()} is invoked on the result only when the message's
+   * {@code should_die} flag is true.
+   *
+   * @param proto the protobuf message to convert
+   * @return the reconstructed heartbeat response
+   */
+  public static TezHeartbeatResponse convertFromProto(HeartbeatResponseProto proto) {
+    List<TezEvent> events = new ArrayList<>(proto.getEventsCount());
+    for (TezEventProto ep : proto.getEventsList()) {
+      events.add(convertFromProto(ep));
+    }
+    TezHeartbeatResponse response = new TezHeartbeatResponse(events);
+    response.setLastRequestId(proto.getLastRequestId());
+    if (proto.getShouldDie()) {
+      response.setShouldDie();
+    }
+    response.setNextFromEventId(proto.getNextFromEventId());
+    response.setNextPreRoutedEventId(proto.getNextPreRoutedEventId());
+    return response;
+  }
+}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/package-info.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/package-info.java
new file mode 100644
index 0000000000..f452f32c77
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@Private
+package org.apache.tez.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index e9d4f1127e..93b94f897c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -119,6 +119,14 @@ public Event getEvent() {
     return event;
   }
 
+  /**
+   * Replaces the wrapped event payload.
+   *
+   * @param event the payload to store on this event
+   */
+  public void setEvent(Event event) {
+    this.event = event;
+  }
+
+  /**
+   * Overrides the type tag stored on this event.
+   *
+   * @param eventType the event type to store
+   */
+  public void setEventType(EventType eventType) {
+    this.eventType = eventType;
+  }
+
   public void setEventReceivedTime(long eventReceivedTime) { // TODO save
     this.eventReceivedTime = eventReceivedTime;
   }
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/TezTaskUmbilicalProtocolBlockingPB.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/TezTaskUmbilicalProtocolBlockingPB.java
new file mode 100644
index 0000000000..ebf641603f
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/TezTaskUmbilicalProtocolBlockingPB.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tez.runtime.internals.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.tez.runtime.common.security.JobTokenSelector;
+import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.TezTaskUmbilicalProtocol;
+
+/**
+ * RPC protocol interface for the protobuf-based task umbilical.
+ *
+ * <p>It declares no methods of its own: it combines the generated blocking service interface
+ * ({@code getTask}, {@code canCommit}, {@code heartbeat}) with Hadoop's
+ * {@link VersionedProtocol}. The annotations select {@link JobTokenSelector} for token lookup
+ * and register the protocol under its fully qualified name with version 1.
+ */
+@Private
+@TokenInfo(JobTokenSelector.class)
+@ProtocolInfo(
+    protocolName = "org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolBlockingPB",
+    protocolVersion = 1)
+public interface TezTaskUmbilicalProtocolBlockingPB
+    extends TezTaskUmbilicalProtocol.BlockingInterface, VersionedProtocol {}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/TezTaskUmbilicalProtocolPBClientImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/TezTaskUmbilicalProtocolPBClientImpl.java
new file mode 100644
index 0000000000..109870ed20
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/TezTaskUmbilicalProtocolPBClientImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tez.runtime.internals.protocolPB;
+
+import static org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolUtils.service;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezPBConverters;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.CanCommitRequestProto;
+import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.ContainerContextProto;
+import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.GetTaskRequestProto;
+import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.GetTaskResponseProto;
+import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.HeartbeatRequestProto;
+
+/**
+ * Client-side adapter that exposes a protobuf RPC proxy as a {@link TezTaskUmbilicalProtocol}.
+ * Each call converts its arguments to protobuf messages, invokes the proxy, and converts the
+ * reply back to the Java API types.
+ */
+@Private
+public class TezTaskUmbilicalProtocolPBClientImpl implements TezTaskUmbilicalProtocol, Closeable {
+
+  private final TezTaskUmbilicalProtocolBlockingPB proxy;
+
+  public TezTaskUmbilicalProtocolPBClientImpl(TezTaskUmbilicalProtocolBlockingPB proxy) {
+    this.proxy = proxy;
+  }
+
+  /** Stops the underlying RPC proxy; does nothing when no proxy was supplied. */
+  @Override
+  public void close() throws IOException {
+    if (proxy == null) {
+      return;
+    }
+    RPC.stopProxy(proxy);
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+    // The version is read from the protocol interface's annotation; no remote call is made.
+    return RPC.getProtocolVersion(TezTaskUmbilicalProtocolBlockingPB.class);
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(
+      String protocol, long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
+  }
+
+  /**
+   * Asks the AM for work on behalf of the task process.
+   *
+   * @return the task handed out by the AM, or null when the reply carries no container task
+   */
+  @Override
+  public ContainerTask getTask(ContainerContext containerContext) throws IOException {
+    return service(
+        () -> {
+          GetTaskRequestProto.Builder requestBuilder = GetTaskRequestProto.newBuilder();
+          if (containerContext != null) {
+            ContainerContextProto contextProto =
+                ContainerContextProto.newBuilder()
+                    .setContainerIdentifier(containerContext.getContainerIdentifier())
+                    .build();
+            requestBuilder.setContainerContext(contextProto);
+          }
+          GetTaskResponseProto reply = proxy.getTask(null, requestBuilder.build());
+          if (!reply.hasContainerTask()) {
+            return null;
+          }
+          return TezPBConverters.convertFromProto(reply.getContainerTask());
+        });
+  }
+
+  /** Asks the AM whether the given task attempt may commit its outputs. */
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+    return service(
+        () -> {
+          CanCommitRequestProto.Builder requestBuilder = CanCommitRequestProto.newBuilder();
+          if (taskid != null) {
+            requestBuilder.setTaskAttemptId(TezPBConverters.convertToProto(taskid));
+          }
+          CanCommitRequestProto commitRequest = requestBuilder.build();
+          return proxy.canCommit(null, commitRequest).getResponse();
+        });
+  }
+
+  /** Sends a heartbeat to the AM and returns the AM's reply. */
+  @Override
+  public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
+      throws IOException, TezException {
+    return service(
+        () -> {
+          HeartbeatRequestProto wireRequest = TezPBConverters.convertToProto(request);
+          return TezPBConverters.convertFromProto(proxy.heartbeat(null, wireRequest));
+        });
+  }
+}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/TezTaskUmbilicalProtocolPBServerImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/TezTaskUmbilicalProtocolPBServerImpl.java
new file mode 100644
index 0000000000..1f4830fe45
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/TezTaskUmbilicalProtocolPBServerImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tez.runtime.internals.protocolPB;
+
+import static org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolUtils.translate;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezPBConverters;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.CanCommitRequestProto;
+import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.CanCommitResponseProto;
+import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.GetTaskRequestProto;
+import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.GetTaskResponseProto;
+import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.HeartbeatRequestProto;
+import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolProtos.HeartbeatResponseProto;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Server-side adapter that serves the protobuf umbilical RPCs by delegating to a
+ * {@link TezTaskUmbilicalProtocol} implementation. Requests are converted from protobuf to the
+ * Java API types, and results are converted back.
+ */
+@Private
+public class TezTaskUmbilicalProtocolPBServerImpl implements TezTaskUmbilicalProtocolBlockingPB {
+
+  private final TezTaskUmbilicalProtocol real;
+
+  public TezTaskUmbilicalProtocolPBServerImpl(TezTaskUmbilicalProtocol real) {
+    this.real = real;
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+    return RPC.getProtocolVersion(TezTaskUmbilicalProtocolBlockingPB.class);
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(
+      String protocol, long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
+  }
+
+  /**
+   * Handles a task process asking for work. The reply's container task is left unset when the
+   * delegate returns null.
+   */
+  @Override
+  public GetTaskResponseProto getTask(RpcController controller, GetTaskRequestProto request)
+      throws ServiceException {
+    return translate(
+        () -> {
+          ContainerContext context =
+              request.hasContainerContext()
+                  ? new ContainerContext(request.getContainerContext().getContainerIdentifier())
+                  : null;
+          ContainerTask task = real.getTask(context);
+          GetTaskResponseProto.Builder reply = GetTaskResponseProto.newBuilder();
+          if (task == null) {
+            return reply.build();
+          }
+          return reply.setContainerTask(TezPBConverters.convertToProto(task)).build();
+        });
+  }
+
+  /** Handles a task attempt asking whether it may commit its outputs. */
+  @Override
+  public CanCommitResponseProto canCommit(RpcController controller, CanCommitRequestProto request)
+      throws ServiceException {
+    return translate(
+        () -> {
+          TezTaskAttemptID attemptId =
+              request.hasTaskAttemptId()
+                  ? TezPBConverters.convertFromProto(request.getTaskAttemptId())
+                  : null;
+          boolean allowed = real.canCommit(attemptId);
+          CanCommitResponseProto.Builder reply = CanCommitResponseProto.newBuilder();
+          reply.setResponse(allowed);
+          return reply.build();
+        });
+  }
+
+  /** Handles a heartbeat from a task process and returns the delegate's reply. */
+  @Override
+  public HeartbeatResponseProto heartbeat(RpcController controller, HeartbeatRequestProto request)
+      throws ServiceException {
+    return translate(
+        () -> {
+          TezHeartbeatRequest javaRequest = TezPBConverters.convertFromProto(request);
+          TezHeartbeatResponse javaReply = real.heartbeat(javaRequest);
+          return TezPBConverters.convertToProto(javaReply);
+        });
+  }
+}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/TezTaskUmbilicalProtocolUtils.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/TezTaskUmbilicalProtocolUtils.java
new file mode 100644
index 0000000000..4ca9944fe3
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/TezTaskUmbilicalProtocolUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tez.runtime.internals.protocolPB;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.TezException;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Static helpers shared by the protobuf umbilical client and server: Writable to
+ * {@link ByteString} conversion, and exception translation between the Java API
+ * ({@link IOException}, {@link TezException}) and the RPC layer ({@link ServiceException}).
+ */
+@Private
+public final class TezTaskUmbilicalProtocolUtils {
+
+  private TezTaskUmbilicalProtocolUtils() {}
+
+  /**
+   * Serializes a Writable into a protobuf ByteString.
+   *
+   * @param writable the object to serialize; may be null
+   * @return the serialized bytes, or {@link ByteString#EMPTY} when {@code writable} is null
+   * @throws IOException if {@code writable.write} fails
+   */
+  public static ByteString serializeToByteString(Writable writable) throws IOException {
+    if (writable == null) {
+      // Protobuf setters reject null, and deserializeFromByteString treats an empty string as
+      // "nothing to read", so EMPTY is the value that round-trips.
+      return ByteString.EMPTY;
+    }
+    ByteString.Output os = ByteString.newOutput();
+    try (DataOutputStream dos = new DataOutputStream(os)) {
+      writable.write(dos);
+    }
+    return os.toByteString();
+  }
+
+  /**
+   * Populates {@code writable} from the bytes of a protobuf ByteString.
+   *
+   * <p>A null or empty {@code byteString} is a no-op and leaves {@code writable} untouched.
+   *
+   * @param byteString the serialized bytes; may be null or empty
+   * @param writable the object whose {@code readFields} consumes the bytes
+   * @throws IOException if {@code writable.readFields} fails
+   */
+  public static void deserializeFromByteString(ByteString byteString, Writable writable)
+      throws IOException {
+    if (byteString == null || byteString.isEmpty()) {
+      return;
+    }
+    try (InputStream is = byteString.newInput();
+        DataInputStream dis = new DataInputStream(is)) {
+      writable.readFields(dis);
+    }
+  }
+
+  /*
+   * Service invocation with exception translation.
+   */
+
+  /**
+   * A client-side RPC invocation that may fail with a {@link ServiceException}.
+   *
+   * @param <T> the type of the invocation result
+   */
+  @FunctionalInterface
+  public interface ServiceCallable<T> {
+    T call() throws ServiceException;
+  }
+
+  /**
+   * Invokes a service call and translates {@link ServiceException} to {@link IOException}.
+   *
+   * <p>When the ServiceException was caused by an IOException, that cause is rethrown as is so
+   * the caller sees the underlying failure; otherwise the ServiceException is wrapped.
+   *
+   * @param callable the invocation to run
+   * @param <T> the type of the invocation result
+   * @return whatever {@code callable} returns
+   * @throws IOException if {@code callable} throws a ServiceException
+   */
+  public static <T> T service(ServiceCallable<T> callable) throws IOException {
+    try {
+      return callable.call();
+    } catch (ServiceException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * A server-side invocation of the Java protocol that may fail with the protocol's own
+   * checked exceptions.
+   *
+   * @param <T> the type of the invocation result
+   */
+  @FunctionalInterface
+  public interface ClientCallable<T> {
+    T call() throws IOException, TezException;
+  }
+
+  /**
+   * Translates application exceptions (IOException, TezException) to RPC ServiceException.
+   *
+   * @param callable the invocation to run
+   * @param <T> the type of the invocation result
+   * @return whatever {@code callable} returns
+   * @throws ServiceException wrapping the IOException or TezException thrown by {@code callable}
+   */
+  public static <T> T translate(ClientCallable<T> callable) throws ServiceException {
+    try {
+      return callable.call();
+    } catch (IOException | TezException e) {
+      throw new ServiceException(e);
+    }
+  }
+}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/package-info.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/package-info.java
new file mode 100644
index 0000000000..26ad3fe002
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/protocolPB/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +@Private +package org.apache.tez.runtime.internals.protocolPB; + +import org.apache.hadoop.classification.InterfaceAudience.Private; diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 60faec56fa..08a37e99c5 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,13 +7,14 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ package org.apache.tez.runtime.task; @@ -41,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; @@ -78,6 +80,8 @@ import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.hook.TezTaskAttemptHook; import org.apache.tez.runtime.internals.api.TaskReporterInterface; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolBlockingPB; +import org.apache.tez.runtime.internals.protocolPB.TezTaskUmbilicalProtocolPBClientImpl; import org.apache.tez.util.LoggingUtils; import org.apache.tez.util.TezRuntimeShutdownHandler; @@ -196,13 +200,19 @@ public TezChild(Configuration conf, String host, int port, String containerIdent final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port); SecurityUtil.setTokenService(jobToken, address); taskOwner.addToken(jobToken); - this.umbilical = taskOwner.doAs(new PrivilegedExceptionAction() { - @Override - public TezTaskUmbilicalProtocol run() throws Exception { - return RPC.getProxy(TezTaskUmbilicalProtocol.class, - TezTaskUmbilicalProtocol.versionID, address, defaultConf); - } - }); + this.umbilical = + taskOwner.doAs( + (PrivilegedExceptionAction) + () -> { + RPC.setProtocolEngine( + defaultConf, + TezTaskUmbilicalProtocolBlockingPB.class, + ProtobufRpcEngine2.class); + TezTaskUmbilicalProtocolBlockingPB proxy = + RPC.getProxy( + TezTaskUmbilicalProtocolBlockingPB.class, 0, address, defaultConf); + return new TezTaskUmbilicalProtocolPBClientImpl(proxy); + }); ownUmbilical = true; } else { this.umbilical = umbilical; diff --git a/tez-runtime-internals/src/main/proto/TezTaskUmbilicalProtocol.proto b/tez-runtime-internals/src/main/proto/TezTaskUmbilicalProtocol.proto new file mode 100644 index 
0000000000..7c50fcbab4
--- /dev/null
+++ b/tez-runtime-internals/src/main/proto/TezTaskUmbilicalProtocol.proto
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Wire format of the task umbilical protocol. Each message mirrors the Java class named in the
+// comment above it; the service at the bottom declares the three umbilical RPCs.
+syntax = "proto3";
+
+option java_package = "org.apache.tez.runtime.internals.protocolPB";
+option java_outer_classname = "TezTaskUmbilicalProtocolProtos";
+option java_generic_services = true;
+
+// Imitates TezDAGID.java
+message TezDAGIDProto {
+  int64 cluster_timestamp = 1;
+  int32 app_id = 2;
+  int32 id = 3;
+}
+
+// Imitates TezVertexID.java
+message TezVertexIDProto {
+  TezDAGIDProto dag_id = 1;
+  int32 id = 2;
+}
+
+// Imitates TezTaskID.java
+message TezTaskIDProto {
+  TezVertexIDProto vertex_id = 1;
+  int32 id = 2;
+}
+
+// Imitates TezTaskAttemptID.java
+message TezTaskAttemptIDProto {
+  TezTaskIDProto task_id = 1;
+  int32 id = 2;
+}
+
+// Imitates ContainerContext.java
+message ContainerContextProto {
+  string container_identifier = 1;
+}
+
+// Imitates UserPayload.java
+message TezUserPayloadProto {
+  bytes user_payload = 1;
+  int32 version = 2;
+}
+
+// Imitates EntityDescriptor.java
+message TezEntityDescriptorProto {
+  string class_name = 1;
+  TezUserPayloadProto user_payload = 2;
+  bytes history_text = 3;
+}
+
+// Imitates TezLocalResource.java
+message TezLocalResourceProto {
+  string uri = 1;
+  int64 size = 2;
+  int64 timestamp = 3;
+}
+
+// Imitates InputSpec.java
+message InputSpecProto {
+  string source_vertex_name = 1;
+  TezEntityDescriptorProto input_descriptor = 2;
+  int32 physical_edge_count = 3;
+}
+
+// Imitates OutputSpec.java
+message OutputSpecProto {
+  string destination_vertex_name = 1;
+  TezEntityDescriptorProto output_descriptor = 2;
+  int32 physical_edge_count = 3;
+}
+
+// Imitates GroupInputSpec.java
+message GroupInputSpecProto {
+  string group_name = 1;
+  repeated string group_vertices = 2;
+  TezEntityDescriptorProto merged_input_descriptor = 3;
+}
+
+// Imitates TaskSpec.java
+message TaskSpecProto {
+  TezTaskAttemptIDProto task_attempt_id = 1;
+  string dag_name = 2;
+  string vertex_name = 3;
+  int32 vertex_parallelism = 4;
+  TezEntityDescriptorProto processor_descriptor = 5;
+  repeated InputSpecProto input_specs = 6;
+  repeated OutputSpecProto output_specs = 7;
+  repeated GroupInputSpecProto group_input_specs = 8;
+  bytes task_conf_bytes = 9;
+}
+
+// Imitates ContainerTask.java
+message ContainerTaskProto {
+  TaskSpecProto task_spec = 1;
+  bool should_die = 2;
+  // Keyed by resource name.
+  map<string, TezLocalResourceProto> additional_resources = 3;
+  bytes credentials_binary = 4;
+  bool credentials_changed = 5;
+}
+
+// Imitates EventMetaData.EventProducerConsumerType enum
+enum EventProducerConsumerTypeProto {
+  EPC_INPUT = 0;
+  EPC_PROCESSOR = 1;
+  EPC_OUTPUT = 2;
+  EPC_SYSTEM = 3;
+}
+
+// Imitates EventMetaData.java
+message EventMetaDataProto {
+  EventProducerConsumerTypeProto producer_consumer_type = 1;
+  string task_vertex_name = 2;
+  string edge_vertex_name = 3;
+  TezTaskAttemptIDProto task_attempt_id = 4;
+}
+
+// Imitates EventType.java enum. The value names must match the Java enum constants exactly:
+// the converters map between the two by name.
+enum EventTypeProto {
+  TASK_ATTEMPT_COMPLETED_EVENT = 0;
+  TASK_ATTEMPT_FAILED_EVENT = 1;
+  TASK_ATTEMPT_KILLED_EVENT = 2;
+  DATA_MOVEMENT_EVENT = 3;
+  INPUT_READ_ERROR_EVENT = 4;
+  INPUT_FAILED_EVENT = 5;
+  TASK_STATUS_UPDATE_EVENT = 6;
+  VERTEX_MANAGER_EVENT = 7;
+  ROOT_INPUT_DATA_INFORMATION_EVENT = 8;
+  COMPOSITE_DATA_MOVEMENT_EVENT = 9;
+  ROOT_INPUT_INITIALIZER_EVENT = 10;
+  COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT = 11;
+  CUSTOM_PROCESSOR_EVENT = 12;
+}
+
+// Imitates IOStatistics.java
+message IOStatisticsProto {
+  int64 data_size = 1;
+  int64 items_processed = 2;
+}
+
+// Imitates TaskStatistics.java
+message TaskStatisticsProto {
+  // Keyed by input/output name.
+  map<string, IOStatisticsProto> io_statistics = 1;
+}
+
+// Imitates TezCounter.java
+message TezCounterProto {
+  string name = 1;
+  string display_name = 2;
+  int64 value = 3;
+}
+
+// Imitates CounterGroup.java
+message TezCounterGroupProto {
+  string name = 1;
+  string display_name = 2;
+  repeated TezCounterProto counters = 3;
+}
+
+// Imitates TezCounters.java
+message TezCountersProto {
+  repeated TezCounterGroupProto counter_groups = 1;
+}
+
+// Imitates TaskStatusUpdateEvent.java
+message TaskStatusUpdateEventProto {
+  TezCountersProto counters = 1;
+  float progress = 2;
+  TaskStatisticsProto statistics = 3;
+  bool progress_notified = 4;
+}
+
+// Imitates TezEvent.java
+message TezEventProto {
+  EventTypeProto event_type = 1;
+  // A serialized TaskStatusUpdateEventProto for TASK_STATUS_UPDATE_EVENT; for every other
+  // event type, the Writable-serialized TezEvent.
+  bytes event_payload = 2;
+  EventMetaDataProto source_info = 3;
+  EventMetaDataProto destination_info = 4;
+  int64 event_received_time = 5;
+}
+
+message GetTaskRequestProto {
+  ContainerContextProto container_context = 1;
+}
+
+message GetTaskResponseProto {
+  ContainerTaskProto container_task = 1;
+}
+
+message CanCommitRequestProto {
+  TezTaskAttemptIDProto task_attempt_id = 1;
+}
+
+message CanCommitResponseProto {
+  bool response = 1;
+}
+
+// Imitates TezHeartbeatRequest.java
+message HeartbeatRequestProto {
+  int64 request_id = 1;
+  repeated TezEventProto events = 2;
+  int32 pre_routed_start_index = 3;
+  string container_identifier = 4;
+  TezTaskAttemptIDProto current_task_attempt_id = 5;
+  int32 start_index = 6;
+  int32 max_events = 7;
+  int64 used_memory = 8;
+}
+
+// Imitates TezHeartbeatResponse.java
+message HeartbeatResponseProto {
+  int64 last_request_id = 1;
+  bool should_die = 2;
+  repeated TezEventProto events = 3;
+  int32 next_from_event_id = 4;
+  int32 next_pre_routed_event_id = 5;
+}
+
+// --- RPC Service ---
+
+service TezTaskUmbilicalProtocol {
+  rpc getTask (GetTaskRequestProto) returns (GetTaskResponseProto);
+  rpc canCommit (CanCommitRequestProto) returns (CanCommitResponseProto);
+  rpc heartbeat (HeartbeatRequestProto) returns (HeartbeatResponseProto);
+}